
# [Apache Flink](https://flink.apache.org/)

## [Architecture](https://flink.apache.org/flink-architecture.html)

* Written in [Scala](https://www.scala-lang.org/)
* Apache top level project
* A framework for stateful computations over data streams
* A distributed processing engine for unbounded and bounded data streams
* Can be deployed on Hadoop YARN, Apache Mesos, Kubernetes, or standalone

## [Distributed Runtime Environment](https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html)

### Tasks and Operator Chains

* For distributed execution, operator subtasks are chained together into tasks
* Each task is executed by one thread

### Job Managers, Task Managers, Clients

|Role|Process|
|:---:|:---:|
|master|JobManager|
|slave|TaskManager|

#### Job Managers

* Also called the master
* Coordinates the distributed execution
* Schedules tasks, coordinates checkpoints, and coordinates recovery on failure
* At least one must exist
* There can be several in an HA setup
* Exactly one is the leader
* All the others are standby

#### Task Managers

* Also called a worker
* Executes the tasks (subtasks) of a dataflow
* Buffers and exchanges the data streams
* At least one must exist

#### Clients

* Not part of the runtime or of the program itself
* Prepares the dataflow
* Sends the dataflow to the JobManager, then either disconnects or stays connected to receive progress reports
* Runs as a Java or Scala program, or from the CLI via `./bin/flink`
* As with Spark or TensorFlow, submitting a job builds a graph, hands it to the master, and the workers execute it; a minimal job sketch follows below
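
A minimal sketch of such a client-side program, assuming the Flink 1.9 DataStream API from the download below; the socket source host and port are placeholders:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ClientDataflowSketch {

    public static void main(String[] args) throws Exception {
        // The client side: building this environment and calling execute()
        // assembles the dataflow graph and submits it to the JobManager.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source: read lines from a socket.
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // A trivial transformation so the graph has more than one operator.
        lines.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        }).print();

        // Nothing runs until execute() ships the job graph to the cluster.
        env.execute("client-dataflow-sketch");
    }
}
```

Packaged into a jar, `./bin/flink run <jar>` with this class as the entry point would hand the generated job graph to the JobManager.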

##### Task Slots and Resources

* A TaskManager is a JVM process
* Executes one or more subtasks in separate threads
* Has at least one task slot in order to manage multiple tasks
* Changing the number of task slots controls how subtasks are isolated from one another
* A good rule of thumb is one task slot per CPU core
* With hyper-threading, each slot takes two or more hardware thread contexts

### State Backends

* In-memory, filesystem, or RocksDB backends are available
* Takes a snapshot of the state at a point in time
* Stores the snapshot as part of a checkpoint; a programmatic configuration sketch follows below
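
A sketch of configuring the state backend and checkpointing programmatically instead of through `flink-conf.yaml`, assuming Flink 1.9; the checkpoint interval and directory are placeholders:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a snapshot of the operator state every 60 seconds.
        env.enableCheckpointing(60_000);

        // Filesystem state backend; the checkpoint directory is a placeholder
        // and must be reachable from every TaskManager and the JobManager.
        env.setStateBackend(new FsStateBackend("file:///data/flink/checkpoints"));

        // Sources, transformations and env.execute() omitted in this sketch.
    }
}
```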

### Savepoints

* Programs written with the DataStream API can resume from a savepoint
* Both the program and the Flink cluster can be updated without losing state
* A manually triggered checkpoint
* Regular checkpoints run periodically on the workers and only the latest one is kept; apart from being manual, a savepoint is the same as a periodic checkpoint
* Can be created from the command line
* Also created when a job is cancelled via the REST API

## [HA](https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/jobmanager_high_availability.html)

* Works with standalone, YARN, Mesos, Kubernetes, ...

## [Downloads](https://flink.apache.org/downloads.html)

### Apache Flink 1.9.1

* [Apache Flink 1.9.1 for Scala 2.12](https://www.apache.org/dyn/closer.lua/flink/flink-1.9.1/flink-1.9.1-bin-scala_2.12.tgz)

### Optional components

* [Avro SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.9.1/flink-avro-1.9.1.jar)
* [CSV SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/1.9.1/flink-csv-1.9.1.jar)
* [JSON SQL Format](https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/1.9.1/flink-json-1.9.1.jar)
* [Pre-bundled Hadoop 2.7.5](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-7.0/flink-shaded-hadoop-2-uber-2.7.5-7.0.jar)
* Required when HDFS is used as storage

Copy the optional components into `${FLINK_HOME}/lib`.

### masters

```shell script
cd ${FLINK_HOME}/conf
cat masters
```

### slaves

```shell script
cat slaves
```

### SSH key exchange

Passwordless SSH from the master to each slave must be possible.

### [Flink configs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html)

#### [Common configs](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#common-options)

```yaml
# default: 1
parallelism.default: 1
```

#### [JobManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager)

```yaml
# default: none
jobmanager.archive.fs.dir:
# default: null, full|region, how to recover the computation from task failures. full: restart all tasks to recover. region: restart all tasks that might be affected by the failed task. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-pipelined-region-failover-strategy
jobmanager.execution.failover-strategy: full
# default: 1024m
jobmanager.heap.size: 1024m
# default: none
jobmanager.rpc.address:
# default: 6123
jobmanager.rpc.port: 6123
```

#### [TaskManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager)

```yaml
# default: 30000, interval in milliseconds between consecutive task cancellation attempts
task.cancellation.interval: 30000
# default: 180000, time in milliseconds after which task cancellation times out and a fatal TaskManager error is raised; 0 disables it
task.cancellation.timeout: 180000
# default: 7500, time in milliseconds to wait for all timer threads to finish when a stream task is cancelled
task.cancellation.timers.timeout: 7500
# default: -1, maximum number of bytes the checkpoint alignment may buffer; the alignment is aborted when the buffer is exceeded; -1 means no limit
task.checkpoint.alignment.max-size: -1
# default: 1024m
taskmanager.heap.size: 1024m
# default: 1, number of user function instances / parallel pipelines a single TaskManager can run. https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#configuring-taskmanager-processing-slots
taskmanager.numberOfTaskSlots: 1
```

#### [Distributed Coordination](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination)

#### [Distributed Coordination (via Akka)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka)

#### [REST](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rest)

```yaml
rest.address: 0.0.0.0
rest.port: 8081
```

#### [Blob Server](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#blob-server)

#### [Heartbeat Manager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#heartbeat-manager)

#### [SSL Settings](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#ssl-settings)

#### [Netty Shuffle Environment](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#netty-shuffle-environment)

#### [Network Communication (via Netty)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#network-communication-via-netty)

#### [Web Frontend](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#web-frontend)

#### [File Systems](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#file-systems)

```yaml
# default: none, used when no scheme is explicitly specified in a path. hdfs or file
fs.default-scheme:
```

#### [Compiler/Optimizer](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#compileroptimizer)

#### [Runtime Algorithms](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#runtime-algorithms)

#### [Resource Manager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#resource-manager)

#### [Shuffle Service](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#shuffle-service)

#### [YARN](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#yarn)

#### [Mesos](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos)

#### [Mesos TaskManager](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#mesos-taskmanager)

#### [High Availability (HA)](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#high-availability-ha)

```yaml
# default: "NONE", NONE|ZOOKEEPER|<specify FQN of factory class>
high-availability: NONE
# default: /default
high-availability.cluster-id: /default
# default: none, file system path where Flink persists its metadata in an HA setup
high-availability.storageDir:
# ZooKeeper-based HA Mode https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#zookeeper-based-ha-mode
# default: /flink, root path under which Flink stores its data in ZooKeeper
high-availability.zookeeper.path.root: /flink
# default: none
high-availability.zookeeper.quorum:
```

#### [ZooKeeper Security](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#zooKeeper-security)

#### [Kerberos-based Security](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#kerberos-based-security)

#### [Environment](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#environment)

```yaml
# default: none
env.java.opts.jobmanager:
# default: none
env.java.opts.taskmanager:
```

#### [Checkpointing](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing)

```yaml
# default: none, none|jobmanager|filesystem|rocksdb|<class-name-of-factory>
state.backend:
# default: none, directory for checkpoint data files and metadata. Must be accessible from every TaskManager and the JobManager
state.checkpoints.dir:
# default: none, default directory used by the state backends for writing savepoints to a filesystem
state.savepoints.dir:
```

#### [RocksDB State Backend](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-state-backend)

#### [RocksDB Configurable Options](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-configurable-options)

#### [Queryable State](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#queryable-state)

#### [Metrics](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#metrics)

* As of 2019-12-11 there is no Flink module for metricbeat yet

#### [RocksDB Native Metrics](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#rocksdb-native-metrics)

#### [History Server](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#history-server)

```yaml
# default: none, Comma separated list of directories to monitor for completed jobs.
historyserver.archive.fs.dir: hdfs:///completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
```

#### [Legacy](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#legacy)

#### [Background](https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#background)

### cluster configs

```shell script
vim ${FLINK_HOME}/conf/flink-conf.yaml
```

```yaml
taskmanager.heap.size: 1024m
parallelism.default: 1
jobmanager.archive.fs.dir: file:///data/flink/completed-jobs/
jobmanager.execution.failover-strategy: region
jobmanager.heap.size: 1024m
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 2
rest.address: 0.0.0.0
rest.port: 8081
fs.default-scheme: file:///data
high-availability: zookeeper
high-availability.cluster-id: /myflink
high-availability.storageDir: file:///data/flink/ha
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: host1:2181,host2:2181,host3:2181
env.hadoop.conf.dir: /etc/hadoop/conf
env.pid.dir: /usr/local/flink/default
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
state.savepoints.dir: file:///data/flink/checkpoints
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: file:///data/flink/completed-jobs/
```

### start flink cluster

```shell script
./bin/start-cluster.sh
```

```
[hskimsky@host1:/usr/local/flink/default]$ ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host host1.
Starting standalonesession daemon on host host2.
Starting standalonesession daemon on host host3.
Starting taskexecutor daemon on host host1.
Starting taskexecutor daemon on host host2.
Starting taskexecutor daemon on host host3.
[hskimsky@host1:/usr/local/flink/default]$
```

#### Troubleshooting

##### Could not find a file system implementation for scheme 'hdfs'.

```
...
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
...
```

Check that the `Pre-bundled Hadoop X.X.X` jar from `Optional components` was copied into `${FLINK_HOME}/lib`.

### Verifying startup

Open http://${JobManager hostname}:${rest.port}/ in a web browser; if the UI comes up, the cluster is running.

Check that every hostname specified in the masters file is reachable:

* http://host1:8081/
* http://host2:8081/
* http://host3:8081/

## [Seoul Apache Flink Meetup](https://www.meetup.com/ko-KR/Seoul-Apache-Flink-Meetup/)

https://www.meetup.com/ko-KR/Seoul-Apache-Flink-Meetup/events/266824815/

### Date and venue
* Date: Tuesday, December 17, 2019, 18:00 - 21:00
* Venue: NAVER D2 STARTUP FACTORY (16F Meritz Tower, 382 Gangnam-daero, Gangnam-gu, Seoul)
* Map: http://naver.me/Fa4qOxfD
* The venue was sponsored by LINE.

### Session Program
* 18:00 ~ 18:20 Doors open, registration
* 18:20 ~ 18:40 Breaking down interval join - applying it to an ad data processing pipeline (염지혜, Kakao)
* 18:40 ~ 19:00 Driving habits: the start of evolution - porting Flink into T map (오승철, SKT)
* 19:00 ~ 19:20 Break
* 19:20 ~ 19:40 Do Flink on Web with FLOW (김동원, SKT)
* 19:40 ~ 20:00 From Source to Sink (강한구, Kakao Mobility)
* 20:00 ~ 21:00 Q&A, Networking

#### Applying Flink to an ad platform
* 염지혜, Kakao
* kafka -> flink -> kafka
* Interval join only supports event time
* Implemented a custom LeftIntervalJoin
* Applied a sideOutputTag
* If one stream stops, memory fills up and the job dies; changed the timestamp assigner to inject the current time instead
* A large time gap between the two streams keeps filling one side's buffer; a plain interval join sketch follows below
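
Not the custom LeftIntervalJoin from the talk, just a rough sketch of a plain event-time interval join with the Flink 1.9 DataStream API; the stream shapes, key field, and time bounds are made up:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {

    // Joins impressions and clicks, both keyed on field 0 (an id), when the click
    // arrives within 10 minutes after the impression (event time).
    public static DataStream<String> join(DataStream<Tuple2<String, String>> impressions,
                                          DataStream<Tuple2<String, String>> clicks) {
        return impressions
                .keyBy(0)
                .intervalJoin(clicks.keyBy(0))
                .between(Time.seconds(0), Time.minutes(10))
                .process(new ProcessJoinFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {
                    @Override
                    public void processElement(Tuple2<String, String> left,
                                               Tuple2<String, String> right,
                                               Context ctx,
                                               Collector<String> out) {
                        out.collect(left.f0 + ": " + left.f1 + " -> " + right.f1);
                    }
                });
    }
}
```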

#### Flink in T map
* 오승철, SKT
* kafka -> flink -> kafka
* Schema evolution makes it possible to cope with schema version upgrades

#### Do Flink on Web with FLOW (FLink On Web)
* 김동원, SKT
* Moves what used to be written as SQL onto the web

#### Flink from source to sink
* 강한구, Kakao Mobility
* Actor
* Having a trigger gives it an additional edge
* Source
* A watermark is embedded in each element
* Transformation

#### Q&A
* How can FLOW be used? It is not open source yet
* Flink started out Berlin-only and is now spreading worldwide
* After visiting abroad, the impression was that SQL is the dominant way of using it
* In 2.0 the DataSet API will go away in favor of DataStream and Table

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>flinktest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.29</version>
</dependency>


<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.1</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.10</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>


<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table</artifactId>
<version>1.9.1</version>
<type>pom</type>
<scope>provided</scope>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson</artifactId>
<version>2.9.8-8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.6.5-1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.7.5-1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>2.8.3-1.8.2</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>18.0-8.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-core</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.9.1</version>
</dependency>


<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.12</artifactId>
<version>1.9.1</version>
<!--<scope>test</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-annotations</artifactId>
<version>1.9.1</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-optimizer_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-7.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.6.5-8.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.7.5-8.0</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<version>2.8.3-8.0</version>
<!--<scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-netty</artifactId>
<version>4.1.32.Final-7.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.9.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-base_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.12</artifactId>
<version>1.9.1</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```


# [Apache Kafka](https://kafka.apache.org/)

## [Introduction](http://kafka.apache.org/intro)

* Apache top level project
* A distributed streaming platform
* Written in Scala

## [Configs](https://kafka.apache.org/documentation)

## Broker

* [Broker Configs](http://kafka.apache.org/documentation.html#brokerconfigs)

```shell script
vim ${KAFKA_HOME}/config/server.properties
```

```properties
# required, string, hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
zookeeper.connect=
# boolean, default: true, if true, topics the administrator does not know about may get created
auto.create.topics.enable=true
# required, int, default: -1, the broker id must be unique within the cluster
broker.id=-1
# string, default: producer, uncompressed|gzip|snappy|lz4|zstd|producer, producer: keep the codec set by the producer
compression.type=producer
# boolean, default: true, allows deleting topics via the admin tool; if false, topics are only marked for deletion
delete.topic.enable=true
# string, default: null, comma separated list of bind URIs; empty uses the default interface, 0.0.0.0 binds all interfaces, ex) PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
listeners=
# string, default: /tmp/kafka-logs, kafka log dir, supplemental to log.dirs
log.dir=/tmp/kafka-logs
# required, string, default: null, comma separated list of dirs; if unset, log.dir is used
log.dirs=
# long, default: 9223372036854775807, number of messages to accumulate before flushing to disk
log.flush.interval.messages=9223372036854775807
# long, default: null, maximum time to hold messages before flushing to disk
log.flush.interval.ms=
# retention by time or by size; size must be calculated; precedence: ms > minutes > hours
# long, default: -1
log.retention.bytes=-1
# int, default: 168
log.retention.hours=168
# int, default: null
log.retention.minutes=
# long, default: null, -1 means no limit
log.retention.ms=
# int, default: 1, minimum number of replicas that must acknowledge a write when acks=all
min.insync.replicas=1
# int, default: 1, threads per log data dir used for recovery at startup and flushing at shutdown; worth increasing when the dirs are on a RAID array
num.recovery.threads.per.data.dir=1
# short, default: 3, replication factor of the offsets topic
offsets.topic.replication.factor=3
# string, default: null, enables rack awareness
broker.rack=
# int, default: 1, default replication factor
default.replication.factor=1
# long, default: 9223372036854775807, maximum time a message remains ineligible for compaction
log.cleaner.max.compaction.lag.ms=9223372036854775807
# double, default: 0.5, ratio of dirty log to total log at which compaction kicks in
log.cleaner.min.cleanable.ratio=0.5
# long, default: 0, minimum time a message remains uncompacted
log.cleaner.min.compaction.lag.ms=0
# int, default: 1, number of log partitions per topic; a 1:1 ratio of partitions to consumer instances (the number of brokers in the cluster) is ideal for throughput; after a topic is created the count can only be increased
num.partitions=1
```
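
Rather than relying on auto.create.topics.enable, topics can be created explicitly with the AdminClient. A small sketch assuming Kafka 2.4; broker addresses, topic name, and counts are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder broker list.
        props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: mirrors num.partitions /
            // default.replication.factor above, but set explicitly per topic.
            NewTopic topic = new NewTopic("test-topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```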

## [Topic Configs](http://kafka.apache.org/documentation.html#topicconfigs)

### topic name

* Topic names may only contain letters, digits, '.', '_' and '-' ([source](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29))
* Limited to 249 characters ([source](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L34))
* Mixing '_' and '.' in the same name is discouraged because the two can collide in metric names; a small validation sketch follows below
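
A small helper that mirrors those rules, for illustration only; Kafka enforces them itself in the Topic.java linked above:

```java
import java.util.regex.Pattern;

public class TopicNameCheck {

    // Mirrors the rules above: letters, digits, '.', '_', '-', at most 249 characters.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    public static boolean isValid(String name) {
        return name != null
                && !name.isEmpty()
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches();
    }
}
```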

## [Producer Configs](http://kafka.apache.org/documentation.html#producerconfigs)

```properties
# string, default: 1, all|-1|0|1, all|-1: wait until the followers have replicated the record, 0: fire and forget, 1: wait for the leader only; if the leader dies right after acknowledging, a follower becomes the new leader but that message is lost
acks=1
bootstrap.servers=
# long, default: 33554432, bytes of memory the producer may use to buffer records waiting to be sent to the broker; if records arrive faster than they can be sent, the producer blocks for max.block.ms and then throws an exception
buffer.memory=33554432
# string, default: none, one of: none|gzip|snappy|lz4|zstd
compression.type=none
# string, default: "", included in server-side request logs to make request tracking easier
client.id=
# long, default: 0, a batch is sent immediately once batch.size is reached, otherwise the send is delayed by this amount; setting it reduces the number of sends at the cost of latency
linger.ms=0
# long, default: 60000, how long to block when the buffer is full or metadata is unavailable; time spent in user-supplied Serializers or Partitioners is not counted
max.block.ms=60000
# class, default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
# int, default: 32768, TCP receive buffer size used when reading data; -1 uses the OS default
receive.buffer.bytes=32768
# int, default: 131072, TCP send buffer size used when sending data; -1 uses the OS default
send.buffer.bytes=131072
# list, default: ""
interceptor.classes=
```
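
A minimal producer sketch wiring up a few of the settings above, assuming the kafka-clients 2.4 dependency from the pom below; broker addresses and the topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092"); // placeholder
        props.put("acks", "all");            // wait for follower replication
        props.put("compression.type", "lz4");
        props.put("linger.ms", "5");         // small batching delay
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key-1", "hello"));
            producer.flush();
        }
    }
}
```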

## [Consumer Configs](http://kafka.apache.org/documentation.html#consumerconfigs)

```properties
bootstrap.servers=
# string, default: null
group.id=
# boolean, default: true
allow.auto.create.topics=true
# int, default: 32768, TCP receive buffer size used when reading data; -1 uses the OS default
receive.buffer.bytes=32768
# int, default: 131072, TCP send buffer size used when sending data; -1 uses the OS default
send.buffer.bytes=131072
# string, default: "", included in server-side request logs to make request tracking easier
client.id=
# string, default: "", the rack this client is in; corresponds to broker.rack
client.rack=
# list, default: "", classes implementing org.apache.kafka.clients.consumer.ConsumerInterceptor
interceptor.classes=
```
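
A matching consumer sketch, again assuming kafka-clients 2.4; broker addresses, group id, and topic name are placeholders:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092"); // placeholder
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
```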

## [kafka conference seoul 2019](https://www.onoffmix.com/event/196156)

2019-10-18 13:30 ~ 18:30

### confluent platform

* Said to have been the team inside LinkedIn that later spun off into its own company
* I wondered whether ksql, part of the Confluent platform, could serve as the client that reads data out of Kafka, but an existing user (신현우) said he ended up creating 10 VMs because each service (control center, schema registry, ksql, kafka, zookeeper) was deployed on its own VMs
* [Blog](http://cnfl.io/read)

|service|description|
|:---:|:---:|
|control center|Web based and appears to support Kafka monitoring; there also seems to be an IntelliJ plugin. **Requires a paid license**|
|schema registry|Seems to play the role of the AWS Glue catalog; Confluent says it should always be used and the existing user agrees. Appears to support table versioning as well. [Open source](https://github.com/confluentinc/schema-registry)|
|[ksql](https://www.confluent.io/product/ksql/)|Lets you query data in Kafka with SQL-like syntax. Only avro, json and csv are supported; no custom formats. [Open source](https://github.com/confluentinc/ksql)|

### Kafka Streams: Interactive Queries

* The speaker was a contributor
* StateStore: uses [RocksDB](https://github.com/facebook/rocksdb) by default and can persist its state to a changelog topic
* An InteractiveQuery can read the StateStore value for a given key, and also tells you on which host and which partition it lives
* Makes the current state of a topic visible

Looks useful when you want to know the state of a particular topic; a small lookup sketch follows below.
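
A rough sketch of such a lookup, assuming kafka-streams 2.4 and a running application with a queryable key-value store; the store name and types are placeholders. `KafkaStreams#metadataForKey` can additionally be used to find out which host owns the key:

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class InteractiveQuerySketch {

    // Reads the current value of a key from a local state store of a running
    // Kafka Streams application. "counts-store" is a placeholder store name.
    public static Long lookup(KafkaStreams streams, String key) {
        ReadOnlyKeyValueStore<String, Long> store =
                streams.store("counts-store", QueryableStoreTypes.<String, Long>keyValueStore());
        return store.get(key);
    }
}
```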

### Kafka Monitoring

* They said [KafkaManager](https://github.com/yahoo/kafka-manager) alone was not enough, which is what led them here
* Is the cluster healthy? When should brokers be added? And so on...
* The JMX options must be enabled
* Monitoring is possible with the various exposed metrics

Now that the monitoring metrics are identified, a UI still needs to be built...

Feeding them into Elasticsearch might be the better route...

### Kakao datalake

* Collecting data well is work, and organizing it well is even more work
* They hear "where does this data live?" far too often
* Data from many sources goes into the datalake **strictly through Kafka**; on top of it they built something like the AWS Glue catalog, added per-dataset access control, let users enter and edit schemas themselves (crowdsourcing!), and let people consume the data from there
* Judging by the Q&A at the end, many attendees deal with the same kind of work; this talk received the most questions

## Operating experience

Notes from asking an acquaintance who has operated Kafka:

* CPU and memory usage are low, but disk I/O and network I/O are high
* As long as several brokers do not fail at the same time, losing a single broker caused no real problems provided default.replication.factor and min.insync.replicas are configured properly
* Running without RAID gave better results
* When consumers change, i.e. one goes down or is added, messages in flight during the partition rebalance sometimes could not be processed and were lost
* Spark's [Checkpointing](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#checkpointing) might solve this
* Looking at [Apache Flink](https://flink.apache.org/), its checkpoints and savepoints look like they could solve it as well

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>kafkatest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```


# [Apache BookKeeper](https://bookkeeper.apache.org/)

* A scalable, fault-tolerant, low-latency storage service optimized for real-time workloads
* Written in Java
* Apache top level project
* Used as the storage layer by [EMC's Pravega](http://www.pravega.io/) ([github](https://github.com/pravega/pravega)), [Apache Pulsar](http://pulsar.apache.org/) ([github](https://github.com/apache/pulsar)) and others
* Can be clustered (uses ZooKeeper)
* Can be deployed with [DC/OS](https://dcos.io/) or [Kubernetes](https://kubernetes.io/)

> This document is based on 4.10.0, the latest release at the time of writing.

## [BookKeeper concepts and architecture](https://bookkeeper.apache.org/docs/4.10.0/getting-started/concepts/)

### Basic terms

* entry: a single unit of log (aka record)
* ledger: a stream of log entries
* bookie: an individual server that stores ledgers of entries

### Entries

* The actual data written to a ledger, together with its metadata

|Field|Java type|Description|
|:---|:---|:---|
|Ledger number|`long`|The ID of the ledger the entry has been written to|
|Entry number|`long`|The unique ID of the entry|
|Last confirmed (LC)|`long`|The ID of the last recorded entry|
|Data|`byte[]`|The entry's data, written by the client application|
|Authentication code|`byte[]`|A message authentication code that includes all other fields of the entry|

### Ledgers

* BookKeeper's basic unit of storage
* A sequence of entries
* Each entry is a sequence of bytes
* Entries are written
* sequentially, and at most once
* append-only
* immutable
* Correct write ordering is the responsibility of the client application

### Clients and APIs

* Create and delete ledgers
* Read entries from and write entries to a ledger (see the sketch below)
* [ledger API](https://bookkeeper.apache.org/docs/4.10.0/api/ledger-api)
* [DistributedLog API](https://bookkeeper.apache.org/docs/4.10.0/api/distributedlog-api)
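
A small sketch using the classic ledger API linked above, assuming the bookkeeper-server 4.10.0 client; the ZooKeeper address, password, and payloads are placeholders:

```java
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;

public class LedgerApiSketch {

    public static void main(String[] args) throws Exception {
        byte[] password = "some-password".getBytes(StandardCharsets.UTF_8);

        // ZooKeeper connection string is a placeholder.
        BookKeeper client = new BookKeeper("127.0.0.1:2181");

        // Create a ledger, append a few entries, then close it.
        LedgerHandle ledger = client.createLedger(BookKeeper.DigestType.MAC, password);
        for (int i = 0; i < 3; i++) {
            ledger.addEntry(("entry-" + i).getBytes(StandardCharsets.UTF_8));
        }
        long ledgerId = ledger.getId();
        ledger.close();

        // Re-open the ledger for reading and iterate over its entries.
        LedgerHandle reader = client.openLedger(ledgerId, BookKeeper.DigestType.MAC, password);
        Enumeration<LedgerEntry> entries = reader.readEntries(0, reader.getLastAddConfirmed());
        while (entries.hasMoreElements()) {
            System.out.println(new String(entries.nextElement().getEntry(), StandardCharsets.UTF_8));
        }
        reader.close();
        client.close();
    }
}
```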

### Bookies

* An individual BookKeeper storage server that handles ledger fragments
* For performance, a bookie stores fragments of ledgers rather than entire ledgers (similar in spirit to a Kafka partition?)

#### Motivation

* Started out as something for the HDFS NameNode; by now it is much more than that
* Efficient writes
* Fault tolerance through message replication

### Metadata storage

* Uses [ZooKeeper](https://zookeeper.apache.org)

### Data management in bookies

* Data is managed in a [log-structured](https://en.wikipedia.org/wiki/Log-structured_file_system) way

#### Journals

* Contains the BookKeeper transaction log
* Writes update transactions to non-volatile storage before the updates are applied to a ledger
* A new journal file is created when a bookie starts or when the previous journal file reaches the journal file size threshold

#### Entry logs

* Manages entries received from BookKeeper clients
* Entries from different ledgers are written sequentially
* Their offsets are kept as pointers in a ledger cache for fast lookups
* A new entry log file is created when a bookie starts or when the previous entry log file reaches the entry log size threshold
* Old entry log files are removed by a Garbage Collector Thread once they are no longer associated with any active ledger

#### Index files

* An index file is created for each ledger; it consists of a header and fixed-length index pages that record the offsets of the data stored in the entry log files
* Because updating index files would cause random disk I/O, index files are updated lazily in the background by a sync thread

#### Ledger cache

* Ledger index pages are cached in a memory pool
* This allows disk head scheduling to be managed more efficiently

#### Adding entries

* When a client writes an entry to a ledger, the entry goes through the following steps before it is persisted to disk

1. The entry is appended to the entry log
1. The entry's index is updated in the ledger cache
1. A transaction corresponding to the entry update is appended to the journal
1. A response is sent to the BookKeeper client

> For performance reasons, the entry log buffers entries in memory and commits them in batches,
> and the ledger cache holds index pages in memory and flushes them lazily.

#### Data flush

Ledger index pages are flushed to index files in the following two cases:

* The ledger cache memory limit is reached. There is no more space available to hold newer index pages. Dirty index pages will be evicted from the ledger cache and persisted to index files.
* A background thread synchronous thread is responsible for flushing index pages from the ledger cache to index files periodically.

Besides flushing index pages, the sync thread is responsible for rolling journal files in case that journal files use too much disk space. The data flush flow in the sync thread is as follows:

* A `LastLogMark` is recorded in memory. The `LastLogMark` indicates that those entries before it have been persisted (to both index and entry log files) and contains two parts:
  1. A `txnLogId` (the file ID of a journal)
  1. A `txnLogPos` (offset in a journal)
* Dirty index pages are flushed from the ledger cache to the index file, and entry log files are flushed to ensure that all buffered entries in entry log files are persisted to disk.

  Ideally, a bookie only needs to flush index pages and entry log files that contain entries before `LastLogMark`. There is, however, no such information in the ledger and entry log mapping to journal files. Consequently, the thread flushes the ledger cache and entry log entirely here, and may flush entries after the `LastLogMark`. Flushing more is not a problem, though, just redundant.
* The `LastLogMark` is persisted to disk, which means that all entries added before `LastLogMark` have had their entry data and index pages persisted to disk as well. It is now time to safely remove journal files created earlier than `txnLogId`.

If the bookie has crashed before persisting `LastLogMark` to disk, it still has journal files containing entries for which index pages may not have been persisted. Consequently, when this bookie restarts, it inspects journal files to restore those entries and data isn't lost.

Using the above data flush mechanism, it is safe for the sync thread to skip data flushing when the bookie shuts down. However, in the entry logger it uses a buffered channel to write entries in batches and there might be data buffered in the buffered channel upon a shut down. The bookie needs to ensure that the entry log flushes its buffered data during shutdown. Otherwise, entry log files become corrupted with partial entries.

#### Data compaction

On bookies, entries of different ledgers are interleaved in entry log files. A bookie runs a garbage collector thread to delete un-associated entry log files to reclaim disk space. If a given entry log file contains entries from a ledger that has not been deleted, then the entry log file would never be removed and the occupied disk space never reclaimed. In order to avoid such a case, a bookie server compacts entry log files in a garbage collector thread to reclaim disk space.

There are two kinds of compaction running with different frequency: minor compaction and major compaction. The differences between minor compaction and major compaction lies in their threshold value and compaction interval.

* The garbage collection threshold is the size percentage of an entry log file occupied by those undeleted ledgers. The default minor compaction threshold is 0.2, while the major compaction threshold is 0.8.
* The garbage collection interval is how frequently to run the compaction. The default minor compaction interval is 1 hour, while the major compaction threshold is 1 day.

> If either the threshold or interval is set to less than or equal to zero, compaction is disabled.

The data compaction flow in the garbage collector thread is as follows:

* The thread scans entry log files to get their entry log metadata, which records a list of ledgers comprising an entry log and their corresponding percentages.
* With the normal garbage collection flow, once the bookie determines that a ledger has been deleted, the ledger will be removed from the entry log metadata and the size of the entry log reduced.
* If the remaining size of an entry log file reaches a specified threshold, the entries of active ledgers in the entry log will be copied to a new entry log file.
* Once all valid entries have been copied, the old entry log file is deleted.

### ZooKeeper metadata

BookKeeper requires a ZooKeeper installation for storing [ledger](#ledger) metadata. Whenever you construct a [`BookKeeper`](../../api/javadoc/org/apache/bookkeeper/client/BookKeeper) client object, you need to pass a list of ZooKeeper servers as a parameter to the constructor, like this:

```java
String zkConnectionString = "127.0.0.1:2181";
BookKeeper bkClient = new BookKeeper(zkConnectionString);
```

> For more info on using the BookKeeper Java client, see [this guide](../../api/ledger-api#the-java-ledger-api-client).

### Ledger manager

A *ledger manager* handles ledgers' metadata (which is stored in ZooKeeper). BookKeeper offers two types of ledger managers: the [flat ledger manager](#flat-ledger-manager) and the [hierarchical ledger manager](#hierarchical-ledger-manager). Both ledger managers extend the [`AbstractZkLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/AbstractZkLedgerManager) abstract class.

> ##### Use the flat ledger manager in most cases
> The flat ledger manager is the default and is recommended for nearly all use cases. The hierarchical ledger manager is better suited only for managing very large numbers of BookKeeper ledgers (> 50,000).

#### Flat ledger manager

The *flat ledger manager*, implemented in the [`FlatLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/FlatLedgerManager.html) class, stores all ledgers' metadata in child nodes of a single ZooKeeper path. The flat ledger manager creates [sequential nodes](https://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming) to ensure the uniqueness of the ledger ID and prefixes all nodes with `L`. Bookie servers manage their own active ledgers in a hash map so that it's easy to find which ledgers have been deleted from ZooKeeper and then garbage collect them.

The flat ledger manager's garbage collection flow proceeds as follows:

* All existing ledgers are fetched from ZooKeeper (`zkActiveLedgers`)
* All ledgers currently active within the bookie are fetched (`bkActiveLedgers`)
* The currently active ledgers are looped through to determine which ledgers don't currently exist in ZooKeeper. Those are then garbage collected.

#### Hierarchical ledger manager

The *hierarchical ledger manager*, implemented in the [`HierarchicalLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/HierarchicalLedgerManager) class, stores ledgers' metadata in two-level [znodes](https://zookeeper.apache.org/doc/current/zookeeperOver.html#Nodes+and+ephemeral+nodes). It first obtains a global unique ID from ZooKeeper using an [`EPHEMERAL_SEQUENTIAL`](https://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/CreateMode.html#EPHEMERAL_SEQUENTIAL) znode. Since ZooKeeper's sequence counter has a format of `%10d` (10 digits with 0 padding, for example `<path>0000000001`), the hierarchical ledger manager splits the generated ID into 3 parts:

```shell
{level1 (2 digits)}{level2 (4 digits)}{level3 (4 digits)}
```

These three parts are used to form the actual ledger node path to store ledger metadata:

```shell
{ledgers_root_path}/{level1}/{level2}/L{level3}
```

For example, ledger 0000000001 is split into three parts, 00, 0000, and 0001, and stored in znode `/{ledgers_root_path}/00/0000/L0001`. Each znode can hold as many as 10,000 ledgers, which avoids the problem of the child list being larger than the maximum ZooKeeper packet size (which is the [limitation](https://issues.apache.org/jira/browse/BOOKKEEPER-39) that initially prompted the creation of the hierarchical ledger manager). A small path-building sketch follows below.
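
For illustration, a tiny sketch of that 2/4/4 split and the resulting znode path (with `/ledgers` standing in for `{ledgers_root_path}`):

```java
public class LedgerPathSketch {

    // Illustrates the 2/4/4 split described above; /ledgers is a placeholder root path.
    public static String ledgerPath(long ledgerId) {
        String id = String.format("%010d", ledgerId);  // e.g. 0000000001
        String level1 = id.substring(0, 2);            // 00
        String level2 = id.substring(2, 6);            // 0000
        String level3 = id.substring(6, 10);           // 0001
        return String.format("/ledgers/%s/%s/L%s", level1, level2, level3);
    }

    public static void main(String[] args) {
        System.out.println(ledgerPath(1L));            // /ledgers/00/0000/L0001
    }
}
```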

## Install

* Install ZooKeeper on 3 nodes to form a cluster
* Install BookKeeper on 4 nodes to form a cluster
* Exchange SSH keys

### Environment Variable

```shell script
vim ~/.bashrc
```

```shell script
...
export BK_HOME="/usr/local/bookkeeper/default"
...
export PATH="${BK_HOME}/bin:${PATH}"
...
```

### Notes

* Applying a `zookeeper.conf.dynamic` file seems to require ZooKeeper 3.5.0 or later
* After starting ZooKeeper, `./bin/bookkeeper shell metaformat` must be run
* In `bk_server.conf`, the value must have the form `metadataServiceUri=zk://<HOST1>:<PORT1>;<HOST2>:<PORT2>;<HOST3>:<PORT3>/ledgers`
* Listing one host per line in the `${BK_HOME}/conf/bookies` file turns the setup into a cluster
* Start the cluster with `${BK_HOME}/bin/bookkeeper-cluster.sh start`

### Download & Untar

```shell script
mkdir /usr/local/bookkeeper
cd /usr/local/bookkeeper
wget http://apache.mirror.cdnetworks.com/bookkeeper/bookkeeper-4.10.0/bookkeeper-server-4.10.0-bin.tar.gz
tar zxf bookkeeper-server-4.10.0-bin.tar.gz
rm bookkeeper-server-4.10.0-bin.tar.gz
ln -s bookkeeper-server-4.10.0/ default
```

### prepare directories

```shell script
mkdir -p /data/bk/{data,journal,ranges}
mkdir -p /data/zk/txlog
chown -R hskimsky:hskimsky /data/*
```

### ${BK_HOME}/conf/bk_server.conf

```shell script
vim ${BK_HOME}/conf/bk_server.conf
```

```
...
httpServerEnabled=true
...
journalDirectories=/data/bk/journal
...
ledgerDirectories=/data/bk/data
...
metadataServiceUri=zk://bk1.sky.local:2181;bk2.sky.local:2181;bk3.sky.local:2181/ledgers
...
zkServers=bk1.sky.local:2181,bk2.sky.local:2181,bk3.sky.local:2181
...
storage.range.store.dirs=/data/bk/ranges
...
```

### ${BK_HOME}/conf/zookeeper.conf

```shell script
vim ${BK_HOME}/conf/zookeeper.conf
```

```
...
dataDir=/data/zk
...
dataLogDir=/data/zk/txlog
...
server.1=bk1.sky.local:2888:3888
server.2=bk2.sky.local:2888:3888
server.3=bk3.sky.local:2888:3888
```

### ${BK_HOME}/conf/bookies

```shell script
vim ${BK_HOME}/conf/bookies
```

```
bk1.sky.local
bk2.sky.local
bk3.sky.local
bk4.sky.local
```

### Execute ZooKeeper

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

### [Cluster metadata setup](https://bookkeeper.apache.org/docs/4.10.0/deployment/manual#cluster-metadata-setup)

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

### Execute Bookie

* Starts a single bookie only
* To run the whole cluster, use `bookkeeper-cluster.sh` instead

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start bookie
```

### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```

### Verifying the installation

```shell script
for i in {1..4} ; do curl http://bk${i}.sky.local:8080/heartbeat ; done
```

```
OK
OK
OK
OK
```

```shell script
curl "http://bk1.sky.local:8080/api/v1/bookie/list_bookies?print_hostnames=true"
```

```
{
"192.168.181.122:3181" : "192.168.181.122",
"192.168.181.123:3181" : "192.168.181.123",
"192.168.181.124:3181" : "192.168.181.124",
"192.168.181.121:3181" : "192.168.181.121"
}
```

### Reinstallation

#### On every node in the cluster

```shell script
# ${BK_HOME}/bin/bookkeeper-daemon.sh stop zookeeper
# rm -rf /data/zk/{txlog,version-2}
rm -rf /data/bk/*/*
rm ${BK_HOME}/bin/bookkeeper-bookie.pid
# mkdir -p /data/zk/txlog
# ${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

#### On one node only

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

#### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```
