반응형

# [Apache Kafka](https://kafka.apache.org/)

## [Introduction](http://kafka.apache.org/intro)

* Apache top level project
* 분산 스트리밍 플랫폼
* scala 로 만들어짐

## [Configs](https://kafka.apache.org/documentation)

## Broker

* [Broker Configs](http://kafka.apache.org/documentation.html#brokerconfigs)

```shell script
vim ${KAFKA_HOME}/config/server.properties
```

```properties
# required, string, hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
zookeeper.connect=
# boolean, default: true, true 면 관리자가 모르는 topic 이 생김
auto.create.topics.enable=true
# required, int, default: -1, broker id 는 cluster 별로 유일해야 함
broker.id=-1
# string, default: producer, uncompressed|gzip|snappy|lz4|zstd|producer, producer: producer 의 설정을 사용함
compression.type=producer
# boolean, default: true, admin tool 을 통해 topic 을 지움, false 면 delete 표시만 함
delete.topic.enable=true
# string, default: null, comma separated list of bind URIs, 비어있으면 default interface, 0.0.0.0 이면 모든 interface, ex) PLAINTEXT://myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
listeners=
# string, default: /tmp/kafka-logs, kafka log dir, log.dirs 보조역할
log.dir=/tmp/kafka-logs
# required, string, default: null, comma separated list of dirs, 없으면 log.dir 사용함
log.dirs=
# long default: 9223372036854775807, disk 로 flush 할때까지 담아둘 message 수
log.flush.interval.messages=9223372036854775807
# long, default: null, disk 로 flush 할때까지 담아둘 시간
log.flush.interval.ms=
# 보관주기: 시간, 용량, 용량계산 필수, 우선순위: ms > minutes > hours
# long, default: -1
log.retention.bytes=-1
# int, default: 168
log.retention.hours=168
# int, default: null
log.retention.minutes=
# long, default: null, -1 은 제한 없음
log.retention.ms=
# int, default: 1, acks=all 일 때, write 성공 위한 최소 복제수
min.insync.replicas=1
# int, default: 1, startup 시 recovery, shutdown 시 flush 하는 log data dir 당 thread 수, raid 로 묶여있으면 증가시켜주는게 좋음
num.recovery.threads.per.data.dir=1
# short, default: 3, topic 복제수
offsets.topic.replication.factor=3
# string, default: null, rack awareness 가능
broker.rack=
# int, default: 1, 복제수
default.replication.factor=1
# long, default: 9223372036854775807, compaction 할 수 없는 최대 시간
log.cleaner.max.compaction.lag.ms=9223372036854775807
# double, default: 0.5, compaction 할 dirty_log:total 의 비율
log.cleaner.min.cleanable.ratio=0.5
# long, default: 0, compaction 되지 않은 최소 시간
log.cleaner.min.compaction.lag.ms=0
# int, default: 1, topic 별 log partition 수, partitions:consumer_instances 비율은 1:1(cluster 내 broker 수) 로 가야 이상적 속도, topic 생성 후 증가만 가능
num.partitions=1
```

## [Topic Configs](http://kafka.apache.org/documentation.html#topicconfigs)

### topic name

* topic 이름으로는 영문, 숫자, '.', '_', '-' 만 사용 가능 ([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L29))
* 249글자 제한([소스코드](https://github.com/apache/kafka/blob/2.3.0/clients/src/main/java/org/apache/kafka/common/internals/Topic.java#L34))
* '_', '.' 문자를 혼용하지 말라고 권고함

## [Producer Configs](http://kafka.apache.org/documentation.html#producerconfigs)

```properties
# string, default: 1, all|-1|0|1, all|-1: follower 로 복제까지 완료됐는지 확인, 0: 보내기만 하고 끝, 1: leader 가 받았는지 확인, leader 가 죽으면 message 복제시 follower 가 leader 가 되지만 그 순간 message 는 유실됨
acks=1
bootstrap.servers=
# long, default: 33554432, producer 가 broker 로 전송 대기중인 record 를 buffer 하는데 사용할 수 있는 memory bytes, 빠르게 전송되면 max.block.ms 만큼 block 후 exception 던짐
buffer.memory=33554432
# string, default: none, 다음 중 선택 가능: none|gzip|snappy|lz4|zstd
compression.type=none
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# long, default: 0, batch.size 만큼 가져오면 바로 전송, 그렇지 않으면 지정한 시간만큼 지연 후 전송, 설정하면 send 수는 줄어들지만 지연이 생김
linger.ms=0
# long, default: 60000, buffer 가 full 이거나 metadata 를 사용할 수 없을 때 block 할 시간, user Serializer 나 Partitioner 에서 실행되는 시간은 포함되지 않음
max.block.ms=60000
# class, default: org.apache.kafka.clients.producer.internals.DefaultPartitioner
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# list, default: ""
interceptor.classes=
```

## [Consumer Configs](http://kafka.apache.org/documentation.html#consumerconfigs)

```properties
bootstrap.servers=
# string, default: null
group.id=
# boolean, default: true
allow.auto.create.topics=true
# int, default: 32768, data 읽을 때 TCP receive buffer size, -1 이면 OS default 를 사용함
receive.buffer.bytes=32768
# int, default: 131072, data 보낼 때 TCP send buffer size, -1 이면 OS default 를 사용함
send.buffer.bytes=131072
# string, default: "", server side log 에 포함시켜 request tracking 을 쉽게하기 위함
client.id=
# string, default: "", client 가 속해있는 rack, broker.rack 에 해당함
client.rack=
# list, default: "", implements org.apache.kafka.clients.consumer.ConsumerInterceptor
interceptor.classes=
```

## [kafka conference seoul 2019](https://www.onoffmix.com/event/196156)

2019-10-18 13:30 ~ 18:30

### confluent platform

* linkedin 내 confluent team 이었다가 따로 회사를 차렸다 함
* kafka 의 데이터를 가져가는 client 를 confluent platform 의 하나인 ksql 을 활용 가능하지 않을까 생각이 들었으나, 이미 사용중인 유저(신현우)에게 들어보니 service(control center, schema registry, ksql, kafka, zookeeper) 별로 vm 을 따로 만드느라 10개의 vm 을 만들었다 함
* [Blog](http://cnfl.io/read)

|service|description|
|:---:|:---:|
|control center|web base 로 돼있고 kafka 모니터링이 가능한듯 함. intellij plugin 도 있는듯. **구매해야 사용 가능**|
|schema registry|aws glue catalog 의 역할을 하는것 같은데 confluent 에서는 꼭 사용해야 한다 하고 이미 사용중인 유저도 사용하는게 좋다고 함. table 의 version 관리도 되는듯. [open source](https://github.com/confluentinc/schema-registry)임|
|[ksql](https://www.confluent.io/product/ksql/)|kafka 에 있는 데이터를 sql 같은 문법으로 조회 가능함. avro, json, csv 만 됨. custom 불가능. [open source](https://github.com/confluentinc/ksql)임|

### Kafka Streams: Interactive Queries

* 발표자가 contributor 였음
* StateStore: default [RocksDB](https://github.com/facebook/rocksdb) 를 사용하고 changelog topic 에 상태를 저장할 수 있음
* InteractiveQuery 가 특정 key 의 StateStore 의 값을 읽을 수 있는데 어느 host 어느 partition 인지도 알려줌
* topic 의 현재 상태를 알 수 있게 해줌

특정 topic 의 상태를 알고싶을 때 사용하면 될 듯 함

### Kafka Monitoring

* [KafkaManager](https://github.com/yahoo/kafka-manager) 만으로는 부족해서 찾게 됐다 함
* cluster 가 장애없이 정상인가? 언제 broker 를 늘려야 하는가? 등등...
* jmx 옵션을 켜야 함
* 각종 metric 들을 가지고 모니터링이 가능함

monitoring 지표를 찾았으니 화면개발을 해야 할텐데...

elastic search 에 붙여서 사용하는게 낫지 않을까...

### 카카오 datalake

* 데이터를 잘 모으는 것도 일이고 잘 정리하는건 더 일이고
* 이 데이터 어디에 있나요? 라는 말을 너무 많이 들음 ㅠㅠ
* 많은 source 에서 **무조건 kafka 를 통해** datalake 에 넣게한 후 aws glue catalog 같은 비슷한걸 구축하고 특정 데이터의 권한도 줄 수 있게 하고, 유저에게 schema 입력, 수정하게 하고(집단지성!) 데이터를 갖다쓰게 했다 함
* 참석자 중 많은 사람이 이런 업무를 하는지 마지막 qna 시간에 가장 많은 질문을 받음

## 운영경험

Kafka 운영 경험이 있다는 지인에게 문의해봄

* CPU, Memory 사용량은 적지만, Disk IO, Network IO 가 높음
* 여러 broker 가 동시에 죽지만 않는다면 하나정도의 broker 가 다운되는 등의 경우에도 default.replication.factor, min.insync.replicas 설정만 잘 돼있다면 큰 문제는 없었다 함
* RAID 구성하지 않고 사용하는 것이 더 나은 결과를 볼 수 있었다 함
* consumer 에 변화가 생길 때, 즉 다운되거나 추가될 때 partition rebalancing 시 부득이하게 그 순간의 message 를 처리하지 못하고 손실이 발생하는 문제가 있었다 함
* spark 의 [Checkpointing](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html#checkpointing) 을 통해 해결할 수 있지 않을까 함
* [Apache Flink](https://flink.apache.org/) 를 찾아보니 Checkpoint, Savepoint 등으로 해결 가능할듯 함

## pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.tistory.hskimsky</groupId>
<artifactId>kafkatest</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<kafka.scala.version>2.13</kafka.scala.version>
<kafka.version>2.4.0</kafka.version>
</properties>

<dependencies>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${kafka.scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!--<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

반응형
Posted by FeliZ_하늘..
,