Kafka-basic-usage

Share on:

2020 It邦幫忙鐵人賽 系列文章

由於我比較熟悉 GCP / GKE 的服務,這篇的操作過程都會以 GCP 平台作為範例,不過操作過程大體上是跨平台通用的。

寫文章真的是體力活,覺得我的文章還有參考價值,請左邊幫我點讚按個喜歡,右上角幫我按個追縱,底下歡迎留言討論。給我一點繼續走下去的動力。

對我的文章有興趣,歡迎到我的網站上 https://chechia.net 閱讀其他技術文章,有任何謬誤也請各方大德直接聯繫我,感激不盡。

Exausted Cat Face


摘要

  • 在 Kubernetes 中連線 kafka
  • 使用 golang library 連線到 Kafka
  • 透過 kafka script 操作 kafka

kubernetes 中連線 kafka

先看一看 kafka pods

 1$ kubectl get pods --selector='app=kafka'
 2
 3NAME        READY   STATUS    RESTARTS   AGE
 4kafka-1-0   1/1     Running   1          26d
 5kafka-1-1   1/1     Running   0          26d
 6kafka-1-2   1/1     Running   0          26d
 7
 8$ kubectl get pods -l 'app=zookeeper'
 9
10NAME                  READY   STATUS    RESTARTS   AGE
11kafka-1-zookeeper-0   1/1     Running   0          26d
12kafka-1-zookeeper-1   1/1     Running   0          26d
13kafka-1-zookeeper-2   1/1     Running   0          26d
14
15$ kubectl get pods -l 'app=kafka-exporter'
16
17NAME                               READY   STATUS    RESTARTS   AGE
18kafka-1-exporter-88786d84b-z954z   1/1     Running   5          26d
 1kubectl describe pods kafka-1-0
 2
 3Name:           kafka-1-0
 4Namespace:      default
 5Priority:       0
 6Node:           gke-chechiachang-pool-1-e4622744-wcq0/10.140.15.212
 7Labels:         app=kafka
 8                controller-revision-hash=kafka-1-69986d7477
 9                release=kafka-1
10                statefulset.kubernetes.io/pod-name=kafka-1-0
11Annotations:    kubernetes.io/limit-ranger: LimitRanger plugin set: cpu request for container kafka-broker
12Status:         Running
13IP:             10.12.6.178
14Controlled By:  StatefulSet/kafka-1
15Containers:
16  kafka-broker:
17    Image:         confluentinc/cp-kafka:5.0.1
18    Port:          9092/TCP
19    Host Port:     0/TCP
20    Command:
21      sh
22      -exc
23      unset KAFKA_PORT && \
24      export KAFKA_BROKER_ID=${POD_NAME##*-} && \
25      export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092 && \
26      exec /etc/confluent/docker/run
27
28    Requests:
29      cpu:      100m
30    Liveness:   exec [sh -ec /usr/bin/jps | /bin/grep -q SupportedKafka] delay=30s timeout=5s period=10s #success=1 #failure=3
31    Readiness:  tcp-socket :kafka delay=30s timeout=5s period=10s #success=1 #failure=3
32    Environment:
33      POD_IP:                                   (v1:status.podIP)
34      POD_NAME:                                kafka-1-0 (v1:metadata.name)
35      POD_NAMESPACE:                           default (v1:metadata.namespace)
36      KAFKA_HEAP_OPTS:                         -Xmx4G -Xms1G
37      KAFKA_ZOOKEEPER_CONNECT:                 kafka-1-zookeeper:2181
38      KAFKA_LOG_DIRS:                          /opt/kafka/data/logs
39      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE:  false
40      KAFKA_DEFAULT_REPLICATION_FACTOR:        3
41      KAFKA_MESSAGE_MAX_BYTES:                 16000000
42      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:  1
43      KAFKA_JMX_PORT:                          5555
44    Mounts:
45      /opt/kafka/data from datadir (rw)
46      /var/run/secrets/kubernetes.io/serviceaccount from default-token-2tm8c (ro)
47Conditions:
48Volumes:
49  datadir:
50    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
51    ClaimName:  datadir-kafka-1-0
52    ReadOnly:   false
53  default-token-2tm8c:
54    Type:        Secret (a volume populated by a Secret)
55    SecretName:  default-token-2tm8c
56    Optional:    false

講幾個重點:

  • 這邊跑起來的是 kafka-broker,接收 producer 與 consumer 來的 request
  • 這邊用的是 statefulsets,不是完全無狀態的 kafka broker,而把 message 記在 datadir 上,降低故障重啟時可能遺失資料的風險。
  • 啟動時,把 kubernetes 指定的 pod name 塞進環境變數,然後作為當前 broker 的 ID
  • 沒有設定 Pod antiAffinity,所以有可能會啟三個 kafka 結果三個跑在同一台 node 上,這樣 node 故障就全死,沒有HA

Service & Endpoints

看一下 service 與 endpoints zookeeper 與 exporter 我們這邊先掠過不談,到專章講高可用性與服務監測時,再來討論。

1$ kubectl get service -l 'app=kafka'
2
3NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)    AGE
4kafka-1            ClusterIP   10.15.242.178   <none>        9092/TCP   26d
5kafka-1-headless   ClusterIP   None            <none>        9092/TCP   26d
6

