Kafka Consumer Error: Offset Commit Cannot Be Completed Since the Consumer Is Not Part of an Active Group for Auto Partition Assignment

1. The Kafka consumer reports the following error

2020-12-02 23:04:32.290 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
        at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1378)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1085)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1134)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:999)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1504)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2331)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2326)

2. Kafka version: kafka_2.11-2.4.0

3. Manual offset commit is enabled; the configuration code is as follows:

@Configuration
@EnableKafka
public class KafkaConfig {
    @Autowired
    private PropsConfig propsConfig;


    @Bean
    KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        // enable batch listening
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(propsConfig.getPollTimeout());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getServers());
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, propsConfig.getClientId());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // if there is no committed offset, start reading from the latest offset
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // must be greater than session.timeout.ms
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
        // session timeout, set here to 30 seconds
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        // maximum number of records returned per poll()
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
        return propsMap;
    }

}
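
Since the container uses AckMode.MANUAL, the listener itself has to acknowledge each batch after processing, otherwise no offsets are committed at all. The original post does not show the listener; the sketch below is only an illustration of what it might look like (the class name, method body, and the ExampleBatchListener/process names are assumptions; the topic name is taken from the YAML below):

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ExampleBatchListener {

    // batch listener wired to the batchFactory() bean defined above;
    // the value type follows the factory's <String, String> generics, adjust it if
    // MessageDeserializer produces a different type
    @KafkaListener(topics = "example_wk_new", containerFactory = "batchFactory")
    public void onMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            // process each record; the whole batch must finish within max.poll.interval.ms
            process(record.value());
        }
        // manual commit: acknowledge only after the whole batch has been handled
        ack.acknowledge();
    }

    private void process(String value) {
        // application-specific handling (placeholder)
    }
}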
# Kafka configuration
  kafka:
    bootstrap-servers: 192.168.54.150:9092
    consumer:
      topic: example_wk_new
      # whether to auto-commit offsets
      enable-auto-commit: false
      properties:
        group-id: example_wk_new
        pollTimeout: 3000
      client-id: example_wk_new_1

    listener:
      # do not fail application startup when a listened-to topic does not exist
      missing-topics-fatal: false
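
The KafkaConfig class above injects a PropsConfig bean whose source is not shown in the post. A plausible sketch that binds the YAML keys above to the getters used in KafkaConfig follows; the property paths assume the kafka: block is a top-level key (if it actually sits under spring:, prefix the paths accordingly), and the field and class layout are assumptions:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class PropsConfig {

    // assumed bindings from the application.yml snippet above
    @Value("${kafka.bootstrap-servers}")
    private String servers;

    @Value("${kafka.consumer.properties.group-id}")
    private String groupId;

    @Value("${kafka.consumer.client-id}")
    private String clientId;

    @Value("${kafka.consumer.enable-auto-commit}")
    private Boolean enableAutoCommit;

    @Value("${kafka.consumer.properties.pollTimeout}")
    private Long pollTimeout;

    public String getServers() { return servers; }
    public String getGroupId() { return groupId; }
    public String getClientId() { return clientId; }
    public Boolean getEnableAutoCommit() { return enableAutoCommit; }
    public Long getPollTimeout() { return pollTimeout; }
}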

How to fix it:

These exceptions can usually be resolved by increasing the client-side timeouts. The consumer is kicked out of the group when it spends too long between two poll() calls (for example, while processing a large batch) or misses its session heartbeats; the manual commit that follows then fails with CommitFailedException.

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
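
In this project the consumer properties are built in consumerConfigs(), so the change can be merged there. A minimal sketch is shown below (the applyTimeoutFix helper is illustrative, not from the original); note that max.poll.interval.ms defaults to 300000 ms, so the key point is that it stays above the time needed to process one batch of max.poll.records records between two poll() calls:

    // illustrative helper: merge the suggested timeouts into the map returned by consumerConfigs()
    private void applyTimeoutFix(Map<String, Object> propsMap) {
        // give the member more time before the group coordinator considers it dead
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
        // must exceed the time spent processing one polled batch (client default is 300000 ms)
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
        // keep request.timeout.ms above session.timeout.ms and fetch.max.wait.ms
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
    }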