1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Configuration
@EnableKafka
public class KafkaConfig {
@Autowired
private PropsConfig propsConfig;
@Bean
KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
// 开启批量监听
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(propsConfig.getPollTimeout());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getServers());
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, propsConfig.getClientId());
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 如果没有offset则从最后的offset开始读
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 必须大于session.timeout.ms的设置
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "40000");
// 默认为30秒
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
propsMap.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
//设置每次接收Message的数量
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MessageDeserializer.class.getName());
return propsMap;
}
}
# kafka配置
kafka:
bootstrap-servers: 192.168.54.150:9092
consumer:
topic: example_wk_new
# 是否自动提交offset
enable-auto-commit: false
properties:
group-id: example_wk_new
pollTimeout: 3000
client-id: example_wk_new_1
listener:
# 消费端监听的topic不存在时,项目启动会报错(关掉)
missing-topics-fatal: false
|