Kafka
@Bean(name = "hrConsumerFactory")
public ConsumerFactory<Object, Object> hrConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hrDataPubKafkaBootStrap());
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20_000);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10_000);
config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 819200);
config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10_000);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean(name = "hrConsumerContainerFactory")
public KafkaListenerContainerFactory hrConsumerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(hrConsumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
/**
* 监听kafka指定topic中对应的partitions
* @param records 接收到的消息
* 配置topic和分区:监听两个topic,分别为topic1、topic2,topic1只接收分区0,3的消息,
* topic2接收分区0和分区1的消息,但是分区1的消费者初始位置为5
*/
@KafkaListener(containerFactory = "hrConsumerContainerFactory",topicPartitions =
{ @TopicPartition(topic = "${spring.kafka.topic.topic1:action}", partitions = { "0","1", "2" }),
@TopicPartition(topic = "${spring.kafka.topic.topic2:data}", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
},groupId = "${spring.kafka.consumer.group-id:gaiaworks}",concurrency = "${spring.kafka.concurrency:2}")
public void onMessage(List> records,Acknowledgment acknowledgment){
try {
logger.info("【接收到消息】[线程ID:{} 消息内容:{}]", Thread.currentThread(), records);
}catch (Exception exception){
logger.error(exception.getMessage(), exception);
}finally {
acknowledgment.acknowledge();
}
}
#application.properties
spring.kafka.producer.servers=127.0.0.1:9092
spring.kafka.consumer.servers=127.0.0.1:9092
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic.topic1=action
spring.kafka.topic.topic2=data
spring.kafka.consumer.group-id=gaiaworks-group
#消费者的数量,不要大于分区数
spring.kafka.concurrency=1
spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.client-id=Leon