在 Kafka Version 为 0.11.0.0 之后,Consumer 的 Offset 信息不再默认保存在 Zookeeper 上,而是选择用 Topic 的形式保存下来。
在命令行中可以使用 kafka-consumer-groups 的脚本实现 Offset 的相关操作。
更新 Offset 由三个维度决定:Topic 的作用域、重置策略、执行方案。
Topic 的作用域
--all-topics
:为 consumer group 下所有 topic 的所有分区调整位移)
--topic t1 --topic t2
:为指定的若干个 topic 的所有分区调整位移
--topic t1:0,1,2
:为指定的 topic 分区调整位移
重置策略
--to-earliest
:把位移调整到分区当前最小位移
--to-latest
:把位移调整到分区当前最新位移
--to-current
:把位移调整到分区当前位移
--to-offset <offset>
: 把位移调整到指定位移处
--shift-by N
: 把位移调整到当前位移 + N 处,注意 N 可以是负数,表示向前移动
--to-datetime <datetime>
:把位移调整到大于给定时间的最早位移处,datetime 格式是 yyyy-MM-ddTHH:mm:ss.xxx,比如 2017-08-04T00:00:00.000
--by-duration <duration>
:把位移调整到距离当前时间指定间隔的位移处,duration 格式是 PnDTnHnMnS,比如 PT0H5M0S
--from-file <file>
:从 CSV 文件中读取调整策略
确定执行方案
- 什么参数都不加:只是打印出位移调整方案,不具体执行
--execute
:执行真正的位移调整
--export
:把位移调整方案按照 CSV 格式打印,方便用户成 csv 文件,供后续直接使用
注意事项
- consumer group 状态必须是 inactive 的,即不能是处于正在工作中的状态
- 不加执行方案,默认是只做打印操作
常用示例
更新到当前 group 最初的 offset 位置
1
|
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafka_json_monitor_consumer_dev --reset-offsets --all-topics --to-latest --execute
|
更新到指定的 offset 位置
1
|
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
|
更新到当前 offset 位置(解决 offset 的异常)
1
|
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafka_json_monitor_consumer_dev --reset-offsets --all-topics --to-current --execute
|
复制
offset 位置按设置的值进行位移
1
|
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
|
offset 设置到指定时刻开始
1
|
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000
|
可以使用 kafka 提供的命令行工具 kafka-consumer-groups.sh 来查看所有 group 的信息,包括每个 group 消费的 topic 和分区信息、当前消费的偏移量、消费者的客户端 ID 等。
以下是具体的命令:
1
2
|
cssCopy code
./kafka-consumer-groups.sh --bootstrap-server <kafka-broker> --list
|
该命令将列出所有当前存在的消费者组名称。
1
|
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
|
该命令将列出所有消费者组的详细信息,包括消费者组 ID、状态、成员数量、订阅的 topic 和分区信息、当前消费的偏移量等。
如果想要查看某个特定 group 的信息,可以将–all-groups 改为–group 并指定相应的 group 名称。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic kafka_json_monitor --partitions 10
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic dwd_trace2 --partitions 9
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic es_write1 --partitions 9
kafka-topics.sh --alter --bootstrap-server localhost:9092 --topic ue_latitude_longitude_topic --partitions 3
#kafka消费组
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
#创建kafka的topic
kafka-topics.sh --create --topic test_process1 --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
|