Kafka Operation Scripts
2020 It邦幫忙鐵人賽 系列文章
- ELK Stack
- Self-host ELK stack on GCP
- Secure ELK Stask
- 監測 Google Compute Engine 上服務的各項數據
- 監測 Google Kubernetes Engine 的各項數據
- 是否選擇 ELK 作為解決方案
- 使用 logstash pipeline 做數據前處理
- Elasticsearch 日常維護:數據清理,效能調校,永久儲存
- Debug ELK stack on GCP
- Kafka HA on Kubernetes(6)
- Deploy kafka-ha
- Kafka Introduction
- kafka 基本使用
- kafka operation scripts
- 集群內部的 HA topology
- 集群內部的 HA 細節
- Prometheus Metrics Exporter 很重要
- 效能調校
由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。
寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點繼續走下去的動力。
對我的文章有興趣,歡迎到我的網站上 https://chechia.net 閱讀其他技術文章,有任何謬誤也請各方大德直接聯繫我,感激不盡。
摘要
- 從 Zookeeper 獲取資訊
- 取得並處理 topic
- benchmark kafka
zookeeper
zookeeper 是 kafka 的分散式協調系統,在 kafka 上多個節點間需要協調的內容,例如:彼此節點的ID,位置與當前狀態,或是跨節點 topic 的設定與狀態。取名叫做 zookeeper 就是在協調混亂的分散式系統,,裡面各種不同種類的服務都要協調,象個動物園管理員。Zookeeper 的官方文件 有更詳細的說明。
Kafka 的節點資訊,與當前狀態,是放在 zookeeper 上,我們可以透過以下指令取得
1# 首先先取得 zkCli 的 cli,這個只有連進任何一台 zookeeper 內部都有
2kubectl exec -it kafka-0-zookeeper-0 --container kafka-broker bash
3
4# 由於是在 Pod 內部,直接 localhost 詢問本地
5/usr/bin/zkCli.sh -server localhost:2181
6
7Connecting to localhost:2181
82019-09-25 15:02:36,089 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
92019-09-25 15:02:36,096 [myid:] - INFO [main:Environment@100] - Client environment:host.name=kafka-0-zookeeper-0.kafka-0-zookeeper-headless.default.svc.cluster.local
102019-09-25 15:02:36,096 [myid:] - INFO [main:Environment@100] - Client environment:java.version=1.8.0_131
112019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
122019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.home=/usr/lib/jvm/java-8-openjdk-amd64/jre
132019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.class.path=/usr/bin/../build/classes:/usr/bin/../build/lib/*.jar:/usr/bin/../share/zookeeper/zookeeper-3.4.10.jar:/usr/bin/../share/zookeeper/slf4j-log4j12-1.6.1.jar:/usr/bin/../share/zookeeper/slf4j-api-1.6.1.jar:/usr/bin/../share/zookeeper/netty-3.10.5.Final.jar:/usr/bin/../share/zookeeper/log4j-1.2.16.jar:/usr/bin/../share/zookeeper/jline-0.9.94.jar:/usr/bin/../src/java/lib/*.jar:/usr/bin/../etc/zookeeper:
142019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
152019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
162019-09-25 15:02:36,100 [myid:] - INFO [main:Environment@100] - Client environment:java.compiler=<NA>
172019-09-25 15:02:36,101 [myid:] - INFO [main:Environment@100] - Client environment:os.name=Linux
182019-09-25 15:02:36,101 [myid:] - INFO [main:Environment@100] - Client environment:os.arch=amd64
192019-09-25 15:02:36,101 [myid:] - INFO [main:Environment@100] - Client environment:os.version=4.14.127+
202019-09-25 15:02:36,101 [myid:] - INFO [main:Environment@100] - Client environment:user.name=zookeeper
212019-09-25 15:02:36,102 [myid:] - INFO [main:Environment@100] - Client environment:user.home=/home/zookeeper
222019-09-25 15:02:36,102 [myid:] - INFO [main:Environment@100] - Client environment:user.dir=/
232019-09-25 15:02:36,105 [myid:] - INFO [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@42110406
24Welcome to ZooKeeper!
252019-09-25 15:02:36,160 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
26JLine support is enabled
272019-09-25 15:02:36,374 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
282019-09-25 15:02:36,393 [myid:] - INFO [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x16d67baf1310001, negotiated timeout = 30000
29
30WATCHER::
31
32WatchedEvent state:SyncConnected type:None path:null
33[zk: localhost:2181(CONNECTED) 0]
取得 kafka broker 資料
1# List root Nodes
2$ ls /
3
4[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
5
6# Brokers 的資料節點
7$ ls /brokers
8[ids, topics, seqid]
9
10# List /brokers/ids 得到三個 kafka broker
11$ ls /brokers/ids
12[0, 1, 2]
13
14# 列出所有 topic 名稱
15ls /brokers/topics
16[ticker]
ticker 是上篇範利用到的 topic
簡單來說,zookeeper 存放這些狀態與 topic 的 metadata
- 儲存核心的狀態與資料,特別是 broker 萬一掛掉,也還需要維持的資料
- 協調工作,例如協助 broker 處理 quorum,紀錄 partition master 等
1# 離開 zkCli
2quit
Kafka
這邊一樣先連線進去一台 broker,取得 kafka binary
1kubectl exec -it kafka-0-0 --container kafka-broker bash
2
3 ls /usr/bin/ | grep kafka
4kafka-acls
5kafka-broker-api-versions
6kafka-configs
7kafka-console-consumer
8kafka-console-producer
9kafka-consumer-groups
10kafka-consumer-perf-test
11kafka-delegation-tokens
12kafka-delete-records
13kafka-dump-log
14kafka-log-dirs
15kafka-mirror-maker
16kafka-preferred-replica-election
17kafka-producer-perf-test
18kafka-reassign-partitions
19kafka-replica-verification
20kafka-run-class
21kafka-server-start
22kafka-server-stop
23kafka-streams-application-reset
24kafka-topics
25kafka-verifiable-consumer
26kafka-verifiable-producer
很多工具,我們這邊只會看其中幾個
topic 資訊
Topic 的資訊,跟 zookeeper 要
1# List topics
2/usr/bin/kafka-topics --list --zookeeper kafka-0-zookeeper
3
4ticker
操作 message
從 topic 取得 message
1# This will create a new console-consumer and start consuming message to stdout
2/usr/bin/kafka-console-consumer \
3--bootstrap-server localhost:9092 \
4--topic engine_topic_soundwave_USD \
5--timeout 0 \
6--from-beginning
如果 ticker 那個 example pod 還在執行,這邊就會收到 ticker 的每秒 message
如果沒有,也可以開啟另一個 broker 的連線
1kubectl exec -it kafka-0-1 --container kafka-broker bash
2
3# 使用 producer 的 console 連入,topic 把 message 塞進去
4/usr/bin/kafka-console-producer \
5--broker-list localhost:9092\
6 --topic ticker
7
8tick [enter]
9tick [enter]
kafka-console-consumer 那個 terminal 就會收到 message
1tick
2tick
當然也可以使用 consumer group
1# Use consumer to check ticker topics
2/usr/bin/kafka-console-consumer \
3--bootstrap-server localhost:9092 \
4--topic ticker \
5--group test
有做過上面的操作產生 consumer group,就可以透過 consumer API,取得 consumer group 狀態
1# Check consumer group
2/usr/bin/kafka-consumer-groups \
3--bootstrap-server localhost:9092 \
4--group ticker \
5--describe
6
7Consumer group 'test' has no active members.
8
9TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
10ticker 0 23 23 0 - - -
Topic 設定操作
Topic 設定文件 在此
這邊透過 kafka-configs 從 zookeeper 取得 topic 設定,這邊的 max.message.bytes,是這個 topic 每個 message 的最大上限。
1/usr/bin/kafka-configs --zookeeper kafka-0-zookeeper:2181 --describe max.message.bytes --entity-type topics
2
3Configs for topic '__consumer_offsets' are segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
4Configs for topic 'ticker' are
__consumer__offsets
是系統的 topic ,紀錄目前 consumer 讀取的位置。
ticker 沒有設定,就是 producer 當初產生 topic 時沒有指定,使用 default 值
由於我們公司的使用情境常常會超過,所以可以檢查 producer app 那端送出的 message 大小,在比較這邊的設定。當然現在 ticker 的範例,只有一個 0-60 的數值,並不會超過。這個可以在 helm install 的時候,使用 value.yaml 傳入時更改。
不喜歡這個值,可以更改,這邊增加到 16MB
1TOPIC=ticker
2
3/usr/bin/kafka-configs \
4 --zookeeper kafka-3-zookeeper:2181 \
5 --entity-type topics \
6 --alter \
7 --entity-name ${TOPIC} \
8 --add-config max.message.bytes=16000000
Benchmark
使用內建工具跑 benchmark
Producer
1/usr/bin/kafka-producer-perf-test \
2 --num-records 100 \
3 --record-size 100 \
4 --topic performance-test \
5 --throughput 100 \
6 --producer-props bootstrap.servers=kafka:9092 max.in.flight.requests.per.connection=5 batch.size=100 compression.type=none
7
8100 records sent, 99.108028 records/sec (0.01 MB/sec), 26.09 ms avg latency, 334.00 ms max latency, 5 ms 50th, 70 ms 95th, 334 ms 99th, 334 ms 99.9th.
Consumer
1/usr/bin/kafka-consumer-perf-test \
2 --messages 100 \
3 --broker-list=kafka:9092 \
4 --topic performance-test \
5 --group performance-test \
6 --num-fetch-threads 1