반응형

# [Apache Kafka](https://kafka.apache.org/)

## [Introduction](http://kafka.apache.org/intro)

* Apache top level project
* 분산 스트리밍 플랫폼
* scala 로 만들어짐

## [Configs](https://kafka.apache.org/documentation)

## Broker

* [Broker Configs](http://kafka.apache.org/documentation.html#brokerconfigs)

```shell script
vim ${KAFKA_HOME}/config/server.properties
```

```properties
# required, string, hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
zookeeper.connect=
# boolean, default: true, true 면 관리자가 모르는 topic 이 생김
auto.create.topics.enable=true
# required, int, default: -1, broker id 는 cluster 별로 유일해야 함
broker.id=-1
# string, default: producer, uncompressed|gzip|snappy|lz4|zstd|producer, producer: producer 의 설정을 사용함
compression.type=producer
# boolean, default: true, admin tool 을 통해 topic 을 지움, false 면 delete 표시만 함
delete.topic.enable=true
# string, default: null, comma separated list of bind URIs, 비어있으면 default interface, 0.0.0.0 이면 모든 interface, ex) PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
listeners=
# string, default: /tmp/kafka-logs, kafka log dir, log.dirs 보조역할
log.dir=/tmp/kafka-logs
# required, string, default: null, comma separated list of dirs, 없으면 log.dir 사용함
log.dirs=
# long default: 9223372036854775807, disk 로 flush 할때까지 담아둘 message 수
log.flush.interval.messages=9223372036854775807
# long, default: null, disk 로 flush 할때까지 담아둘 시간
log.flush.interval.ms=
# 보관주기: 시간, 용량, 용량계산 필수, 우선순위: ms > minutes > hours
# long, default: -1
log.retention.bytes=-1
# int, default: 168
log.retention.hours=168
# int, default: null
log.retention.minutes=
# long, default: null, -1 은 제한 없음
log.retention.ms=
# int, default: 1, acks=all 일 때, write 성공 위한 최소 복제수
min.insync.replicas=1
# int, default: 1, startup 시 recovery, shutdown 시 flush 하는 log data dir 당 thread 수, raid 로 묶여있으면 증가시켜주는게 좋음
num.recovery.threads.per.data.dir=1
# short, default: 3, topic 복제수
offsets.topic.replication.factor=3
# string, default: null, rack awareness 가능
broker.rack=
# int, default: 1, 복제수
default.replication.factor=1
# long, default: 9223372036854775807, compaction 할 수 없는 최대 시간
log.cleaner.max.compaction.lag.ms=9223372036854775807
# double, default: 0.5, compaction 할 dirty_log:total 의 비율
log.cleaner.min.cleanable.ratio=0.5
# long, default: 0, compaction 되지 않은 최소 시간
log.cleaner.min.compaction.lag.ms=0
# int, default: 1, topic 별 log partition 수, partitions:consumer_instances 비율은 1:1(cluster 내 broker 수) 로 가야 이상적 속도, topic 생성 후 증가만 가능
num.partitions=1
```

## [Topic Configs](http://kafka.apache.org/documentation.html#topicconfigs)

### topic name

* topic 이름으로는 영문, 숫자, '.', '_', '-' 만 사용 가능 ([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29))
* 249글자 제한([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L34))
* '_', '.' 문자를 혼용하지 말라고 권고함

## [Producer Configs](http://kafka.apache.org/documentation.html#producerconfigs)

```properties
# string, default: 1, all|-1|0|1, all|-1: follower 로 복제까지 완료됐는지 확인, 0: 보내기만 하고 끝, 1: leader 가 받았는지 확인, leader 가 죽으면 message 복제시 follower 가 leader 가 되지만 그 순간 message 는 유실됨
acks=1
bootstrap.servers=
# long, default: 33554432, producer 가 broker 로 전송 대기중인 record 를 buffer 하는데 사용할 수 있는 memory bytes, 빠르게 전송되면 max.block.ms 만큼 block 후 exception 던짐
buffer.memory=33554432
# string, default: none, 다음 중 선택 가능: none|gzip|snappy|lz4|zstd
compression.type=none
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# long, default: 0, batch.size 만큼 가져오면 바로 전송, 그렇지 않으면 지정한 시간만큼 지연 후 전송, 설정하면 send 수는 줄어들지만 지연이 생김
linger.ms=0
# long, default: 60000, buffer 가 full 이거나 metadata 를 사용할 수 없을 때 block 할 시간, user Serializer 나 Partitioner 에서 실행되는 시간은 포함되지 않음
max.block.ms=60000
# class, default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# list, default: ""
interceptor.classes=
```

## [Consumer Configs](http://kafka.apache.org/documentation.html#consumerconfigs)

```properties
bootstrap.servers=
# string, default: null
group.id=
# boolean, default: true
allow.auto.create.topics=true
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# string, default: "", client 가 속해있는 rack, broker.rack 에 해당함
client.rack=
# list, default: "", implements org.apache.kafka.clients.consumer.ConsumerInterceptor
interceptor.classes=
```

## [kafka conference seoul 2019](https://www.onoffmix.com/event/196156)

2019-10-18 13:30 ~ 18:30

### confluent platform

* linkedin 내 confluent team 이었다가 따로 회사를 차렸다 함
* kafka 의 데이터를 가져가는 client 를 confluent platform 의 하나인 ksql 을 활용 가능하지 않을까 생각이 들었으나, 이미 사용중인 유저(신현우)에게 들어보니 service(control center, schema registry, ksql, kafka, zookeeper) 별로 vm 을 따로 만드느라 10개의 vm 을 만들었다 함
* [Blog](http://cnfl.io/read)

|service|description|
|:---:|:---:|
|control center|web base 로 돼있고 kafka 모니터링이 가능한듯 함. intellij plugin 도 있는듯. **구매해야 사용 가능**|
|schema registry|aws glue catalog 의 역할을 하는것 같은데 confluent 에서는 꼭 사용해야 한다 하고 이미 사용중인 유저도 사용하는게 좋다고 함. table 의 version 관리도 되는듯. [open source](https://github.com/confluentinc/schema-registry)임|
|[ksql](https://www.confluent.io/product/ksql/)|kafka 에 있는 데이터를 sql 같은 문법으로 조회 가능함. avro, json, csv 만 됨. custom 불가능. [open source](https://github.com/confluentinc/ksql)임|

### Kafka Streams: Interactive Queries

* 발표자가 contributor 였음
* StateStore: default [RocksDB](https://github.com/facebook/rocksdb) 를 사용하고 changelog topic 에 상태를 저장할 수 있음
* InteractiveQuery 가 특정 key 의 StateStore 의 값을 읽을 수 있는데 어느 host 어느 partition 인지도 알려줌
* topic 의 현재 상태를 알 수 있게 해줌

특정 topic 의 상태를 알고싶을 때 사용하면 될 듯 함

### Kafka Monitoring

* [KafkaManager](https://github.com/yahoo/kafka-manager) 만으로는 부족해서 찾게 됐다 함
* cluster 가 장애없이 정상인가? 언제 broker 를 늘려야 하는가? 등등...
* jmx 옵션을 켜야 함
* 각종 metric 들을 가지고 모니터링이 가능함

monitoring 지표를 찾았으니 화면개발을 해야 할텐데...

elastic search 에 붙여서 사용하는게 낫지 않을까...

### 카카오 datalake

* 데이터를 잘 모으는 것도 일이고 잘 정리하는건 더 일이고
* 이 데이터 어디에 있나요? 라는 말을 너무 많이 들음 ㅠㅠ
* 많은 source 에서 **무조건 kafka 를 통해** datalake 에 넣게한 후 aws glue catalog 같은 비슷한걸 구축하고 특정 데이터의 권한도 줄 수 있게 하고, 유저에게 schema 입력, 수정하게 하고(집단지성!) 데이터를 갖다쓰게 했다 함
* 참석자 중 많은 사람이 이런 업무를 하는지 마지막 qna 시간에 가장 많은 질문을 받음

## 운영경험

Kafka 운영 경험이 있다는 지인에게 문의해봄

* CPU, Memory 사용량은 적지만, Disk IO, Network IO 가 높음
* 여러 broker 가 동시에 죽지만 않는다면 하나정도의 broker 가 다운되는 등의 경우에도 default.replication.factor, min.insync.replicas 설정만 잘 돼있다면 큰 문제는 없었다 함
* RAID 구성하지 않고 사용하는 것이 더 나은 결과를 볼 수 있었다 함
* consumer 에 변화가 생길 때, 즉 다운되거나 추가될 때 partition rebalancing 시 부득이하게 그 순간의 message 를 처리하지 못하고 손실이 발생하는 문제가 있었다 함
* spark 의 [Checkpointing](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#checkpointing) 을 통해 해결할 수 있지 않을까 함
* [Apache Flink](https://flink.apache.org/) 를 찾아보니 Checkpoint, Savepoint 등으로 해결 가능할듯 함

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>kafkatest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

반응형
Posted by FeliZ_하늘..
,
반응형

# [Apache BookKeeper](https://bookkeeper.apache.org/)

* 실시간 워크로드에 최적화된 확장가능, 내결함성, 짧은 지연시간 스토리지 서비스`1
* Java 로 만들어짐
* Apache top level project
* [EMC 의 Pravega](http://www.pravega.io/)([github](https://github.com/pravega/pravega)), [Apache Pulsar](http://pulsar.apache.org/)([github](https://github.com/apache/pulsar)) 등에서 BookKeeper 를 storage 로 사용함
* cluster 로 구성 가능(zookeeper 사용)
* [DC/OS](https://dcos.io/), [Kubernetes](https://kubernetes.io/) 를 이용하여 배포 가능

> 글 작성 당시 최신 버전인 4.10.0 기준으로 문서를 작성함

## [BookKeeper concepts and architecture](https://bookkeeper.apache.org/docs/4.10.0/getting-started/concepts/)

### Basic terms

* entry: 각 log 단위(aka record)
* ledgers: log entry stream
* bookies: entry ledger 을 저장하는 개별 서버

### Entries

* metadata 와 ledger 에서 쓴 실제 데이터

|Field|Java type|Description|
|:---|:---|:---|
|Ledger number|`long`|entry 에 쓰여진 ledger 의 ID|
|Entry number|`long`|entry 의 unique ID|
|Last confirmed (LC)|`long`|마지막 기록된 entry 의 ID|
|Data|`byte[]`|client application 에 의해 쓰여진 entry 의 데이터|
|Authentication code|`byte[]`|entry 에 다른 모든 필드를 포함하는 메시지 인증 코드|

### Ledgers

* BookKeeper 의 기본 저장 단위
* entry 의 sequence
* 각 entry 는 byte sequence
* entry 가 저장됨
* 순차적으로, 최대 한 번
* append-only
* 수정 불가능
* 적절한 쓰기 순서는 client application 의 책임

### Clients and APIs

* ledger 생성, 삭제
* ledger 에서 entry 읽기, 쓰기
* [ledger API](https://bookkeeper.apache.org/docs/4.10.0/api/ledger-api)
* [DistributedLog API](https://bookkeeper.apache.org/docs/4.10.0/api/distributedlog-api)

### Bookies

* ledger fragment 를 처리하는 개별 BookKeeper storage server
* 성능을 위해 ledger 전체가 아닌 fragment 를 저장함(like kafka partition?)

#### Motivation

* HDFS NameNode 같은.. 이제는 그 이상
* 효율적 쓰기
* 메시지 복제를 통해 fault tolerance

### Metadata storage

* [ZooKeeper](https://zookeeper.apache.org) 를 사용

### Data management in bookies

* [log-structured](https://en.wikipedia.org/wiki/Log-structured_file_system) 방식으로 데이터 관리

#### Journals

* BookKeeper transaction log 포함
* update transaction 을 비휘발성 저장소에 기록 -> ledger 에 대한 update
* Bookie 가 시작되거나 이전 journal file 이 journal file size threshold 에 도달하면 새 journal file 이 생성됨

#### Entry logs

* BookKeeper client 로부터 받은 entry 를 관리
* ledger 의 entry 는 순차적으로
* offset 은 빠른 lookup 을 위해 ledger cache 에 pointer 로 유지
* Bookie 가 시작되거나 이전 entry log file 이 entry log size threshold 에 도달하면 새 entry log file 이 생성됨
* 오래된 entry log file 은 active ledger 와 연관없는 Garbage Collector Thread 에 의해 지워짐

#### Index files

* entry log file 에 저장된 data 의 offset 을 기록하는 header 와 fixed length index page 로 구성된 각 ledger 에 대해 index file 이 생성됨
* index file 을 update 하면 random disk I/O index file 이 sync thread 에 의해 background 로 lazy update 함

#### Ledger cache

* ledger index page 는 memory pool 에 cache
* disk head scheduling 을 효율적으로 관리함

#### Adding entries

* client 가 ledger 에 entry 를 쓰면 entry 는 다음 단계를 거쳐 disk 에 저장됨

1. entry 가 entry log 에 저장됨
1. entry index 는 ledger cache 에서 updated 됨
1. entry update 에 해당하는 transaction 이 journal 에 추가됨
1. BookKeeper client 에게 response 전송

> 성능성의 이유로, entry log 는 memory 에 buffer, batch 로 commit.
> ledger cache 는 index page 를 memory 에 저장하고 lazy flush.

#### Data flush

Ledger index pages are flushed to index files in the following two cases:

* The ledger cache memory limit is reached. There is no more space available to hold newer index pages. Dirty index pages will be evicted from the ledger cache and persisted to index files.
* A background thread synchronous thread is responsible for flushing index pages from the ledger cache to index files periodically.

Besides flushing index pages, the sync thread is responsible for rolling journal files in case that journal files use too much disk space. The data flush flow in the sync thread is as follows:

* A `LastLogMark` is recorded in memory. The `LastLogMark` indicates that those entries before it have been persisted (to both index and entry log files) and contains two parts:
1. A `txnLogId` (the file ID of a journal)
1. A `txnLogPos` (offset in a journal)
* Dirty index pages are flushed from the ledger cache to the index file, and entry log files are flushed to ensure that all buffered entries in entry log files are persisted to disk.

Ideally, a bookie only needs to flush index pages and entry log files that contain entries before `LastLogMark`. There is, however, no such information in the ledger and entry log mapping to journal files. Consequently, the thread flushes the ledger cache and entry log entirely here, and may flush entries after the `LastLogMark`. Flushing more is not a problem, though, just redundant.
* The `LastLogMark` is persisted to disk, which means that entries added before `LastLogMark` whose entry data and index page were also persisted to disk. It is now time to safely remove journal files created earlier than `txnLogId`.

If the bookie has crashed before persisting `LastLogMark` to disk, it still has journal files containing entries for which index pages may not have been persisted. Consequently, when this bookie restarts, it inspects journal files to restore those entries and data isn't lost.

Using the above data flush mechanism, it is safe for the sync thread to skip data flushing when the bookie shuts down. However, in the entry logger it uses a buffered channel to write entries in batches and there might be data buffered in the buffered channel upon a shut down. The bookie needs to ensure that the entry log flushes its buffered data during shutdown. Otherwise, entry log files become corrupted with partial entries.

#### Data compaction

On bookies, entries of different ledgers are interleaved in entry log files. A bookie runs a garbage collector thread to delete un-associated entry log files to reclaim disk space. If a given entry log file contains entries from a ledger that has not been deleted, then the entry log file would never be removed and the occupied disk space never reclaimed. In order to avoid such a case, a bookie server compacts entry log files in a garbage collector thread to reclaim disk space.

There are two kinds of compaction running with different frequency: minor compaction and major compaction. The differences between minor compaction and major compaction lies in their threshold value and compaction interval.

* The garbage collection threshold is the size percentage of an entry log file occupied by those undeleted ledgers. The default minor compaction threshold is 0.2, while the major compaction threshold is 0.8.
* The garbage collection interval is how frequently to run the compaction. The default minor compaction interval is 1 hour, while the major compaction threshold is 1 day.

> If either the threshold or interval is set to less than or equal to zero, compaction is disabled.

The data compaction flow in the garbage collector thread is as follows:

* The thread scans entry log files to get their entry log metadata, which records a list of ledgers comprising an entry log and their corresponding percentages.
* With the normal garbage collection flow, once the bookie determines that a ledger has been deleted, the ledger will be removed from the entry log metadata and the size of the entry log reduced.
* If the remaining size of an entry log file reaches a specified threshold, the entries of active ledgers in the entry log will be copied to a new entry log file.
* Once all valid entries have been copied, the old entry log file is deleted.

### ZooKeeper metadata

BookKeeper requires a ZooKeeper installation for storing [ledger](#ledger) metadata. Whenever you construct a [`BookKeeper`](../../api/javadoc/org/apache/bookkeeper/client/BookKeeper) client object, you need to pass a list of ZooKeeper servers as a parameter to the constructor, like this:

```java
String zkConnectionString = "127.0.0.1:2181";
BookKeeper bkClient = new BookKeeper(zkConnectionString);
```

> For more info on using the BookKeeper Java client, see [this guide](../../api/ledger-api#the-java-ledger-api-client).

### Ledger manager

A *ledger manager* handles ledgers' metadata (which is stored in ZooKeeper). BookKeeper offers two types of ledger managers: the [flat ledger manager](#flat-ledger-manager) and the [hierarchical ledger manager](#hierarchical-ledger-manager). Both ledger managers extend the [`AbstractZkLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/AbstractZkLedgerManager) abstract class.

> ##### Use the flat ledger manager in most cases
> The flat ledger manager is the default and is recommended for nearly all use cases. The hierarchical ledger manager is better suited only for managing very large numbers of BookKeeper ledgers (> 50,000).

#### Flat ledger manager

The *flat ledger manager*, implemented in the [`FlatLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/FlatLedgerManager.html) class, stores all ledgers' metadata in child nodes of a single ZooKeeper path. The flat ledger manager creates [sequential nodes](https://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#Sequence+Nodes+--+Unique+Naming) to ensure the uniqueness of the ledger ID and prefixes all nodes with `L`. Bookie servers manage their own active ledgers in a hash map so that it's easy to find which ledgers have been deleted from ZooKeeper and then garbage collect them.

The flat ledger manager's garbage collection follow proceeds as follows:

* All existing ledgers are fetched from ZooKeeper (`zkActiveLedgers`)
* All ledgers currently active within the bookie are fetched (`bkActiveLedgers`)
* The currently actively ledgers are looped through to determine which ledgers don't currently exist in ZooKeeper. Those are then garbage collected.
* The *hierarchical ledger manager* stores ledgers' metadata in two-level [znodes](https://zookeeper.apache.org/doc/current/zookeeperOver.html#Nodes+and+ephemeral+nodes).

#### Hierarchical ledger manager

The *hierarchical ledger manager*, implemented in the [`HierarchicalLedgerManager`](../../api/javadoc/org/apache/bookkeeper/meta/HierarchicalLedgerManager) class, first obtains a global unique ID from ZooKeeper using an [`EPHEMERAL_SEQUENTIAL`](https://zookeeper.apache.org/doc/current/api/org/apache/zookeeper/CreateMode.html#EPHEMERAL_SEQUENTIAL) znode. Since ZooKeeper's sequence counter has a format of `%10d` (10 digits with 0 padding, for example `<path>0000000001`), the hierarchical ledger manager splits the generated ID into 3 parts:

```shell
{level1 (2 digits)}{level2 (4 digits)}{level3 (4 digits)}
```

These three parts are used to form the actual ledger node path to store ledger metadata:

```shell
{ledgers_root_path}/{level1}/{level2}/L{level3}
```

For example, ledger 0000000001 is split into three parts, 00, 0000, and 00001, and stored in znode `/{ledgers_root_path}/00/0000/L0001`. Each znode could have as many 10,000 ledgers, which avoids the problem of the child list being larger than the maximum ZooKeeper packet size (which is the [limitation](https://issues.apache.org/jira/browse/BOOKKEEPER-39) that initially prompted the creation of the hierarchical ledger manager).

## install

* zookeeper 를 3대의 node 에 설치하여 cluster 구성
* bookkeeper 를 4대의 node 에 설치하여 cluster 구성
* ssh key 교환

### Environment Variable

```shell script
vim ~/.bashrc
```

```shell script
...
export BK_HOME="/usr/local/bookkeeper/default"
...
export PATH="${BK_HOME}/bin:${PATH}"
...
```

### 참고사항

* `zookeeper.conf.dynamic` file 적용은 ZooKeeper 3.5.0 이상부터 가능한 듯
* zookeeper 실행 후 `./bin/bookkeeper shell metaformat` 실행해야 함
* `bk_server.conf` 에서 `metadataServiceUri=zk://<HOST1>:<PORT1>;<HOST2>:<PORT2>;<HOST3>:<PORT3>/ledgers` 형식이어야 함
* `${BK_HOME}/conf/bookies` file 에 한 줄에 하나씩 host 적어주면 클러스터로 구성 가능
* cluster 실행시 `${BK_HOME}/bin/bookkeeper-cluster.sh start`

### Download & Untar

```shell script
mkdir /usr/local/bookkeeper
cd /usr/local/bookkeeper
wget http://apache.mirror.cdnetworks.com/bookkeeper/bookkeeper-4.10.0/bookkeeper-server-4.10.0-bin.tar.gz
tar zxf bookkeeper-server-4.10.0-bin.tar.gz
rm bookkeeper-server-4.10.0-bin.tar.gz
ln -s bookkeeper-server-4.10.0/ default
```

### prepare directories

```shell script
mkdir -p /data/bk/{data,journal,ranges}
mkdir -p /data/zk/txlog
chown -R hskimsky:hskimsky /data/*
```

### ${BK_HOME}/conf/bk_server.conf

```shell script
vim ${BK_HOME}/conf/bk_server.conf
```

```
...
httpServerEnabled=true
...
journalDirectories=/data/bk/journal
...
ledgerDirectories=/data/bk/data
...
metadataServiceUri=zk://bk1.sky.local:2181;bk2.sky.local:2181;bk3.sky.local:2181/ledgers
...
zkServers=bk1.sky.local:2181,bk2.sky.local:2181,bk3.sky.local:2181
...
storage.range.store.dirs=/data/bk/ranges
...
```

### ${BK_HOME}/conf/zookeeper.conf

```shell script
vim ${BK_HOME}/conf/zookeeper.conf
```

```
...
dataDir=/data/zk
...
dataLogDir=/data/zk/txlog
...
server.1=bk1.sky.local:2888:3888
server.2=bk2.sky.local:2888:3888
server.3=bk3.sky.local:2888:3888
```

### ${BK_HOME}/conf/bookies

```shell script
vim ${BK_HOME}/conf/bookies
```

```
bk1.sky.local
bk2.sky.local
bk3.sky.local
bk4.sky.local
```

### Execute ZooKeeper

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

### [Cluster metadata setup](https://bookkeeper.apache.org/docs/4.10.0/deployment/manual#cluster-metadata-setup)

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

### Execute Bookie

* bookie 하나만 실행
* cluster 로 실행할 경우 `bookkeeper-cluster.sh` 로 실행

```shell script
${BK_HOME}/bin/bookkeeper-daemon.sh start bookie
```

### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```

### 설치 확인

```shell script
for i in {1..4} ; do curl http://bk${i}.sky.local:8080/heartbeat ; done
```

```
OK
OK
OK
OK
```

```shell script
curl "http://bk1.sky.local:8080/api/v1/bookie/list_bookies?print_hostnames=true"
```

```
{
"192.168.181.122:3181" : "192.168.181.122",
"192.168.181.123:3181" : "192.168.181.123",
"192.168.181.124:3181" : "192.168.181.124",
"192.168.181.121:3181" : "192.168.181.121"
}
```

### 재설치

#### cluster 내 모든 node 에서

```shell script
# ${BK_HOME}/bin/bookkeeper-daemon.sh stop zookeeper
# rm -rf /data/zk/{txlog,version-2}
rm -rf /data/bk/*/*
rm ${BK_HOME}/bin/bookkeeper-bookie.pid
# mkdir -p /data/zk/txlog
# ${BK_HOME}/bin/bookkeeper-daemon.sh start zookeeper
```

#### 한 node 에서만

```shell script
${BK_HOME}/bin/bookkeeper shell metaformat
```

#### Execute BookKeeper cluster

```shell script
${BK_HOME}/bin/bookkeeper-cluster.sh start
```

반응형
Posted by FeliZ_하늘..
,
반응형

얼마 전 업무를 하다가

Apache HBase 에서는 client 와 Region Server 사이에서 통신은 어떻게 이루어질까 하는 의문이 들었다.

그래서 HBase 소스를 보니 HBase 에서는 모든 통신을 ZooKeeper 를 통해서 한다는걸 알았다.

소스 상에서는 ZooKeeper 에다가 node 에 변경을 가하고 끝이다.

그러면 HBase 를 띄울 때 같이 실행된 ZKWatcher 에서는 해당 node 를 계속 listen 하고 있다가 node 가 변경되면 뭔가의 작업이 이루어지는 구조였다.

그래서 mapper 가 많은데 mapper 에서 HTable 을 열어서 뭔가의 작업을 하면 항상 ZooKeeper max connection 에러가 나서 Region Server 가 몇개 죽거나 그랬다

애초에 mapper 에서 HTable.delete() 를 한다는게 말이 안되는 거였다.

왜냐하면 HTable 을 열고 뭔가 작업을 하면 HBase cluster 전체에 있는 Region Server 전체가 실행할 것이다 근데 그걸 모든 mapper 에서 실행했으니 ZooKeeper max connection 을 초과하는게 당연한 얘기일 것이다.


그리고 가끔 Apache incubator project list 를 보는데 Ratis 라는 project 를 보니 ZooKeeper 와 비슷한 project 였다.

Ratis project 는 Raft 알고리즘의 구현체이다.

Raft 알고리즘은 대표적인 합의 알고리즘(Consensus algorithm)인 Paxos 보다 이해가 쉽고 구현이 쉬운 알고리즘이라고 소개하고 있다.

구글링 해보면 합의 알고리즘이란 분산환경에서 신뢰성을 보장하도록 하는 알고리즘으로 최근 블록체인이 이슈가 되면서 합의 알고리즘 포스팅도 많아진 것 같다.


다음 포스팅은 Raft Protocol 논문을 읽고 요약해봐야겠다

반응형
Posted by FeliZ_하늘..
,