Kafka Operation Scripts

Share on:

2020 It邦幫忙鐵人賽 系列文章

由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。

寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點繼續走下去的動力。

對我的文章有興趣,歡迎到我的網站上 https://chechia.net 閱讀其他技術文章,有任何謬誤也請各方大德直接聯繫我,感激不盡。

Exausted Cat Face


摘要

  • 從 Zookeeper 獲取資訊
  • 取得並處理 topic
  • benchmark kafka

zookeeper

zookeeper 是 kafka 的分散式協調系統,在 kafka 上多個節點間需要協調的內容,例如:彼此節點的ID,位置與當前狀態,或是跨節點 topic 的設定與狀態。取名叫做 zookeeper 就是在協調混亂的分散式系統,,裡面各種不同種類的服務都要協調,象個動物園管理員。Zookeeper 的官方文件 有更詳細的說明。

Kafka 的節點資訊,與當前狀態,是放在 zookeeper 上,我們可以透過以下指令取得

 1# 首先先取得 zkCli 的 cli,這個只有連進任何一台 zookeeper 內部都有
 2kubectl exec -it kafka-0-zookeeper-0 --container kafka-broker bash
 3
 4# 由於是在 Pod 內部,直接 localhost 詢問本地
 5/usr/bin/zkCli.sh -server localhost:2181
 6
 7Connecting to localhost:2181
 82019-09-25 15:02:36,089 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
 92019-09-25 15:02:36,096 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=kafka-0-zookeeper-0.kafka-0-zookeeper-headless.default.svc.cluster.local
102019-09-25 15:02:36,096 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_131
112019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
122019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
132019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
142019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
152019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
162019-09-25 15:02:36,100 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
172019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
182019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
192019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=4.14.127+
202019-09-25 15:02:36,101 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=zookeeper
212019-09-25 15:02:36,102 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/home/zookeeper
222019-09-25 15:02:36,102 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/
232019-09-25 15:02:36,105 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@42110406
24Welcome to ZooKeeper!
252019-09-25 15:02:36,160 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
26JLine support is enabled
272019-09-25 15:02:36,374 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
282019-09-25 15:02:36,393 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x16d67baf1310001, negotiated timeout = 30000
29
30WATCHER::
31
32WatchedEvent state:SyncConnected type:None path:null
33[zk: localhost:2181(CONNECTED) 0]

取得 kafka broker 資料

 1# List root Nodes
 2$ ls /
 3
 4[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
 5
 6# Brokers 的資料節點
 7$ ls /brokers
 8[ids, topics, seqid]
 9
10# List /brokers/ids 得到三個 kafka broker
11$ ls /brokers/ids
12[0, 1, 2]
13
14# 列出所有 topic 名稱
15ls /brokers/topics
16[ticker]

ticker 是上篇範利用到的 topic

簡單來說,zookeeper 存放這些狀態與 topic 的 metadata

  • 儲存核心的狀態與資料,特別是 broker 萬一掛掉,也還需要維持的資料
  • 協調工作,例如協助 broker 處理 quorum,紀錄 partition master 等
1# 離開 zkCli
2quit

Kafka

這邊一樣先連線進去一台 broker,取得 kafka binary

 1kubectl exec -it kafka-0-0 --container kafka-broker bash
 2
 3 ls /usr/bin/ | grep kafka
 4kafka-acls
 5kafka-broker-api-versions
 6kafka-configs
 7kafka-console-consumer
 8kafka-console-producer
 9kafka-consumer-groups
10kafka-consumer-perf-test
11kafka-delegation-tokens
12kafka-delete-records
13kafka-dump-log
14kafka-log-dirs
15kafka-mirror-maker
16kafka-preferred-replica-election
17kafka-producer-perf-test
18kafka-reassign-partitions
19kafka-replica-verification
20kafka-run-class
21kafka-server-start
22kafka-server-stop
23kafka-streams-application-reset
24kafka-topics
25kafka-verifiable-consumer
26kafka-verifiable-producer

很多工具,我們這邊只會看其中幾個

topic 資訊

Topic 的資訊,跟 zookeeper 要

1# List topics
2/usr/bin/kafka-topics --list --zookeeper kafka-0-zookeeper
3
4ticker

操作 message

從 topic 取得 message

1# This will create a new console-consumer and start consuming message to stdout
2/usr/bin/kafka-console-consumer \
3--bootstrap-server localhost:9092 \
4--topic engine_topic_soundwave_USD \
5--timeout 0 \
6--from-beginning

如果 ticker 那個 example pod 還在執行,這邊就會收到 ticker 的每秒 message

如果沒有,也可以開啟另一個 broker 的連線

1kubectl exec -it kafka-0-1 --container kafka-broker bash
2
3# 使用 producer 的 console 連入,topic 把 message 塞進去
4/usr/bin/kafka-console-producer \
5--broker-list localhost:9092\
6 --topic ticker
7
8tick [enter]
9tick [enter]

kafka-console-consumer 那個 terminal 就會收到 message

1tick
2tick

當然也可以使用 consumer group

1# Use consumer to check ticker topics
2/usr/bin/kafka-console-consumer \
3--bootstrap-server localhost:9092 \
4--topic ticker \
5--group test

有做過上面的操作產生 consumer group,就可以透過 consumer API,取得 consumer group 狀態

 1# Check consumer group
 2/usr/bin/kafka-consumer-groups \
 3--bootstrap-server localhost:9092 \
 4--group ticker \
 5--describe
 6
 7Consumer group 'test' has no active members.
 8
 9TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
10ticker          0          23              23              0               -               -               -

Topic 設定操作

Topic 設定文件 在此

這邊透過 kafka-configs 從 zookeeper 取得 topic 設定,這邊的 max.message.bytes,是這個 topic 每個 message 的最大上限。

1/usr/bin/kafka-configs --zookeeper kafka-0-zookeeper:2181 --describe max.message.bytes --entity-type topics
2
3Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
4Configs for topic 'ticker' are

__consumer__offsets 是系統的 topic ,紀錄目前 consumer 讀取的位置。

ticker 沒有設定,就是 producer 當初產生 topic 時沒有指定,使用 default 值

由於我們公司的使用情境常常會超過,所以可以檢查 producer app 那端送出的 message 大小,在比較這邊的設定。當然現在 ticker 的範例,只有一個 0-60 的數值,並不會超過。這個可以在 helm install 的時候,使用 value.yaml 傳入時更改。

不喜歡這個值,可以更改,這邊增加到 16MB

1TOPIC=ticker
2
3/usr/bin/kafka-configs \
4  --zookeeper kafka-3-zookeeper:2181 \
5  --entity-type topics \
6  --alter \
7  --entity-name ${TOPIC} \
8  --add-config max.message.bytes=16000000
9

Benchmark

使用內建工具跑 benchmark

Producer

1/usr/bin/kafka-producer-perf-test \
2  --num-records 100 \
3  --record-size 100 \
4  --topic performance-test \
5  --throughput 100 \
6  --producer-props bootstrap.servers=kafka:9092 max.in.flight.requests.per.connection=5 batch.size=100 compression.type=none
7
8100 records sent, 99.108028 records/sec (0.01 MB/sec), 26.09 ms avg latency, 334.00 ms max latency, 5 ms 50th, 70 ms 95th, 334 ms 99th, 334 ms 99.9th.

Consumer

1/usr/bin/kafka-consumer-perf-test \
2  --messages 100 \
3  --broker-list=kafka:9092 \
4  --topic performance-test \
5  --group performance-test \
6  --num-fetch-threads 1