# [Apache Flink](https://flink.apache.org/)

## [Architecture](https://flink.apache.org/flink-architecture.html)

* [scala](https://www.scala-lang.org/) 로 만들어짐
* Apache top level project
* data stream 에 대한 상태 저장 계산을 위한 framework
* unbounded, bounded data stream 을 처리하는 분산 처리 엔진
* Hadoop YARN, Apache Mesos, Kubernetes, standalone 으로 구성 가능

## [Distributed Runtime Environment](https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html)

### Tasks and Operator Chains

* 분산 실행을 위해 operator subtask 를 task 로 묶음
* 각 task 는 하나의 thread 로 실행됨

### Job Managers, Task Managers, Clients

|Role|Process|
|:---:|:---:|
|master|JobManager|
|slave|TaskManager|

#### Job Managers

* master 라고도 함
* 분산 실행을 조정함
* schedule task, coordinate checkpoint, 실패시 coordinate recovery 등을 담당함
* 적어도 하나 존재해야 함
* HA 구성시 여러개일 수 있음
* leader 는 항상 하나
* 나머지는 전부 standby

#### Task Managers

* worker 라고도 함
* dataflow task(subtask) 를 실행함
* data stream 을 저장하고 exchange 함
* 적어도 하나 존재해야 함

#### Clients

* 런타임이나 프로그램의 일부는 아님
* dataflow 를 준비함
* dataflow 를 JobManager 에 보내고 끊어지거나 progress report 를 받음
* java 나 scala program 으로 실행되거나 cli 에서 `./bin/flink` 로 실행됨
* spark 이나 tensorflow 처럼 job 을 submit 하면 graph 를 생성 후 master 에 전달 후 worker 가 작업함

##### Task Slots and Resources

* TaskManager 는 JVM process
* Thread 로 하나 이상의 subtask 를 실행함
* 여러 task 를 관리하기 위해 적어도 하나의 task slot 을 갖고 있음
* task slot 수를 바꿔서 subtask 가 분리되는 방법을 정의할 수 있음
* task slot 수는 cpu 개수로 하는것이 좋음
* hyper threading 사용 시 각 slot 에 2개 이상의 hardware thread context 가 필요함

### State Backends

* inMemory, filesystem, RocksDB 사용 가능
* 특정 시점 snapshot 생성
* 생성된 snapshot 을 checkpoint 의 부분으로 저장

### Savepoints

* DataStream API 로 작성된 프로그램은 savepoint 에서 resume 가능
* 상태 손실 없이, 프로그램과 flink cluster 모두 update 가능
* 수동으로 실행되는 checkpoint
* 주기적으로 worker 에서 실행됨
* 마지막만 남기고 버림, 이것만 빼면 주기적으로 실행되는 checkpoint 와 같음
* command line 으로 생성 가능
* REST API 로 작업을 취소할 때 생성됨

## [HA](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html)

* standalone, yarn, mesos, k8s, ... 사용가능

## [Downloads](https://flink.apache.org/downloads.html)

### Apache Flink 1.9.1

* [Apache Flink 1.9.1 for Scala 2.12](https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz)

### Optional components

* [Avro SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.1/flink-avro-1.9.1.jar)
* [CSV SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.1/flink-csv-1.9.1.jar)
* [JSON SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.9.1/flink-json-1.9.1.jar)
* [Pre-bundled Hadoop 2.7.5](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar)
* 저장소로 hdfs 를 사용하려면 필요

Optional components 를 ${FLINK_HOME}/lib 으로 복사

### masters

```shell script
cd ${FLINK_HOME}/conf
cat masters
```

### slaves

```shell script
cat slaves
```

### 키교환

master 에서 slave 로 password 없이 ssh 접속 가능해야 함

### [Flink configs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html)

#### [Common configs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#common-options)

```yaml
# default: 1
parallelism.default: 1
```

#### [JobManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager)

```yaml
# default: none
jobmanager.archive.fs.dir:
# default: null, full|region, task 실패로부터 연산 복구를 어떻게 할지. full: 복구를 위해 모든 task 를 재실행. region: task 실패의 영향을 받을 수 있는 모든 task 를 다시 시작. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
jobmanager.execution.failover-strategy: full
# default: 1024m
jobmanager.heap.size: 1024m
# default: none
jobmanager.rpc.address:
# default: 6123
jobmanager.rpc.port: 6123
```

#### [TaskManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager)

```yaml
# default: 30000, task 취소 시간 시도 간격(밀리초)
task.cancellation.interval: 30000
# default: 180000, task 취소 시간이 초과되어 fatal TaskManager 오류각 발생하는 시간(밀리초), 0 은 비활성화
task.cancellation.timeout: 180000
# default: 7500, stream task 가 취소됐을 때, 모든 타이머 쓰레드가 끝날때까지 기다리는 타이머 시간(밀리초)
task.cancellation.timers.timeout: 7500
# default: -1, checkpoint alignment buffer 최대 바이트수, buffer 를 초과하면 무시함, -1 이면 제한 없음
task.checkpoint.alignment.max-size: -1
# default: 1024m
taskmanager.heap.size: 1024m
# default: 1, TaskManager 에서 실행할 수 있는 user function instance 나 병렬 실행 개수. https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots
taskmanager.numberOfTaskSlots: 1
```

#### [Distributed Coordination](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination)

#### [Distributed Coordination (via Akka)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka)

#### [REST](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rest)

```yaml
rest.address: 0.0.0.0
rest.port: 8081
```

#### [Blob Server](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#blob-server)

#### [Heartbeat Manager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-manager)

#### [SSL Settings](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#ssl-settings)

#### [Netty Shuffle Environment](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#netty-shuffle-environment)

#### [Network Communication (via Netty)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#network-communication-via-netty)

#### [Web Frontend](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#web-frontend)

#### [File Systems](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#file-systems)

```yaml
# default: none, scheme 을 명시적으로 지정하지 않으면 사용함. hdfs 또는 file
fs.default-scheme:
```

#### [Compiler/Optimizer](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#compileroptimizer)

#### [Runtime Algorithms](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#runtime-algorithms)

#### [Resource Manager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resource-manager)

#### [Shuffle Service](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#shuffle-service)

#### [YARN](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#yarn)

#### [Mesos](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos)

#### [Mesos TaskManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-taskmanager)

#### [High Availability (HA)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability-ha)

```yaml
# default: "NONE", NONE|ZOOKEEPER|<specify FQN of factory class>
high-availability: NONE
# default: /default
high-availability.cluster-id: /default
# default: none, HA 구성시 flink metadata 유지 file system 경로
high-availability.storageDir:
# ZooKeeper-based HA Mode https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#zookeeper-based-ha-mode
# default: /flink, zookeeper 에서 flink 저장 root path
high-availability.zookeeper.path.root: /flink
# default: none
high-availability.zookeeper.quorum:
```

#### [ZooKeeper Security](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#zooKeeper-security)

#### [Kerberos-based Security](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#kerberos-based-security)

#### [Environment](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#environment)

```yaml
# default: none
env.java.opts.jobmanager:
# default: none
env.java.opts.taskmanager:
```

#### [Checkpointing](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing)

```yaml
# default: none, none|jobmanager|filesystem|rocksdb|<class-name-of-factory>
state.backend:
# default: none, checkpoint data file 과 metadata 를 저장하는 디렉토리. 모든 TaskManager, JobManager 에서 접근 가능해야 함
state.checkpoints.dir:
# default: none, state backend 에서 savepoint 를 filesystem 에 쓰는데 사용함
state.savepoints.dir:
```

#### [RocksDB State Backend](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-state-backend)

#### [RocksDB Configurable Options](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-configurable-options)

#### [Queryable State](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#queryable-state)

#### [Metrics](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#metrics)

* 2019-12-11 metricbeat 의 flink module 은 아직 없음

#### [RocksDB Native Metrics](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-native-metrics)

#### [History Server](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#history-server)

```yaml
# default: none, Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
```

#### [Legacy](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy)

#### [Background](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#background)

### cluster configs

```shell script
vim ${FLINK_HOME}/conf/flink-conf.yaml
```

```yaml
taskmanager.heap.size: 1024m
parallelism.default: 1
jobmanager.archive.fs.dir: file:///data/flink/completed-jobs/
jobmanager.execution.failover-strategy: region
jobmanager.heap.size: 1024m
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 2
rest.address: 0.0.0.0
rest.port: 8081
fs.default-scheme: file:///data
high-availability: zookeeper
high-availability.cluster-id: /myflink
high-availability.storageDir: file:///data/flink/ha
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181
env.hadoop.conf.dir: /etc/hadoop/conf
env.pid.dir: /usr/local/flink/default
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
state.savepoints.dir: file:///data/flink/checkpoints
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: file:///data/flink/completed-jobs/
```

### start flink cluster

```shell script
./bin/start-cluster.sh
```

```
[hskimsky@host1:/usr/local/flink/default]$ ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host host1.
Starting standalonesession daemon on host host2.
Starting standalonesession daemon on host host3.
Starting taskexecutor daemon on host host1.
Starting taskexecutor daemon on host host2.
Starting taskexecutor daemon on host host3.
[hskimsky@host1:/usr/local/flink/default]$
```

#### trouble shooting

##### Could not find a file system implementation for scheme 'hdfs'.

```
...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
...
```

`Optional components` `Pre-bundled Hadoop X.X.X` `${FLINK_HOME}lib` 으로 복사했는지 확인

### start 확인

wbe browser 에서 http://${JobManager의hostname}:${rest.port}/ 접속하여 화면이 뜨면 성공.

master 에 지정한 모든 hostname 모두 접속되는지 확인

* http://host1:8081/
* http://host2:8081/
* http://host3:8081/

## [Seoul Apache Flink Meetup](https://www.meetup.com/ko-KR/Seoul-Apache-Flink-Meetup/)

https://www.meetup.com/ko-KR/Seoul-Apache-Flink-Meetup/events/266824815/

### 일시 및 장소
* 일시 : 2019년 12월 17일 (화) 18:00 - 21:00
* 장소 : NAVER D2 STARTUP FACTORY (서울 강남구 강남대로 382 메리츠타워 16층)
* 약도보기 : http://naver.me/Fa4qOxfD
* 장소는 'LINE'에서 지원해주셨습니다.

### Session Program
* 18:00 ~ 18:20 행사장 개방, 등록
* 18:20 ~ 18:40 Interval join 뽀개기 - 광고 데이터 처리 파이프라인에 적용한 사례 (염지혜, 카카오)
* 18:40 ~ 19:00 운전습관: 진화의 시작 - T map에 Flink 이식하기 (오승철, SKT)
* 19:00 ~ 19:20 Break
* 19:20 ~ 19:40 Do Flink on Web with FLOW (김동원, SKT)
* 19:40 ~ 20:00 Source 부터 Sink 까지 (강한구, 카카오 모빌리티)
* 20:00 ~ 21:00 Q&A, Networking

#### 광고플랫폼에 flink 적용
* 카카오 염지혜
* kafka -> flink -> kafka
* interval join 은 event time 만 지원
* LeftIntervalJoij 을 새로 구현
* sideOutputTag 를 적용함
* 한쪽 스트림이 없으면 메모리 꽉차서 죽음. 현재시간을 timestamp assigner 에서 넣도록
* 시간차가 크면 한쪽 버퍼가 계속 참

#### tmap 에 flink
* skt 오승철
* kafka -> flink -> kafka
* Schema evolution 을 쓰면 스키마 버전업에 대응할 수 있음

#### Do flink on wrb with FLOW(FLink On Web)
* skt 김동원
* sql 로 코딩하던걸 web 으로..

#### flink source 부터 sink 까지
* 카카오 모빌리티 강한구
* Actor
* trigger 를 둬서 좀 더 강점이 있음
* Source
* element 에 watermark 를 심음
* Transformation

#### QnA
* flow 어찌 사용? 오픈소스는 아직 아님
* 베를린에서만 하다가 전세계로 퍼지는 중
* 외국 갔다와보니 sql 위주로 쓰더라
* 2.0 에서는 DataSet 없어지고 DataStream Table 로 갈 것

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>flinktest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.29</version>
</dependency>


<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.1</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.9.1</version>
<type>pom</type>
<scope>provided</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.9.8-8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.6.5-1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.7.5-1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.8.3-1.8.2</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>18.0-8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.9.1</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-annotations</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-7.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-8.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.7.5-8.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.8.3-8.0</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
<version>4.1.32.Final-7.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.12</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

'[BigData] > [Apache Flink]' 카테고리의 다른 글

Apache Flink 소개, 설치  (0) 2020.02.18
Posted by FeliZ_하늘..

댓글을 달아 주세요

# [Apache Kafka](https://kafka.apache.org/)

## [Introduction](http://kafka.apache.org/intro)

* Apache top level project
* 분산 스트리밍 플랫폼
* scala 로 만들어짐

## [Configs](https://kafka.apache.org/documentation)

## Broker

* [Broker Configs](http://kafka.apache.org/documentation.html#brokerconfigs)

```shell script
vim ${KAFKA_HOME}/config/server.properties
```

```properties
# required, string, hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
zookeeper.connect=
# boolean, default: true, true 면 관리자가 모르는 topic 이 생김
auto.create.topics.enable=true
# required, int, default: -1, broker id 는 cluster 별로 유일해야 함
broker.id=-1
# string, default: producer, uncompressed|gzip|snappy|lz4|zstd|producer, producer: producer 의 설정을 사용함
compression.type=producer
# boolean, default: true, admin tool 을 통해 topic 을 지움, false 면 delete 표시만 함
delete.topic.enable=true
# string, default: null, comma separated list of bind URIs, 비어있으면 default interface, 0.0.0.0 이면 모든 interface, ex) PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
listeners=
# string, default: /tmp/kafka-logs, kafka log dir, log.dirs 보조역할
log.dir=/tmp/kafka-logs
# required, string, default: null, comma separated list of dirs, 없으면 log.dir 사용함
log.dirs=
# long default: 9223372036854775807, disk 로 flush 할때까지 담아둘 message 수
log.flush.interval.messages=9223372036854775807
# long, default: null, disk 로 flush 할때까지 담아둘 시간
log.flush.interval.ms=
# 보관주기: 시간, 용량, 용량계산 필수, 우선순위: ms > minutes > hours
# long, default: -1
log.retention.bytes=-1
# int, default: 168
log.retention.hours=168
# int, default: null
log.retention.minutes=
# long, default: null, -1 은 제한 없음
log.retention.ms=
# int, default: 1, acks=all 일 때, write 성공 위한 최소 복제수
min.insync.replicas=1
# int, default: 1, startup 시 recovery, shutdown 시 flush 하는 log data dir 당 thread 수, raid 로 묶여있으면 증가시켜주는게 좋음
num.recovery.threads.per.data.dir=1
# short, default: 3, topic 복제수
offsets.topic.replication.factor=3
# string, default: null, rack awareness 가능
broker.rack=
# int, default: 1, 복제수
default.replication.factor=1
# long, default: 9223372036854775807, compaction 할 수 없는 최대 시간
log.cleaner.max.compaction.lag.ms=9223372036854775807
# double, default: 0.5, compaction 할 dirty_log:total 의 비율
log.cleaner.min.cleanable.ratio=0.5
# long, default: 0, compaction 되지 않은 최소 시간
log.cleaner.min.compaction.lag.ms=0
# int, default: 1, topic 별 log partition 수, partitions:consumer_instances 비율은 1:1(cluster 내 broker 수) 로 가야 이상적 속도, topic 생성 후 증가만 가능
num.partitions=1
```

## [Topic Configs](http://kafka.apache.org/documentation.html#topicconfigs)

### topic name

* topic 이름으로는 영문, 숫자, '.', '_', '-' 만 사용 가능 ([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29))
* 249글자 제한([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L34))
* '_', '.' 문자를 혼용하지 말라고 권고함

## [Producer Configs](http://kafka.apache.org/documentation.html#producerconfigs)

```properties
# string, default: 1, all|-1|0|1, all|-1: follower 로 복제까지 완료됐는지 확인, 0: 보내기만 하고 끝, 1: leader 가 받았는지 확인, leader 가 죽으면 message 복제시 follower 가 leader 가 되지만 그 순간 message 는 유실됨
acks=1
bootstrap.servers=
# long, default: 33554432, producer 가 broker 로 전송 대기중인 record 를 buffer 하는데 사용할 수 있는 memory bytes, 빠르게 전송되면 max.block.ms 만큼 block 후 exception 던짐
buffer.memory=33554432
# string, default: none, 다음 중 선택 가능: none|gzip|snappy|lz4|zstd
compression.type=none
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# long, default: 0, batch.size 만큼 가져오면 바로 전송, 그렇지 않으면 지정한 시간만큼 지연 후 전송, 설정하면 send 수는 줄어들지만 지연이 생김
linger.ms=0
# long, default: 60000, buffer 가 full 이거나 metadata 를 사용할 수 없을 때 block 할 시간, user Serializer 나 Partitioner 에서 실행되는 시간은 포함되지 않음
max.block.ms=60000
# class, default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# list, default: ""
interceptor.classes=
```

## [Consumer Configs](http://kafka.apache.org/documentation.html#consumerconfigs)

```properties
bootstrap.servers=
# string, default: null
group.id=
# boolean, default: true
allow.auto.create.topics=true
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# string, default: "", client 가 속해있는 rack, broker.rack 에 해당함
client.rack=
# list, default: "", implements org.apache.kafka.clients.consumer.ConsumerInterceptor
interceptor.classes=
```

## [kafka conference seoul 2019](https://www.onoffmix.com/event/196156)

2019-10-18 13:30 ~ 18:30

### confluent platform

* linkedin 내 confluent team 이었다가 따로 회사를 차렸다 함
* kafka 의 데이터를 가져가는 client 를 confluent platform 의 하나인 ksql 을 활용 가능하지 않을까 생각이 들었으나, 이미 사용중인 유저(신현우)에게 들어보니 service(control center, schema registry, ksql, kafka, zookeeper) 별로 vm 을 따로 만드느라 10개의 vm 을 만들었다 함
* [Blog](http://cnfl.io/read)

|service|description|
|:---:|:---:|
|control center|web base 로 돼있고 kafka 모니터링이 가능한듯 함. intellij plugin 도 있는듯. **구매해야 사용 가능**|
|schema registry|aws glue catalog 의 역할을 하는것 같은데 confluent 에서는 꼭 사용해야 한다 하고 이미 사용중인 유저도 사용하는게 좋다고 함. table 의 version 관리도 되는듯. [open source](https://github.com/confluentinc/schema-registry)임|
|[ksql](https://www.confluent.io/product/ksql/)|kafka 에 있는 데이터를 sql 같은 문법으로 조회 가능함. avro, json, csv 만 됨. custom 불가능. [open source](https://github.com/confluentinc/ksql)임|

### Kafka Streams: Interactive Queries

* 발표자가 contributor 였음
* StateStore: default [RocksDB](https://github.com/facebook/rocksdb) 를 사용하고 changelog topic 에 상태를 저장할 수 있음
* InteractiveQuery 가 특정 key 의 StateStore 의 값을 읽을 수 있는데 어느 host 어느 partition 인지도 알려줌
* topic 의 현재 상태를 알 수 있게 해줌

특정 topic 의 상태를 알고싶을 때 사용하면 될 듯 함

### Kafka Monitoring

* [KafkaManager](https://github.com/yahoo/kafka-manager) 만으로는 부족해서 찾게 됐다 함
* cluster 가 장애없이 정상인가? 언제 broker 를 늘려야 하는가? 등등...
* jmx 옵션을 켜야 함
* 각종 metric 들을 가지고 모니터링이 가능함

monitoring 지표를 찾았으니 화면개발을 해야 할텐데...

elastic search 에 붙여서 사용하는게 낫지 않을까...

### 카카오 datalake

* 데이터를 잘 모으는 것도 일이고 잘 정리하는건 더 일이고
* 이 데이터 어디에 있나요? 라는 말을 너무 많이 들음 ㅠㅠ
* 많은 source 에서 **무조건 kafka 를 통해** datalake 에 넣게한 후 aws glue catalog 같은 비슷한걸 구축하고 특정 데이터의 권한도 줄 수 있게 하고, 유저에게 schema 입력, 수정하게 하고(집단지성!) 데이터를 갖다쓰게 했다 함
* 참석자 중 많은 사람이 이런 업무를 하는지 마지막 qna 시간에 가장 많은 질문을 받음

## 운영경험

Kafka 운영 경험이 있다는 지인에게 문의해봄

* CPU, Memory 사용량은 적지만, Disk IO, Network IO 가 높음
* 여러 broker 가 동시에 죽지만 않는다면 하나정도의 broker 가 다운되는 등의 경우에도 default.replication.factor, min.insync.replicas 설정만 잘 돼있다면 큰 문제는 없었다 함
* RAID 구성하지 않고 사용하는 것이 더 나은 결과를 볼 수 있었다 함
* consumer 에 변화가 생길 때, 즉 다운되거나 추가될 때 partition rebalancing 시 부득이하게 그 순간의 message 를 처리하지 못하고 손실이 발생하는 문제가 있었다 함
* spark 의 [Checkpointing](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#checkpointing) 을 통해 해결할 수 있지 않을까 함
* [Apache Flink](https://flink.apache.org/) 를 찾아보니 Checkpoint, Savepoint 등으로 해결 가능할듯 함

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>kafkatest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

'[BigData] > [Apache Kafka]' 카테고리의 다른 글

Apache Kafka 소개, 설치  (0) 2020.02.18
Posted by FeliZ_하늘..

댓글을 달아 주세요

# [Apache BookKeeper](https://bookkeeper.apache.org/)

* 실시간 워크로드에 최적화된 확장가능, 내결함성, 짧은 지연시간 스토리지 서비스`1
* Java 로 만들어짐
* Apache top level project
* [EMC 의 Pravega](http://www.pravega.io/)([github](https://github.com/pravega/pravega)), [Apache Pulsar](http://pulsar.apache.org/)([github](https://github.com/apache/pulsar)) 등에서 BookKeeper 를 storage 로 사용함
* cluster 로 구성 가능(zookeeper 사용)
* [DC/OS](https://dcos.io/), [Kubernetes](https://kubernetes.io/) 를 이용하여 배포 가능

> 글 작성 당시 최신 버전인 4.10.0 기준으로 문서를 작성함

## [BookKeeper concepts and architecture](https://bookkeeper.apache.org/docs/4.10.0/getting-started/concepts/)

### Basic terms

* entry: 각 log 단위(aka record)
* ledgers: log entry stream
* bookies: entry ledger 을 저장하는 개별 서버

### Entries

* metadata 와 ledger 에서 쓴 실제 데이터

|Field|Java type|Description|
|:---|:---|:---|
|Ledger number|`long`|entry 에 쓰여진 ledger 의 ID|
|Entry number|`long`|entry 의 unique ID|
|Last confirmed (LC)|`long`|마지막 기록된 entry 의 ID|
|Data|`byte[]`|client application 에 의해 쓰여진 entry 의 데이터|
|Authentication code|`byte[]`|entry 에 다른 모든 필드를 포함하는 메시지 인증 코드|

### Ledgers

* BookKeeper 의 기본 저장 단위
* entry 의 sequence
* 각 entry 는 byte sequence
* entry 가 저장됨
* 순차적으로, 최대 한 번
* append-only
* 수정 불가능
* 적절한 쓰기 순서는 client application 의 책임

### Clients and APIs

* ledger 생성, 삭제
* ledger 에서 entry 읽기, 쓰기
* [ledger API](https://bookkeeper.apache.org/docs/4.10.0/api/ledger-api)
* [DistributedLog API](https://bookkeeper.apache.org/docs/4.10.0/api/distributedlog-api)

### Bookies

* ledger fragment 를 처리하는 개별 BookKeeper storage server
* 성능을 위해 ledger 전체가 아닌 fragment 를 저장함(like kafka partition?)

#### Motivation

* HDFS NameNode 같은.. 이제는 그 이상
* 효율적 쓰기
* 메시지 복제를 통해 fault tolerance

### Metadata storage

* [ZooKeeper](https://zookeeper.apache.org) 를 사용

### Data management in bookies

* [log-structured](https://en.wikipedia.org/wiki/Log-structured_file_system) 방식으로 데이터 관리

#### Journals

* BookKeeper transaction log 포함
* update transaction 을 비휘발성 저장소에 기록 -> ledger 에 대한 update
* Bookie 가 시작되거나 이전 journal file 이 journal file size threshold 에 도달하면 새 journal file 이 생성됨

#### Entry logs

* BookKeeper client 로부터 받은 entry 를 관리
* ledger 의 entry 는 순차적으로
* offset 은 빠른 lookup 을 위해 ledger cache 에 pointer 로 유지
* Bookie 가 시작되거나 이전 entry log file 이 entry log size threshold 에 도달하면 새 entry log file 이 생성됨
* 오래된 entry log file 은 active ledger 와 연관없는 Garbage Collector Thread 에 의해 지워짐

#### Index files

* entry log file 에 저장된 data 의 offset 을 기록하는 header 와 fixed length index page 로 구성된 각 ledger 에 대해 index file 이 생성됨
* index file 을 update 하면 random disk I/O index file 이 sync thread 에 의해 background 로 lazy update 함

#### Ledger cache

* ledger index page 는 memory pool 에 cache
* disk head scheduling 을 효율적으로 관리함

#### Adding entries

* client 가 ledger 에 entry 를 쓰면 entry 는 다음 단계를 거쳐 disk 에 저장됨

1. entry 가 entry log 에 저장됨
1. entry index 는 ledger cache 에서 updated 됨
1. entry update 에 해당하는 transaction 이 journal 에 추가됨
1. BookKeeper client 에게 response 전송

> 성능성의 이유로, entry log 는 memory 에 buffer, batch 로 commit.
> ledger cache 는 index page 를 memory 에 저장하고 lazy flush.

#### Data flush

Ledger index pages are flushed to index files in the following two cases:

* The ledger cache memory limit is reached. There is no more space available to hold newer index pages. Dirty index pages will be evicted from the ledger cache and persisted to index files.
* A background thread synchronous thread is responsible for flushing index pages from the ledger cache to index files periodically.

Besides flushing index pages, the sync thread is responsible for rolling journal files in case that journal files use too much disk space. The data flush flow in the sync thread is as follows:

* A `LastLogMark` is recorded in memory. The `LastLogMark` indicates that those entries before it have been persisted (to both index and entry log files) and contains two parts:
1. A `txnLogId` (the file ID of a journal)
1. A `txnLogPos` (offset in a journal)
* Dirty index pages are flushed from the ledger cache to the index file, and entry log files are flushed to ensure that all buffered entries in entry log files are persisted to disk.

Ideally, a bookie only needs to flush index pages and entry log files that contain entries before `LastLogMark`. There is, however, no such information in the ledger and entry log mapping to journal files. Consequently, the thread flushes the ledger cache and entry log entirely here, and may flush entries after the `LastLogMark`. Flushing more is not a problem, though, just redundant.
* The `LastLogMark` is persisted to disk, which means that entries added before `LastLogMark` whose entry data and index page were also persisted to disk. It is now time to safely remove journal files created earlier than `txnLogId`.

If the bookie has crashed before persisting `LastLogMark` to disk, it still has journal files containing entries for which index pages may not have been persisted. Consequently, when this bookie restarts, it inspects journal files to restore those entries and data isn't lost.

Using the above data flush mechanism, it is safe for the sync thread to skip data flushing when the bookie shuts down. However, in the entry logger it uses a buffered channel to write entries in batches and there might be data buffered in the buffered channel upon a shut down. The bookie needs to ensure that the entry log flushes its buffered data during shutdown. Otherwise, entry log files become corrupted with partial entries.

#### Data compaction

On bookies, entries of different ledgers are interleaved in entry log files. A bookie runs a garbage collector thread to delete un-associated entry log files to reclaim disk space. If a given entry log file contains entries from a ledger that has not been deleted, then the entry log file would never be removed and the occupied disk space never reclaimed. In order to avoid such a case, a bookie server compacts entry log files in a garbage collector thread to reclaim disk space.

There are two kinds of compaction running with different frequency: minor compaction and major compaction. The differences between minor compaction and major compaction lies in their threshold value and compaction interval.

* The garbage collection threshold is the size percentage of an entry log file occupied by those undeleted ledgers. The default minor compaction threshold is 0.2, while the major compaction threshold is 0.8.
* The garbage collection interval is how frequently to run the compaction. The default minor compaction interval is 1 hour, while the major compaction threshold is 1 day.

> If either the threshold or interval is set to less than or equal to zero, compaction is disabled.

The data compaction flow in the garbage collector thread is as follows:

* The thread scans entry log files to get their entry log metadata, which records a list of ledgers comprising an entry log and their corresponding percentages.
* With the normal garbage collection flow, once the bookie determines that a ledger has been deleted, the ledger will be removed from the entry log metadata and the size of the entry log reduced.
* If the remaining size of an entry log file reaches a specified threshold, the entries of active ledgers in the entry log will be copied to a new entry log file.
* Once all valid entries have been copied, the old entry log file is deleted.

### ZooKeeper metadata

BookKeeper requires a ZooKeeper installation for storing [ledger](#ledger) metadata. Whenever you construct a [`BookKeeper`](../../api/javadoc/org/apache/bookkeeper/client/BookKeeper) client object, you need to pass a list of ZooKeeper servers as a parameter to the constructor, like this:

```java
String zkConnectionString = "127.0.0.1:2181";
BookKeeper bkClient = new BookKeeper(zkConnectionString);
```

> For more info on using the BookKeeper Java client, see [this guide](../../api/ledger-api#the-java-ledger-api-client).

### Ledger manager

A *ledger manager* handles ledgers' metadata (which is stored in ZooKeeper). BookKeeper offers two types of ledger managers: the [flat ledger manager](#flat-ledger-manager) and the [hierarchical ledger manager](#hierarchical-ledger-manager). Both ledger managers extend the [`AbstractZkLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/AbstractZkLedgerManager) abstract class.

> ##### Use the flat ledger manager in most cases
> The flat ledger manager is the default and is recommended for nearly all use cases. The hierarchical ledger manager is better suited only for managing very large numbers of BookKeeper ledgers (> 50,000).

#### Flat ledger manager

The *flat ledger manager*, implemented in the [`FlatLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/FlatLedgerManager.html) class, stores all ledgers' metadata in child nodes of a single ZooKeeper path. The flat ledger manager creates [sequential nodes](https://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming) to ensure the uniqueness of the ledger ID and prefixes all nodes with `L`. Bookie servers manage their own active ledgers in a hash map so that it's easy to find which ledgers have been deleted from ZooKeeper and then garbage collect them.

The flat ledger manager's garbage collection follow proceeds as follows:

* All existing ledgers are fetched from ZooKeeper (`zkActiveLedgers`)
* All ledgers currently active within the bookie are fetched (`bkActiveLedgers`)
* The currently actively ledgers are looped through to determine which ledgers don't currently exist in ZooKeeper. Those are then garbage collected.
* The *hierarchical ledger manager* stores ledgers' metadata in two-level [znodes](https://zookeeper.apache.org/doc/current/zookeeperOver.html#Nodes+and+ephemeral+nodes).

#### Hierarchical ledger manager

The *hierarchical ledger manager*, implemented in the [`HierarchicalLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/HierarchicalLedgerManager) class, first obtains a global unique ID from ZooKeeper using an [`EPHEMERAL_SEQUENTIAL`](https://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/CreateMode.html#EPHEMERAL_SEQUENTIAL) znode. Since ZooKeeper's sequence counter has a format of `%10d` (10 digits with 0 padding, for example `<path>0000000001`), the hierarchical ledger manager splits the generated ID into 3 parts:

```shell
{level1 (2 digits)}{level2 (4 digits)}{level3 (4 digits)}
```

These three parts are used to form the actual ledger node path to store ledger metadata:

```shell
{ledgers_root_path}/{level1}/{level2}/L{level3}
```

For example, ledger 0000000001 is split into three parts, 00, 0000, and 00001, and stored in znode `/{ledgers_root_path}/00/0000/L0001`. Each znode could have as many 10,000 ledgers, which avoids the problem of the child list being larger than the maximum ZooKeeper packet size (which is the [limitation](https://issues.apache.org/jira/browse/BOOKKEEPER-39) that initially prompted the creation of the hierarchical ledger manager).

## install

* zookeeper 를 3대의 node 에 설치하여 cluster 구성
* bookkeeper 를 4대의 node 에 설치하여 cluster 구성
* ssh key 교환

### Environment Variable

```shell script
vim ~/.bashrc
```

```shell script
...
export BK_HOME="/usr/local/bookkeeper/default"
...
export PATH="${BK_HOME}/bin:${PATH}"
...
```

### 참고사항

* `zookeeper.conf.dynamic` file 적용은 ZooKeeper 3.5.0 이상부터 가능한 듯
* zookeeper 실행 후 `./bin/bookkeeper shell metaformat` 실행해야 함
* `bk_server.conf` 에서 `metadataServiceUri=zk://<HOST1>:<PORT1>;<HOST2>:<PORT2>;<HOST3>:<PORT3>/ledgers` 형식이어야 함
* `${BK_HOME}/conf/bookies` file 에 한 줄에 하나씩 host 적어주면 클러스터로 구성 가능
* cluster 실행시 `${BK_HOME}/bin/bookkeeper-cluster.sh start`

### Download & Untar

```shell script
mkdir /usr/local/bookkeeper
cd /usr/local/bookkeeper
wget http://apache.mirror.cdnetworks.com/bookkeeper/bookkeeper-4.10.0/bookkeeper-server-4.10.0-bin.tar.gz
tar zxf bookkeeper-server-4.10.0-bin.tar.gz
rm bookkeeper-server-4.10.0-bin.tar.gz
ln -s bookkeeper-server-4.10.0/ default
```

### prepare directories

```shell script
mkdir -p /data/bk/{data,journal,ranges}
mkdir -p /data/zk/txlog
chown -R hskimsky:hskimsky /data/*
```

### ${BK_HOME}/conf/bk_server.conf

```shell script
vim ${BK_HOME}/conf/bk_server.conf
```

```
...
httpServerEnabled=true
...
journalDirectories=/data/bk/journal
...
ledgerDirectories=/data/bk/data
...
metadataServiceUri=zk://bk1.sky.local:2181;bk2.sky.local:2181;bk3.sky.local:2181/ledgers
...
zkServers=bk1.sky.local:2181,bk2.sky.local:2181,bk3.sky.local:2181
...
storage.range.store.dirs=/data/bk/ranges
...
```

### ${BK_HOME}/conf/zookeeper.conf

```shell script
vim ${BK_HOME}/conf/zookeeper.conf
```

```
...
dataDir=/data/zk
...
dataLogDir=/data/zk/txlog
...
server.1=bk1.sky.local:2888:3888
server.2=bk2.sky.local:2888:3888
server.3=bk3.sky.local:2888:3888
```

### ${BK_HOME}/conf/bookies

```shell script
vim ${BK_HOME}/conf/bookies
```

```
bk1.sky.local
bk2.sky.local
bk3.sky.local
bk4.sky.local
```

### Execute ZooKeeper

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

### [Cluster metadata setup](https://bookkeeper.apache.org/docs/4.10.0/deployment/manual#cluster-metadata-setup)

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

### Execute Bookie

* bookie 하나만 실행
* cluster 로 실행할 경우 `bookkeeper-cluster.sh` 로 실행

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start bookie
```

### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```

### 설치 확인

```shell script
for i in {1..4} ; do curl http://bk${i}.sky.local:8080/heartbeat ; done
```

```
OK
OK
OK
OK
```

```shell script
curl "http://bk1.sky.local:8080/api/v1/bookie/list_bookies?print_hostnames=true"
```

```
{
"192.168.181.122:3181" : "192.168.181.122",
"192.168.181.123:3181" : "192.168.181.123",
"192.168.181.124:3181" : "192.168.181.124",
"192.168.181.121:3181" : "192.168.181.121"
}
```

### 재설치

#### cluster 내 모든 node 에서

```shell script
# ${BK_HOME}/bin/bookkeeper-daemon.sh stop zookeeper
# rm -rf /data/zk/{txlog,version-2}
rm -rf /data/bk/*/*
rm ${BK_HOME}/bin/bookkeeper-bookie.pid
# mkdir -p /data/zk/txlog
# ${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

#### 한 node 에서만

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

#### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```

'[BigData] > [Apache BookKeeper]' 카테고리의 다른 글

Apache BookKeeper 소개, 설치  (1) 2020.02.18
Posted by FeliZ_하늘..

댓글을 달아 주세요

  1. FeliZ_하늘.. 2020.02.18 07:33 신고  댓글주소  수정/삭제  댓글쓰기

    번역 진행 중...

얼마 전 업무를 하다가

Apache HBase 에서는 client 와 Region Server 사이에서 통신은 어떻게 이루어질까 하는 의문이 들었다.

그래서 HBase 소스를 보니 HBase 에서는 모든 통신을 ZooKeeper 를 통해서 한다는걸 알았다.

소스 상에서는 ZooKeeper 에다가 node 에 변경을 가하고 끝이다.

그러면 HBase 를 띄울 때 같이 실행된 ZKWatcher 에서는 해당 node 를 계속 listen 하고 있다가 node 가 변경되면 뭔가의 작업이 이루어지는 구조였다.

그래서 mapper 가 많은데 mapper 에서 HTable 을 열어서 뭔가의 작업을 하면 항상 ZooKeeper max connection 에러가 나서 Region Server 가 몇개 죽거나 그랬다

애초에 mapper 에서 HTable.delete() 를 한다는게 말이 안되는 거였다.

왜냐하면 HTable 을 열고 뭔가 작업을 하면 HBase cluster 전체에 있는 Region Server 전체가 실행할 것이다 근데 그걸 모든 mapper 에서 실행했으니 ZooKeeper max connection 을 초과하는게 당연한 얘기일 것이다.


그리고 가끔 Apache incubator project list 를 보는데 Ratis 라는 project 를 보니 ZooKeeper 와 비슷한 project 였다.

Ratis project 는 Raft 알고리즘의 구현체이다.

Raft 알고리즘은 대표적인 합의 알고리즘(Consensus algorithm)인 Paxos 보다 이해가 쉽고 구현이 쉬운 알고리즘이라고 소개하고 있다.

구글링 해보면 합의 알고리즘이란 분산환경에서 신뢰성을 보장하도록 하는 알고리즘으로 최근 블록체인이 이슈가 되면서 합의 알고리즘 포스팅도 많아진 것 같다.


다음 포스팅은 Raft Protocol 논문을 읽고 요약해봐야겠다

'[프로그래밍]' 카테고리의 다른 글

Apache Ratis 소개  (3) 2018.03.21
java 로 KMP 알고리즘  (0) 2017.12.21
dfs 알고리즘을 java 로 구현하기  (0) 2017.02.16
[IntelliJ] SVN 커밋 시 'is out of date' 에러 문제  (0) 2015.10.23
rsync 로 CRAN mirroring  (0) 2015.10.06
GIS shp 파일 변환  (0) 2015.07.28
Posted by FeliZ_하늘..

댓글을 달아 주세요

  1. FeliZ_하늘.. 2018.03.26 19:56 신고  댓글주소  수정/삭제  댓글쓰기

    https://www.youtube.com/watch?v=JEpsBg0AO6o

  2. FeliZ_하늘.. 2018.03.26 19:56 신고  댓글주소  수정/삭제  댓글쓰기

    https://www.youtube.com/watch?v=YbZ3zDzDnrw

  3. FeliZ_하늘.. 2018.10.22 21:15 신고  댓글주소  수정/삭제  댓글쓰기

    아마 이때부터 장비 노후화가 진행된 것 같다
    BulkLoad 를 하면 hdfs schema 가 다르면 copy 를 하고 schema 가 같으면 move 를 하는데 thread 를 여러개 만들어서 실행한다
    이때 thread 개수는 nrThreads = conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); 이다. (BulkLoad source 중)
    일할 때 이것마저 여러개를 실행하니
    node 의 IO 가 급격히 올라가고 ssh 도 접속이 힘들어지더니
    Ambari 에서는 Heartbeat Lost 됐다가 결국엔 reboot 됐다
    그래서 BulkLoad 를 순차로 실행하도록 수정하거나 thread 개수를 수정하거나 해야했다

'[Java]' 카테고리의 다른 글

java 에서 숫자의 자릿수 알아내기  (0) 2018.01.20
remote visual vm  (0) 2016.12.18
RxJava  (0) 2016.02.23
BigInteger 최대공약수 gcd 최소공배수 lcm 구하기  (0) 2016.02.22
java 로 클립보드 사용하기  (0) 2015.10.30
java 로 동영상 처리  (0) 2015.10.28
Posted by FeliZ_하늘..

댓글을 달아 주세요

'[프로그래밍]' 카테고리의 다른 글

Apache Ratis 소개  (3) 2018.03.21
java 로 KMP 알고리즘  (0) 2017.12.21
dfs 알고리즘을 java 로 구현하기  (0) 2017.02.16
[IntelliJ] SVN 커밋 시 'is out of date' 에러 문제  (0) 2015.10.23
rsync 로 CRAN mirroring  (0) 2015.10.06
GIS shp 파일 변환  (0) 2015.07.28
Posted by FeliZ_하늘..

댓글을 달아 주세요

hawq 가 설치 된 모든 노드에서 JAVA_HOME 을 지정한 후 아래 명령어를 실행한다



그리고 hawq master 에서 아래 명령어를 실행한다



pg_language table 을 조회해보면 java 와 javau 가 추가된 것을 확인할 수 있다


gpadmin=# select oid, * from pg_language;

  oid  | lanname  | lanispl | lanpltrusted | lanplcallfoid | lanvalidator | lanacl 

-------+----------+---------+--------------+---------------+--------------+--------

    12 | internal | f       | f            |             0 |         2246 | 

    13 | c        | f       | f            |             0 |         2247 | 

    14 | sql      | f       | t            |             0 |         2248 | 

 10885 | plpgsql  | t       | t            |         10883 |        10884 | 

 16542 | java     | t       | t            |         16540 |            0 | 

 16543 | javau    | t       | f            |         16541 |            0 | 

(6 rows)


제대로 java 가 실행되는 지 확인해보기 위해서 아래를 실행해봤다


gpadmin=# create function get_property(varchar) returns varchar as 'java.lang.System.getProperty' language java;

CREATE FUNCTION

gpadmin=# select get_property('user.name')

gpadmin-# ;

 get_property 

--------------

 gpadmin

(1 row)


gpadmin=# select get_property('user.home');

 get_property  

---------------

 /home/gpadmin

(1 row)



'[BigData] > [HDP]' 카테고리의 다른 글

HAWQ 2.1.1.0 에서 pljava 활성화  (0) 2017.04.08
HDP 2.4.2 install with Ambari 2.2.2.0  (0) 2016.06.18
Posted by FeliZ_하늘..

댓글을 달아 주세요

hdp 의 hdb(hawq) 에서 redis 를 사용해 볼 목적으로 redis 를 설치했다


hdp 는 6개의 노드로 구성했고 namenode HA, resourcemanager HA 가 설정돼있고 datanode 는 3개이다


hawq 도 master, standby master, segment 3개로 구성했다


목표는 6개의 노드 전체에 redis 를 설치한 후 cluster 로 묶는 것이다.


아래 문서들을 참조하였다


https://redis.io/topics/cluster-tutorial

http://the-earth.tistory.com/entry/redis-cluster-%EA%B5%AC%EC%84%B1%ED%95%98%EA%B8%B0

https://www.facebook.com/notes/yongbok-kim/redis-cluster-%EA%B0%84%EB%8B%A8-%EC%84%A4%EC%A0%95/704524782949857/

http://www.redisgate.com/redis/cluster/cluster.php


아래 명령어를 실행하여 컴파일 했다



그리고나서 src 아래의 모든 파일을 내가 사용 할 redis directory 에 복사했다


나는 /usr/local/redis/redis-3.2.8 디렉토리를 만들어 여기에 복사한 뒤


/usr/local/redis/default 라는 링크를 만들어 위에서 만든 디렉토리에 링크를 걸어줬다


그리고 내가 수정 한 redis.conf 는 아래 5개이다 (port 는 default 값 6379 를 그대로 사용했다)


그리고 다운받고 압축을 풀었던 디렉토리에 존재하는 redis.conf 를 /usr/local/redis/default 아래에 복사했다

bind 0.0.0.0
appendonly yes
cluster-enabled yes
cluster-config-file nodes.conf
cluster-node-timeout 5000

그리고나서 아래 명령어를 실행하여 cluster 를 생성한다. (꼭 IP 로 적어야 정상실행 된다)

./redis-trib.rb create \
172.16.196.150:6379 \
172.16.196.151:6379 \
172.16.196.152:6379 \
172.16.196.153:6379 \
172.16.196.154:6379 \
172.16.196.155:6379

redis-trib.rb 를 실행할 때 각종 오류가 났다


1. rubygems 가 설치 돼있지 않음

-> yum install rubygems 를 실행했다


2. ruby 에서 redis 를 설치하지 않음

/usr/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `gem_original_require': no such file to load -- redis (LoadError)

from /usr/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `require'

from ./redis-trib.rb:25

-> gem install redis 를 실행했다


3. redis-server 를 실행하지 않음

-> /usr/local/redis/default/redis-server /usr/local/redis/default/redis.conf 를 실행함


4. redis cluster cannot connect to node 오류가 발생함

-> redis.conf 에서 bind 값을 127.0.0.1 에서 0.0.0.0 으로 수정


5. redis cluster is already busy (Redis::CommandError) 오류가 발생함

-> cluster 로 묶으려는 전 노드에서 아래 명령어를 실행했다

flushall
cluster reset soft

그리고나서 redis-trib.rb 를 재실행한다


[root@admin default]# ./redis-trib.rb create 172.16.196.150:6379 172.16.196.151:6379 172.16.196.152:6379 172.16.196.153:6379 172.16.196.154:6379 172.16.196.155:6379

>>> Creating cluster

>>> Performing hash slots allocation on 6 nodes...

Using 6 masters:

172.16.196.155:6379

172.16.196.154:6379

172.16.196.153:6379

172.16.196.152:6379

172.16.196.151:6379

172.16.196.150:6379

M: b267c9964edae16d5b3c40e49f83c72585cac439 172.16.196.150:6379

   slots:13653-16383 (2731 slots) master

M: 000ac0c60361218076084d513306f15ea0460ab1 172.16.196.151:6379

   slots:10923-13652 (2730 slots) master

M: 16a628108b8ca9ecdcb353c1472a3e36d3269699 172.16.196.152:6379

   slots:8192-10922 (2731 slots) master

M: 99bd9cdd76efaa9a438844339114e654c3e92e6b 172.16.196.153:6379

   slots:5461-8191 (2731 slots) master

M: 4757a336d40f0203032acef88b3b440ea9f94152 172.16.196.154:6379

   slots:2731-5460 (2730 slots) master

M: 7d879254412e8636d106ea64161e95b8b8f09f13 172.16.196.155:6379

   slots:0-2730 (2731 slots) master

Can I set the above configuration? (type 'yes' to accept): yes

>>> Nodes configuration updated

>>> Assign a different config epoch to each node

>>> Sending CLUSTER MEET messages to join the cluster

Waiting for the cluster to join...

>>> Performing Cluster Check (using node 172.16.196.150:6379)

M: b267c9964edae16d5b3c40e49f83c72585cac439 172.16.196.150:6379

   slots:13653-16383 (2731 slots) master

   0 additional replica(s)

M: 000ac0c60361218076084d513306f15ea0460ab1 172.16.196.151:6379

   slots:10923-13652 (2730 slots) master

   0 additional replica(s)

M: 16a628108b8ca9ecdcb353c1472a3e36d3269699 172.16.196.152:6379

   slots:8192-10922 (2731 slots) master

   0 additional replica(s)

M: 4757a336d40f0203032acef88b3b440ea9f94152 172.16.196.154:6379

   slots:2731-5460 (2730 slots) master

   0 additional replica(s)

M: 99bd9cdd76efaa9a438844339114e654c3e92e6b 172.16.196.153:6379

   slots:5461-8191 (2731 slots) master

   0 additional replica(s)

M: 7d879254412e8636d106ea64161e95b8b8f09f13 172.16.196.155:6379

   slots:0-2730 (2731 slots) master

   0 additional replica(s)

[OK] All nodes agree about slots configuration.

>>> Check for open slots...

>>> Check slots coverage...

[OK] All 16384 slots covered.

[root@admin default]# ./redis-cli cluster nodes

b267c9964edae16d5b3c40e49f83c72585cac439 172.16.196.150:6379 myself,master - 0 0 1 connected 13653-16383

000ac0c60361218076084d513306f15ea0460ab1 172.16.196.151:6379 master - 0 1491643229950 6 connected 10923-13652

16a628108b8ca9ecdcb353c1472a3e36d3269699 172.16.196.152:6379 master - 0 1491643228945 2 connected 8192-10922

4757a336d40f0203032acef88b3b440ea9f94152 172.16.196.154:6379 master - 0 1491643228443 4 connected 2731-5460

99bd9cdd76efaa9a438844339114e654c3e92e6b 172.16.196.153:6379 master - 0 1491643229950 3 connected 5461-8191

7d879254412e8636d106ea64161e95b8b8f09f13 172.16.196.155:6379 master - 0 1491643228945 5 connected 0-2730

[root@admin default]# 

'[DB] > [Redis]' 카테고리의 다른 글

CentOS 6.7 에서 Redis cluster 구축  (0) 2017.04.08
Posted by FeliZ_하늘..

댓글을 달아 주세요

dfs 알고리즘


'[프로그래밍]' 카테고리의 다른 글

Apache Ratis 소개  (3) 2018.03.21
java 로 KMP 알고리즘  (0) 2017.12.21
dfs 알고리즘을 java 로 구현하기  (0) 2017.02.16
[IntelliJ] SVN 커밋 시 'is out of date' 에러 문제  (0) 2015.10.23
rsync 로 CRAN mirroring  (0) 2015.10.06
GIS shp 파일 변환  (0) 2015.07.28
Posted by FeliZ_하늘..

댓글을 달아 주세요

remote visual vm

[Java] 2016. 12. 18. 23:55

http://jnylove.tistory.com/335

'[Java]' 카테고리의 다른 글

java 에서 숫자의 자릿수 알아내기  (0) 2018.01.20
remote visual vm  (0) 2016.12.18
RxJava  (0) 2016.02.23
BigInteger 최대공약수 gcd 최소공배수 lcm 구하기  (0) 2016.02.22
java 로 클립보드 사용하기  (0) 2015.10.30
java 로 동영상 처리  (0) 2015.10.28
Posted by FeliZ_하늘..

댓글을 달아 주세요