kafka命令行查询数据

在 Kafka Version 为 0.11.0.0 之后,Consumer 的 Offset 信息不再默认保存在 Zookeeper 上,而是选择用 Topic 的形式保存下来。

在命令行中可以使用 kafka-consumer-groups 的脚本实现 Offset 的相关操作。

更新 Offset 由三个维度决定:Topic 的作用域、重置策略、执行方案。

Topic 的作用域

  • --all-topics:为 consumer group 下所有 topic 的所有分区调整位移)
  • --topic t1 --topic t2:为指定的若干个 topic 的所有分区调整位移
  • --topic t1:0,1,2:为指定的 topic 分区调整位移

重置策略

  • --to-earliest:把位移调整到分区当前最小位移
  • --to-latest:把位移调整到分区当前最新位移
  • --to-current:把位移调整到分区当前位移
  • --to-offset <offset>: 把位移调整到指定位移处
  • --shift-by N: 把位移调整到当前位移 + N 处,注意 N 可以是负数,表示向前移动
  • --to-datetime <datetime>:把位移调整到大于给定时间的最早位移处,datetime 格式是 yyyy-MM-ddTHH:mm:ss.xxx,比如 2017-08-04T00:00:00.000
  • --by-duration <duration>:把位移调整到距离当前时间指定间隔的位移处,duration 格式是 PnDTnHnMnS,比如 PT0H5M0S
  • --from-file <file>:从 CSV 文件中读取调整策略

确定执行方案

  • 什么参数都不加:只是打印出位移调整方案,不具体执行
  • --execute:执行真正的位移调整
  • --export:把位移调整方案按照 CSV 格式打印,方便用户成 csv 文件,供后续直接使用

注意事项

  1. consumer group 状态必须是 inactive 的,即不能是处于正在工作中的状态
  2. 不加执行方案,默认是只做打印操作

常用示例

更新到当前 group 最初的 offset 位置

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafka_json_monitor_consumer_dev --reset-offsets --all-topics --to-latest --execute

更新到指定的 offset 位置

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute

更新到当前 offset 位置(解决 offset 的异常)

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafka_json_monitor_consumer_dev --reset-offsets --all-topics --to-current --execute

复制

offset 位置按设置的值进行位移

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute

offset 设置到指定时刻开始

1
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000

可以使用 kafka 提供的命令行工具 kafka-consumer-groups.sh 来查看所有 group 的信息,包括每个 group 消费的 topic 和分区信息、当前消费的偏移量、消费者的客户端 ID 等。

以下是具体的命令:

1
2
cssCopy code
./kafka-consumer-groups.sh --bootstrap-server <kafka-broker> --list

该命令将列出所有当前存在的消费者组名称。

1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups

该命令将列出所有消费者组的详细信息,包括消费者组 ID、状态、成员数量、订阅的 topic 和分区信息、当前消费的偏移量等。

如果想要查看某个特定 group 的信息,可以将–all-groups 改为–group 并指定相应的 group 名称。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
kafka-topics.sh --alter  --bootstrap-server  localhost:9092  --topic  kafka_json_monitor --partitions 10

kafka-topics.sh --alter  --bootstrap-server  localhost:9092  --topic  dwd_trace2 --partitions 9

kafka-topics.sh --alter  --bootstrap-server  localhost:9092  --topic  es_write1 --partitions 9



kafka-topics.sh --alter  --bootstrap-server  localhost:9092  --topic  ue_latitude_longitude_topic --partitions 3


#kafka消费组
kafka-consumer-groups.sh --bootstrap-server localhost:9092    --describe --all-groups


#创建kafka的topic
kafka-topics.sh --create --topic  test_process1 --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
Licensed under CC BY-NC-SA 4.0
最后更新于 Jan 06, 2025 05:52 UTC
comments powered by Disqus
Built with Hugo
主题 StackJimmy 设计
Caret Up