Deploying Kafka on Kubernetes

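1. Overview

This post records how I installed ZooKeeper and Kafka with Helm. ZooKeeper is installed separately because other software, such as ClickHouse, also needs it.

I used the charts from the repository maintained by Bitnami. For detailed installation instructions, see the official documentation: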

  1. Kafka
  2. ZooKeeper

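Before installing, here is the current environment and the deployment plan.

Node    Role          IP             Spec                            Label
master  master, etcd  192.168.1.100  4 cores, 4G memory, 50G disk    usefulness=schedule
node1   worker        192.168.1.101  8 cores, 8G memory, 100G disk   usefulness=devops
node2   worker        192.168.1.102  8 cores, 8G memory, 100G disk   usefulness=demo
node3   worker        192.168.1.103  8 cores, 8G memory, 100G disk   usefulness=business

I plan to run both Kafka and ZooKeeper on node1, so some of the charts' default values need to be overridden during installation.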

2. Installation

Before starting, add the test domain names to the hosts file:

cat >> /etc/hosts <<EOF
192.168.1.101 kafka.local.com
192.168.1.101 zookeeper.local.com
EOF

2.1 ZooKeeper

Helm installation

I changed the following settings:

  1. Set the number of replicas to 3
  2. Added a nodeSelector so that the pods are scheduled on node1
  3. Changed the service type to NodePort to make local debugging easier

replicaCount: 3
nodeSelector:
  usefulness: devops
service:
  type: NodePort
  nodePorts:
    client: 2181

Save the values above as zookeeper.yaml and install with the following commands:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install zookeeper bitnami/zookeeper -f zookeeper.yaml --namespace devops

After installation, the output is as follows:

root@master:~/k8s/traefik/zookeeper# helm install zookeeper bitnami/zookeeper -f zookeeper.yaml --namespace devops
NAME: zookeeper
LAST DEPLOYED: Sun Dec  5 16:17:53 2021
NAMESPACE: devops
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 7.4.13
APP VERSION: 3.7.0

** Please be patient while the chart is being deployed **

ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:

    zookeeper.devops.svc.cluster.local

To connect to your ZooKeeper server run the following commands:

    export POD_NAME=$(kubectl get pods --namespace devops -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
    kubectl exec -it $POD_NAME -- zkCli.sh

To connect to your ZooKeeper server from outside the cluster execute the following commands:

    export NODE_IP=$(kubectl get nodes --namespace devops -o jsonpath="{.items[0].status.addresses[0].address}")
    export NODE_PORT=$(kubectl get --namespace devops -o jsonpath="{.spec.ports[0].nodePort}" services zookeeper)
    zkCli.sh $NODE_IP:$NODE_PORT

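From the output above, the in-cluster DNS address of ZooKeeper is zookeeper.devops.svc.cluster.local:2181. Of the two command groups that follow, the first opens a zkCli.sh session inside a ZooKeeper pod, and the second shows how to connect from outside the cluster.

Next, let's look at the deployed resources: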

# Show the StatefulSet
root@master:~/k8s/traefik# kubectl get statefulset -n devops -o wide
NAME        READY   AGE     CONTAINERS   IMAGES
zookeeper   3/3     8m10s   zookeeper    docker.io/bitnami/zookeeper:3.7.0-debian-10-r215

# Show the Pods
root@master:~/k8s/traefik# kubectl get pod -n devops -o wide
NAME          READY   STATUS    RESTARTS   AGE    IP             NODE    NOMINATED NODE   READINESS GATES
zookeeper-0   1/1     Running   0          9m7s   10.233.90.19   node1   <none>           <none>
zookeeper-1   1/1     Running   0          9m7s   10.233.90.20   node1   <none>           <none>
zookeeper-2   1/1     Running   0          9m7s   10.233.90.18   node1   <none>           <none>

# Show the Services
root@master:~/k8s/traefik/zookeeper# kubectl get svc -n devops -o wide
NAME                 TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)                                       AGE   SELECTOR
zookeeper            NodePort    10.233.3.58   <none>        2181:2181/TCP,2888:12521/TCP,3888:26982/TCP   95s   app.kubernetes.io/component=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/name=zookeeper
zookeeper-headless   ClusterIP   None          <none>        2181/TCP,2888/TCP,3888/TCP                    95s   app.kubernetes.io/component=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/name=zookeeper
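
The PORT(S) column above shows the client port exposed on node port 2181. Kubernetes allocates node ports from 30000-32767 by default, so a value such as 2181 is only accepted when the API server's --service-node-port-range has been widened; the randomly assigned 12521 and 26982 suggest that this is the case in this cluster. The following Go sketch checks planned node ports against a range before they go into a values file. The type and function names, and the widened range 1-65535, are assumptions made for illustration and were not read from this cluster.

```go
package main

import "fmt"

// portRange is an inclusive NodePort range, e.g. the Kubernetes default 30000-32767.
type portRange struct{ lo, hi int }

// outOfRange returns the ports that fall outside r, preserving input order.
func outOfRange(r portRange, ports []int) []int {
	var bad []int
	for _, p := range ports {
		if p < r.lo || p > r.hi {
			bad = append(bad, p)
		}
	}
	return bad
}

func main() {
	// Node ports shown in the zookeeper Service output above.
	ports := []int{2181, 12521, 26982}

	def := portRange{30000, 32767} // Kubernetes default range
	wide := portRange{1, 65535}    // hypothetical widened range

	fmt.Println("outside default range:", outOfRange(def, ports))
	fmt.Println("outside widened range:", outOfRange(wide, ports))
}
```

If the first list is not empty, either pick ports inside the default range or confirm the cluster's configured range before installing.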
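
Traefik

Here we use Traefik to route external traffic to the ZooKeeper service. Note that an IngressRoute matches HTTP requests, while ZooKeeper clients speak a plain TCP protocol; the Go test in section 3.1 connects to 192.168.1.101:2181, so it appears to reach ZooKeeper through the NodePort configured above rather than through this route. The IngressRoute manifest is as follows: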

apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
  name: zookeeper
  namespace: devops
spec:
  entryPoints:
    - web
  routes:
    - match: Host(`zookeeper.local.com`)
      kind: Rule
      services:
        - name: zookeeper
          port: 2181

Save the manifest above as ingress-route-zookeeper.yaml and create it with:

kubectl apply -f ingress-route-zookeeper.yaml

Once it is created, inspect the route:

root@master:~/k8s/traefik/zookeeper# kubectl get ingressroute -n devops
NAME        AGE
zookeeper   24s
root@master:~/k8s/traefik/zookeeper# kubectl describe ingressroute zookeeper -n devops
Name:         zookeeper
Namespace:    devops
Labels:       <none>
Annotations:  <none>
API Version:  traefik.containo.us/v1alpha1
Kind:         IngressRoute
Metadata:
  Creation Timestamp:  2021-12-05T07:54:05Z
  Generation:          1
  Managed Fields:
    API Version:  traefik.containo.us/v1alpha1
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:entryPoints:
        f:routes:
    Manager:         kubectl-client-side-apply
    Operation:       Update
    Time:            2021-12-05T07:54:05Z
  Resource Version:  341404
  UID:               95a50c4d-26e5-4835-875c-7fff6ff6d635
Spec:
  Entry Points:
    web
  Routes:
    Kind:   Rule
    Match:  Host(`zookeeper.local.com`)
    Services:
      Name:  zookeeper
      Port:  2181
Events:      <none>

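2.2 Kafka

Helm installation

I changed the following settings:

  1. Set the number of brokers to 3
  2. Added a nodeSelector so that the pods are scheduled on node1
  3. Allowed topics to be deleted
  4. Changed the service type to NodePort to make local debugging easier
  5. Disabled the bundled ZooKeeper and pointed Kafka at an external one, namely the ZooKeeper installed above
  6. Enabled access from outside the k8s cluster and assigned a separate external port to each broker
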
replicaCount: 3
nodeSelector:
  usefulness: devops
deleteTopicEnable: true
service:
  type: NodePort
  nodePorts:
    client: 39092
zookeeper:
  enabled: false
externalZookeeper:
  servers: ['zookeeper.devops.svc.cluster.local:2181']
externalAccess:
  enabled: true
  service:
    type: NodePort
    nodePorts:
      - 19090
      - 19091
      - 19092
    useHostIPs: true

Save the values above as kafka.yaml and install with the following commands:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm install kafka bitnami/kafka -f kafka.yaml --namespace devops
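
Alternatively, Kafka can be installed from a local copy of the chart (here /root/kafka/, into the zk namespace):

helm install kafka -n zk --set zookeeper.enabled=false --set replicaCount=1 --set externalZookeeper.servers=zookeeper /root/kafka/

To modify an existing release, for example to scale it to 3 replicas, use helm upgrade:

helm upgrade kafka -n zk --set zookeeper.enabled=false --set replicaCount=3 --set externalZookeeper.servers=zookeeper /root/kafka/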

After installation, the output is as follows:

root@master:~/k8s/traefik/kafka# helm install kafka bitnami/kafka -f kafka.yaml --namespace devops
NAME: kafka
LAST DEPLOYED: Sun Dec  5 17:10:59 2021
NAMESPACE: devops
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 14.4.3
APP VERSION: 2.8.1
---------------------------------------------------------------------------------------------
 WARNING

    By specifying "serviceType=LoadBalancer" and not configuring the authentication
    you have most likely exposed the Kafka service externally without any
    authentication mechanism.

    For security reasons, we strongly suggest that you switch to "ClusterIP" or
    "NodePort". As alternative, you can also configure the Kafka authentication.

---------------------------------------------------------------------------------------------

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.devops.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.devops.svc.cluster.local:9092
    kafka-1.kafka-headless.devops.svc.cluster.local:9092
    kafka-2.kafka-headless.devops.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace devops --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace devops -- bash

    PRODUCER:
        kafka-console-producer.sh \

            --broker-list kafka-0.kafka-headless.devops.svc.cluster.local:9092,kafka-1.kafka-headless.devops.svc.cluster.local:9092,kafka-2.kafka-headless.devops.svc.cluster.local:9092 \
            --topic test

    CONSUMER:
        kafka-console-consumer.sh \

            --bootstrap-server kafka.devops.svc.cluster.local:9092 \
            --topic test \
            --from-beginning

To connect to your Kafka server from outside the cluster, follow the instructions below:

    Kafka brokers domain: You can get the external node IP from the Kafka configuration file with the following commands (Check the EXTERNAL listener)

        1. Obtain the pod name:

        kubectl get pods --namespace devops -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka,app.kubernetes.io/component=kafka"

        2. Obtain pod configuration:

        kubectl exec -it KAFKA_POD -- cat /opt/bitnami/kafka/config/server.properties | grep advertised.listeners

    Kafka brokers port: You will have a different node port for each Kafka broker. You can get the list of configured node ports using the command below:

        echo "$(kubectl get svc --namespace devops -l "app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka,app.kubernetes.io/component=kafka,pod" -o jsonpath='{.items[*].spec.ports[0].nodePort}' | tr ' ' '\n')"

From the output above, the DNS address for reaching the Kafka cluster from inside k8s is kafka.devops.svc.cluster.local:9092, and the in-cluster addresses of the three brokers are:

  • kafka-0.kafka-headless.devops.svc.cluster.local:9092

  • kafka-1.kafka-headless.devops.svc.cluster.local:9092

  • kafka-2.kafka-headless.devops.svc.cluster.local:9092
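
These names follow the StatefulSet pattern of pod name, headless service, namespace and cluster domain. With externalAccess.service.useHostIPs enabled, each broker is additionally advertised outside the cluster on its node's IP, at the node port listed for its index in kafka.yaml. A minimal Go sketch that derives both address lists; the helper names are hypothetical, and the cluster domain is assumed to be the default cluster.local.

```go
package main

import "fmt"

// internalBrokers returns the in-cluster address of every broker pod of a
// StatefulSet named release, resolved through its headless service.
func internalBrokers(release, namespace string, replicas, port int) []string {
	addrs := make([]string, 0, replicas)
	for i := 0; i < replicas; i++ {
		addrs = append(addrs, fmt.Sprintf("%s-%d.%s-headless.%s.svc.cluster.local:%d",
			release, i, release, namespace, port))
	}
	return addrs
}

// externalBrokers pairs the node IP with the node port configured for each
// broker index (broker 0 gets nodePorts[0], and so on).
func externalBrokers(nodeIP string, nodePorts []int) []string {
	addrs := make([]string, 0, len(nodePorts))
	for _, p := range nodePorts {
		addrs = append(addrs, fmt.Sprintf("%s:%d", nodeIP, p))
	}
	return addrs
}

func main() {
	for _, a := range internalBrokers("kafka", "devops", 3, 9092) {
		fmt.Println("internal:", a)
	}
	// node1's IP and the externalAccess node ports from kafka.yaml.
	for _, a := range externalBrokers("192.168.1.101", []int{19090, 19091, 19092}) {
		fmt.Println("external:", a)
	}
}
```

The external list is what a client outside the cluster should see as broker addresses once it has fetched metadata.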


3. Testing

3.1 ZooKeeper

The test code is as follows:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	// Assumed import path; the original snippet does not show which client it uses.
	"github.com/go-zookeeper/zk"
)

func main() {
	// Connect to the ZooKeeper cluster (the client uses port 2181 when none is given).
	hosts := []string{"zookeeper.local.com"}
	option := zk.WithEventCallback(
		func(event zk.Event) {
			b, _ := json.Marshal(event)
			fmt.Printf("[all event] %s\n", b)
		},
	)
	conn, _, err := zk.Connect(hosts, time.Second*5, option)
	if err != nil {
		fmt.Printf("[connect zookeeper server error] %v\n", err)
		return
	}
	defer conn.Close()

	fmt.Println("List children of the root znode")
	nodes, stat, err := conn.Children("/")
	fmt.Printf("[root children] %+v, %+v, %v\n", nodes, stat, err)

	acl := zk.WorldACL(zk.PermAll)
	fmt.Println("-----------------------------------")
	fmt.Println("Create the /golang znode")
	// Flags value 0 creates a persistent znode (neither ephemeral nor sequential).
	res, err := conn.Create("/golang", []byte("this is golang znode"), 0, acl)
	if err != nil {
		fmt.Printf("create znode [%s] error: %v\n", "/golang", err)
		return
	}
	fmt.Printf("create znode [%s] success: %v\n", "/golang", res)

	fmt.Println("-----------------------------------")
	fmt.Println("List children of the root znode again")
	nodes, stat, err = conn.Children("/")
	fmt.Printf("[root children] %+v, %+v, %v\n", nodes, stat, err)
}

The result is as follows; both listing and creating znodes work:

List children of the root znode
[all event] {"Type":-1,"State":1,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
[all event] {"Type":-1,"State":100,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
2021/12/05 16:23:44 Connected to 192.168.1.101:2181
[all event] {"Type":-1,"State":101,"Path":"","Err":null,"Server":"192.168.1.101:2181"}
2021/12/05 16:23:44 authenticated: id=216182479996452864, timeout=5000
2021/12/05 16:23:44 re-submitting `0` credentials after reconnect
[root children] [zookeeper], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:-1 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:1 Pzxid:0}, <nil>
-----------------------------------
Create the /golang znode
create znode [/golang] success: /golang
-----------------------------------
List children of the root znode again
[root children] [zookeeper golang], &{Czxid:0 Mzxid:0 Ctime:0 Mtime:0 Version:0 Cversion:0 Aversion:0 EphemeralOwner:0 DataLength:0 NumChildren:2 Pzxid:8589934594}, <nil>
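
The [all event] lines carry numeric Type and State codes. Assuming the client is go-zookeeper, Type -1 is a session event and States 1, 100 and 101 stand for connecting, connected and has-session, which matches the order of the log above. The final Pzxid 8589934594 also splits into epoch 2 and counter 2, because a zxid keeps the leader epoch in its high 32 bits. The Go sketch below decodes both with the standard library only; the state table is written from memory of the library's constants and should be treated as an assumption, and the function names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event mirrors the JSON fields printed by the callback in the test program.
type event struct {
	Type   int    `json:"Type"`
	State  int    `json:"State"`
	Server string `json:"Server"`
}

// stateNames maps numeric session states to readable names
// (assumed to match go-zookeeper's State constants).
var stateNames = map[int]string{
	0:   "disconnected",
	1:   "connecting",
	100: "connected",
	101: "has session",
}

// describe parses one event JSON line and returns a readable summary.
func describe(line string) (string, error) {
	var e event
	if err := json.Unmarshal([]byte(line), &e); err != nil {
		return "", err
	}
	name, ok := stateNames[e.State]
	if !ok {
		name = fmt.Sprintf("state %d", e.State)
	}
	return fmt.Sprintf("%s: %s", e.Server, name), nil
}

// splitZxid separates a ZooKeeper transaction ID into leader epoch and counter.
func splitZxid(zxid int64) (epoch, counter int64) {
	return zxid >> 32, zxid & 0xffffffff
}

func main() {
	s, err := describe(`{"Type":-1,"State":101,"Path":"","Err":null,"Server":"192.168.1.101:2181"}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	epoch, counter := splitZxid(8589934594)
	fmt.Printf("epoch=%d counter=%d\n", epoch, counter)
}
```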

3.2 Kafka

3.2.1 Topic

First we create a topic named test_topic with 3 partitions and a replication factor of 2, then list all topics:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_8_0_0

	// Connect to the cluster
	admin, err := sarama.NewClusterAdmin(servers, cfg)
	if err != nil {
		panic(err)
	}
	defer admin.Close()

	// Create the topic
	fmt.Println("----------------------------------")
	fmt.Println("Create topic")
	err = admin.CreateTopic(
		topic, &sarama.TopicDetail{
			NumPartitions:     3,
			ReplicationFactor: 2,
		}, false,
	)
	if err != nil {
		panic(err)
	}

	// List all topics
	fmt.Println("----------------------------------")
	fmt.Println("List topics")
	res, err := admin.ListTopics()
	if err != nil {
		panic(err)
	}
	for topic, detail := range res {
		b, _ := json.Marshal(detail)
		fmt.Printf("topic: %s, detail: %s\n", topic, b)
	}
}

The result is as follows:

[Sarama] 2021/12/05 18:41:22 Initializing new client
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:22 client/metadata fetching metadata for all topics from broker kafka.local.com:19091
[Sarama] 2021/12/05 18:41:22 Connected to broker at kafka.local.com:19091 (unregistered)
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:41:22 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:41:22 Successfully initialized new client
[Sarama] 2021/12/05 18:41:22 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
----------------------------------
[Sarama] 2021/12/05 18:41:22 Connected to broker at 192.168.1.101:19090 (registered as #0)
Create topic
----------------------------------
List topics
[Sarama] 2021/12/05 18:41:23 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:41:23 Connected to broker at 192.168.1.101:19091 (registered as #1)
topic: test_topic, detail: {"NumPartitions":3,"ReplicationFactor":2,"ReplicaAssignment":{"0":[0,1],"1":[2,0],"2":[1,2]},"ConfigEntries":{"flush.messages":"10000","flush.ms":"1000","max.message.bytes":"1000012","retention.bytes":"1073741824","segment.bytes":"1073741824"}}
topic: __consumer_offsets, detail: {"NumPartitions":50,"ReplicationFactor":1,"ReplicaAssignment":{"0":[2],"1":[1],"10":[1],"11":[0],"12":[2],"13":[1],"14":[0],"15":[2],"16":[1],"17":[0],"18":[2],"19":[1],"2":[0],"20":[0],"21":[2],"22":[1],"23":[0],"24":[2],"25":[1],"26":[0],"27":[2],"28":[1],"29":[0],"3":[2],"30":[2],"31":[1],"32":[0],"33":[2],"34":[1],"35":[0],"36":[2],"37":[1],"38":[0],"39":[2],"4":[1],"40":[1],"41":[0],"42":[2],"43":[1],"44":[0],"45":[2],"46":[1],"47":[0],"48":[2],"49":[1],"5":[0],"6":[2],"7":[1],"8":[0],"9":[2]},"ConfigEntries":{"cleanup.policy":"compact","compression.type":"producer","flush.messages":"10000","flush.ms":"1000","max.message.bytes":"1000012","retention.bytes":"1073741824","segment.bytes":"104857600"}}
[Sarama] 2021/12/05 18:41:23 Closing Client
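
The ReplicaAssignment field in the test_topic detail maps each partition to its replica brokers, and the first broker in each list is the preferred leader. The Go sketch below parses that JSON fragment with the standard library and prints the preferred leader of each partition; the function name is hypothetical. For the assignment shown above it yields broker 0 for partition 0, broker 2 for partition 1 and broker 1 for partition 2, which agrees with the broker and partition pairs in the producer log in the next section.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// preferredLeaders parses a ReplicaAssignment JSON object (partition -> replica
// broker IDs) and returns the first replica of each partition.
func preferredLeaders(assignment string) (map[string]int, error) {
	var replicas map[string][]int
	if err := json.Unmarshal([]byte(assignment), &replicas); err != nil {
		return nil, err
	}
	leaders := make(map[string]int, len(replicas))
	for partition, brokers := range replicas {
		if len(brokers) == 0 {
			return nil, fmt.Errorf("partition %s has no replicas", partition)
		}
		leaders[partition] = brokers[0]
	}
	return leaders, nil
}

func main() {
	// Copied from the test_topic line of the ListTopics output.
	leaders, err := preferredLeaders(`{"0":[0,1],"1":[2,0],"2":[1,2]}`)
	if err != nil {
		panic(err)
	}
	partitions := make([]string, 0, len(leaders))
	for p := range leaders {
		partitions = append(partitions, p)
	}
	sort.Strings(partitions) // map iteration order is not deterministic
	for _, p := range partitions {
		fmt.Printf("partition %s -> preferred leader broker %d\n", p, leaders[p])
	}
}
```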

3.2.2 Producer

Here 3 goroutines are started, each writing 10 messages to one of the three partitions of test_topic. Each message ends with <partition number>-<message number>.

package main

import (
	"fmt"
	"log"
	"os"
	"sync"

	"github.com/Shopify/sarama"
)

func main() {
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true
	cfg.Producer.Partitioner = sarama.NewManualPartitioner

	// Produce messages
	producer, err := sarama.NewSyncProducer(servers, cfg)
	if err != nil {
		panic(err)
	}
	defer producer.Close()
	wg := sync.WaitGroup{}
	for p := 0; p < 3; p++ {
		wg.Add(1)
		go func(p int32) {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				// Build the message and pin it to partition p
				msg := &sarama.ProducerMessage{}
				msg.Topic = topic
				msg.Partition = p
				msg.Value = sarama.StringEncoder(fmt.Sprintf("this is test message: %d-%d", p, i))
				// Report failures instead of discarding the error
				if _, _, err := producer.SendMessage(msg); err != nil {
					fmt.Printf("produce message [%s] to [Partition %d] failed: %v\n", msg.Value, p, err)
					continue
				}
				fmt.Printf("produce message [%s] to [Partition %d] success.\n", msg.Value, p)
			}
		}(int32(p))
	}
	wg.Wait()
}

The result is as follows. The messages were written to the corresponding partitions of test_topic:

[Sarama] 2021/12/05 18:47:05 Initializing new client
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 client/metadata fetching metadata for all topics from broker kafka.local.com:19090
[Sarama] 2021/12/05 18:47:05 Connected to broker at kafka.local.com:19090 (unregistered)
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:47:05 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:47:05 Successfully initialized new client
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:47:05 producer/broker/1 starting up
[Sarama] 2021/12/05 18:47:05 producer/broker/2 starting up
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19092 (registered as #2)
[Sarama] 2021/12/05 18:47:05 producer/broker/1 state change to [open] on test_topic/2
[Sarama] 2021/12/05 18:47:05 producer/broker/0 starting up
[Sarama] 2021/12/05 18:47:05 producer/broker/2 state change to [open] on test_topic/1
[Sarama] 2021/12/05 18:47:05 producer/broker/0 state change to [open] on test_topic/0
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19091 (registered as #1)
[Sarama] 2021/12/05 18:47:05 Connected to broker at 192.168.1.101:19090 (registered as #0)
produce message [this is test message: 1-0] to [Partition 1] success.
produce message [this is test message: 1-1] to [Partition 1] success.
produce message [this is test message: 1-2] to [Partition 1] success.
produce message [this is test message: 1-3] to [Partition 1] success.
produce message [this is test message: 0-0] to [Partition 0] success.
produce message [this is test message: 0-1] to [Partition 0] success.
produce message [this is test message: 1-4] to [Partition 1] success.
produce message [this is test message: 2-0] to [Partition 2] success.
produce message [this is test message: 0-2] to [Partition 0] success.
produce message [this is test message: 1-5] to [Partition 1] success.
produce message [this is test message: 0-3] to [Partition 0] success.
produce message [this is test message: 1-6] to [Partition 1] success.
produce message [this is test message: 2-1] to [Partition 2] success.
produce message [this is test message: 0-4] to [Partition 0] success.
produce message [this is test message: 1-7] to [Partition 1] success.
produce message [this is test message: 2-2] to [Partition 2] success.
produce message [this is test message: 1-8] to [Partition 1] success.
produce message [this is test message: 0-5] to [Partition 0] success.
produce message [this is test message: 2-3] to [Partition 2] success.
produce message [this is test message: 1-9] to [Partition 1] success.
produce message [this is test message: 0-6] to [Partition 0] success.
produce message [this is test message: 2-4] to [Partition 2] success.
produce message [this is test message: 0-7] to [Partition 0] success.
produce message [this is test message: 2-5] to [Partition 2] success.
produce message [this is test message: 0-8] to [Partition 0] success.
produce message [this is test message: 2-6] to [Partition 2] success.
produce message [this is test message: 0-9] to [Partition 0] success.
produce message [this is test message: 2-7] to [Partition 2] success.
produce message [this is test message: 2-8] to [Partition 2] success.
produce message [this is test message: 2-9] to [Partition 2] success.
[Sarama] 2021/12/05 18:47:06 Producer shutting down.
[Sarama] 2021/12/05 18:47:06 Closing Client
[Sarama] 2021/12/05 18:47:06 producer/broker/1 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/1 shut down
[Sarama] 2021/12/05 18:47:06 producer/broker/2 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/2 shut down
[Sarama] 2021/12/05 18:47:06 producer/broker/0 input chan closed
[Sarama] 2021/12/05 18:47:06 producer/broker/0 shut down

3.2.3 Consumer

Next, start three partition consumers, each reading the messages of one partition. The code is as follows:

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	servers := []string{
		"kafka.local.com:19090",
		"kafka.local.com:19091",
		"kafka.local.com:19092",
	}
	topic := "test_topic"
	sarama.Logger = log.New(os.Stderr, "[Sarama] ", log.LstdFlags)

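	consumer, err := sarama.NewConsumer(servers, nil)
	if err != nil {
		panic(err)
	}
	partitions, err := consumer.Partitions(topic)
	if err != nil {
		panic(err)
	}
	// Range over the partition IDs themselves, not the slice indexes
	for _, partition := range partitions {
		go func(partition int32) {
			pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
			if err != nil {
				log.Printf("consume partition %d: %v", partition, err)
				return
			}
			defer pc.Close()
			for msg := range pc.Messages() {
				fmt.Printf(
					"PartitionConsumer [%d] consume one message: Partition=%d, Offset=%d, Key=%v, Value=%s\n",
					partition, msg.Partition,
					msg.Offset, msg.Key, msg.Value,
				)
			}
		}(partition)
	}
	// Block forever; stop the program with Ctrl+C
	select {}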
}

The result is as follows. Each partition consumer correctly consumed the 10 messages in its own partition:

[Sarama] 2021/12/05 18:54:17 Initializing new client
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 client/metadata fetching metadata for all topics from broker kafka.local.com:19092
[Sarama] 2021/12/05 18:54:17 Connected to broker at kafka.local.com:19092 (unregistered)
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #0 at 192.168.1.101:19090
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #2 at 192.168.1.101:19092
[Sarama] 2021/12/05 18:54:17 client/brokers registered new broker #1 at 192.168.1.101:19091
[Sarama] 2021/12/05 18:54:17 Successfully initialized new client
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19090 (registered as #0)
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19092 (registered as #2)
[Sarama] 2021/12/05 18:54:17 Connected to broker at 192.168.1.101:19091 (registered as #1)
[Sarama] 2021/12/05 18:54:17 consumer/broker/0 added subscription to test_topic/0
[Sarama] 2021/12/05 18:54:17 consumer/broker/1 added subscription to test_topic/2
[Sarama] 2021/12/05 18:54:17 consumer/broker/2 added subscription to test_topic/1
PartitionConsumer [0] consume one message: Partition=0, Offset=0, Key=[], Value=this is test message: 0-0
PartitionConsumer [0] consume one message: Partition=0, Offset=1, Key=[], Value=this is test message: 0-1
PartitionConsumer [0] consume one message: Partition=0, Offset=2, Key=[], Value=this is test message: 0-2
PartitionConsumer [0] consume one message: Partition=0, Offset=3, Key=[], Value=this is test message: 0-3
PartitionConsumer [0] consume one message: Partition=0, Offset=4, Key=[], Value=this is test message: 0-4
PartitionConsumer [0] consume one message: Partition=0, Offset=5, Key=[], Value=this is test message: 0-5
PartitionConsumer [0] consume one message: Partition=0, Offset=6, Key=[], Value=this is test message: 0-6
PartitionConsumer [0] consume one message: Partition=0, Offset=7, Key=[], Value=this is test message: 0-7
PartitionConsumer [0] consume one message: Partition=0, Offset=8, Key=[], Value=this is test message: 0-8
PartitionConsumer [0] consume one message: Partition=0, Offset=9, Key=[], Value=this is test message: 0-9
PartitionConsumer [1] consume one message: Partition=1, Offset=0, Key=[], Value=this is test message: 1-0
PartitionConsumer [1] consume one message: Partition=1, Offset=1, Key=[], Value=this is test message: 1-1
PartitionConsumer [1] consume one message: Partition=1, Offset=2, Key=[], Value=this is test message: 1-2
PartitionConsumer [1] consume one message: Partition=1, Offset=3, Key=[], Value=this is test message: 1-3
PartitionConsumer [1] consume one message: Partition=1, Offset=4, Key=[], Value=this is test message: 1-4
PartitionConsumer [1] consume one message: Partition=1, Offset=5, Key=[], Value=this is test message: 1-5
PartitionConsumer [1] consume one message: Partition=1, Offset=6, Key=[], Value=this is test message: 1-6
PartitionConsumer [1] consume one message: Partition=1, Offset=7, Key=[], Value=this is test message: 1-7
PartitionConsumer [1] consume one message: Partition=1, Offset=8, Key=[], Value=this is test message: 1-8
PartitionConsumer [1] consume one message: Partition=1, Offset=9, Key=[], Value=this is test message: 1-9
PartitionConsumer [2] consume one message: Partition=2, Offset=0, Key=[], Value=this is test message: 2-0
PartitionConsumer [2] consume one message: Partition=2, Offset=1, Key=[], Value=this is test message: 2-1
PartitionConsumer [2] consume one message: Partition=2, Offset=2, Key=[], Value=this is test message: 2-2
PartitionConsumer [2] consume one message: Partition=2, Offset=3, Key=[], Value=this is test message: 2-3
PartitionConsumer [2] consume one message: Partition=2, Offset=4, Key=[], Value=this is test message: 2-4
PartitionConsumer [2] consume one message: Partition=2, Offset=5, Key=[], Value=this is test message: 2-5
PartitionConsumer [2] consume one message: Partition=2, Offset=6, Key=[], Value=this is test message: 2-6
PartitionConsumer [2] consume one message: Partition=2, Offset=7, Key=[], Value=this is test message: 2-7
PartitionConsumer [2] consume one message: Partition=2, Offset=8, Key=[], Value=this is test message: 2-8
PartitionConsumer [2] consume one message: Partition=2, Offset=9, Key=[], Value=this is test message: 2-9

4. Summary

In this post I installed ZooKeeper and Kafka with Helm and then tested both from Go; all test results matched expectations.

Reference:

https://blog.lerzen.com/post/helm%E5%AE%89%E8%A3%85zookeeper%E5%92%8Ckafka/

Licensed under CC BY-NC-SA 4.0
Last updated on Jan 06, 2025 05:52 UTC