兩個 services

  • 一個是 cluster-ip service,有 single cluster IP 與 load-balance,DNS 會過 kube-proxy。
  • 一個是 headless service,DNS 沒有過 kube-proxy,而是由 endpoint controller 直接 address record,指向把符合 service selector 的 pod。適合做 service discovery,不會依賴於 kubernetes 的實現。

詳細說明在官方文件

簡單來說,kafka broker 會做 auto service discovery,我們可以使用 headless service。

客戶端(consumer & producer) 連入時,則使用 cluster-ip service,做 load balancing。

1$ kubectl get endpoints -l 'app=kafka'
2
3NAME                            ENDPOINTS                                                          AGE
4kafka-1                         10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092                  26d
5kafka-1-headless                10.12.1.14:9092,10.12.5.133:9092,10.12.6.178:9092                  26d

Golang Example

附上簡單的 Golang 客戶端,完整 Github Repository 在這邊

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"strconv"
 7	"time"
 8
 9	"github.com/segmentio/kafka-go" // 使用的套件
10)
11
12func main() {
13	topic := "ticker" // 指定 message 要使用的 topic
14	partition := 0 // 指定 partition,由於底下連線指定連線到 partition 的 leader,所以需要指定 partition
15	kafkaURL := "kafka-0:9092" // 指定 kafkaURL,也可以透過 os.GetEnv() 從環境變數裡拿到。
16
17  // producer 對指定 topic, partition 的 leader 產生連線
18	producerConn, _ := kafka.DialLeader(context.Background(), "tcp", kafkaURL, topic, partition)
19  // 程式結束最後把 connection 關掉。不關會造成 broker 累積大量 connection,需要等待 broker 端 timeout 才會釋放。
20	defer producerConn.Close()
21
22	//producerConn.SetWriteDeadline(time.Now().Add(10 * time.Second))
23  // 使用 go routine 跑一個 subprocess for loop,一直產生 message 到 kafka topic,這邊的範例是每秒推一個秒數。
24	go func() {
25		for {
26			producerConn.WriteMessages(
27				kafka.Message{
28					Value: []byte(strconv.Itoa(time.Now().Second())),
29				},
30			)
31			time.Sleep(1 * time.Second)
32		}
33	}()
34
35	// make a new reader that consumes from topic-A, partition 0
36	r := kafka.NewReader(kafka.ReaderConfig{
37		Brokers:   []string{kafkaURL},
38		Topic:     topic,
39		Partition: 0,
40		MinBytes:  10e2, // 1KB
41		MaxBytes:  10e3, // 10KB
42	})
43	defer r.Close()
44	//r.SetOffset(42)
45
46  // 印出 reader 收到的 message
47	for {
48		m, err := r.ReadMessage(context.Background())
49		if err != nil {
50			break
51		}
52		fmt.Printf("%v message at offset %d: %s = %s\n", time.Now(), m.Offset, string(m.Key), string(m.Value))
53	}
54
55}

這邊可以使用 Dockerfile 包成一個 container image,然後丟上 kubernetes

我稍晚補一下 docker image 跟 deployment 方便大家操作好了。

或是攋人測試,直接 kubectl run 一個 golang base image 讓它 sleep,然後在連進去

1kubectl run DEPLOYMENT_NAME --image=golang:1.13.0-alpine3.10 sleep 3600
2
3kubectl exec -it POD_NAME sh
 1# 裡面沒有 Git 跟 vim 裝一下
 2apk add git vim
 3
 4go get github.com/chechiachang/kafka-on-kubernetes
 5
 6cd src/github.com/chechiachang/kafka-on-kubernetes/
 7vim main.go
 8
 9go build .
10./kafka-on-kubernetes
11
122019-09-24 14:20:46.872554693 +0000 UTC m=+9.154112787 message at offset 1:  = 46
132019-09-24 14:20:47.872563087 +0000 UTC m=+9.154121166 message at offset 2:  = 47
142019-09-24 14:20:48.872568848 +0000 UTC m=+9.154126926 message at offset 3:  = 48
152019-09-24 14:20:49.872574499 +0000 UTC m=+9.154132576 message at offset 4:  = 49
162019-09-24 14:20:50.872579957 +0000 UTC m=+9.154138032 message at offset 5:  = 50
172019-09-24 14:20:51.872588823 +0000 UTC m=+9.154146892 message at offset 6:  = 51
182019-09-24 14:20:52.872594672 +0000 UTC m=+9.154152748 message at offset 7:  = 52
192019-09-24 14:20:53.872599986 +0000 UTC m=+9.154158060 message at offset 8:  = 53

這樣就連上了,完成一個最簡單的使用範例。

這個例子太過簡單,上一篇講的 consumer group, partitions, offset 什麼設定全都沒用上。實務上這些都需要好好思考,並且依據需求做調整設定。

Clean up

把測試用的 deployment 幹掉

1kubectl delete deployment DEPLOYMENT_NAME

小結

  • 簡述 kafka 在 kubernetes 上運行的狀況,連線方法
  • Demo 一個小程式