Kafka

// Kafka consumer factory configuration
@Bean(name = "hrConsumerFactory")
public ConsumerFactory<Object, Object> hrConsumerFactory() {
    Map<String, Object> config = new HashMap<>();

    // broker address; hrDataPubKafkaBootStrap() supplies the bootstrap-servers string
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, hrDataPubKafkaBootStrap());
    // at most 100 records per poll()
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    // the consumer is considered failed and rebalanced out if a poll cycle exceeds 60 s
    config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60_000);
    config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 20_000);
    // offsets are committed manually (AckMode.MANUAL below), so auto-commit is disabled
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    // only takes effect when auto-commit is enabled; has no effect here
    config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10_000);
    // the broker answers a fetch once it has at least 800 KB of data or 10 s have passed
    config.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 819200);
    config.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10_000);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    return new DefaultKafkaConsumerFactory<>(config);
}
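The factory above calls hrDataPubKafkaBootStrap(), which is not shown in this post. A minimal sketch of what it might look like, assuming it simply returns the broker address bound from spring.kafka.bootstrap-servers (the field name and property key here are illustrative):

@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private String bootstrapServers;  // hypothetical field; adjust the property key to your setup

public String hrDataPubKafkaBootStrap() {
    return bootstrapServers;
}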
// Kafka listener container factory
@Bean(name = "hrConsumerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<Object, Object> hrConsumerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(hrConsumerFactory());
    // number of consumer threads; defaults to 1 (see the property-driven sketch below)
    //factory.setConcurrency(concurrency);
    // enable batch listening: the listener receives a List of records per poll
    factory.setBatchListener(true);
    // commit offsets manually through the Acknowledgment argument of the listener
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
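If a factory-level concurrency default is preferred over the concurrency attribute used on @KafkaListener below, the commented-out setConcurrency(...) call can be driven by the spring.kafka.concurrency property from application.properties. A minimal sketch under that assumption (the bean name is hypothetical, to avoid clashing with the factory above):

@Bean(name = "hrBatchContainerFactory")  // hypothetical bean name, for illustration only
public ConcurrentKafkaListenerContainerFactory<Object, Object> hrBatchContainerFactory(
        @Value("${spring.kafka.concurrency:1}") int concurrency) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(hrConsumerFactory());
    factory.setBatchListener(true);
    // one consumer thread per unit of concurrency; keep it <= the partition count
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}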
/**
 * Listens on specific partitions of the configured topics.
 * @param records the batch of records received in one poll
 * @param acknowledgment used to commit the batch offsets manually
 * Topic/partition setup: two topics are consumed. topic1 only receives messages from
 * partitions 0, 1 and 2; topic2 receives partitions 0 and 1, and the consumer's initial
 * offset on partition 1 of topic2 is set to 4.
 */
@KafkaListener(containerFactory = "hrConsumerContainerFactory", topicPartitions =
        { @TopicPartition(topic = "${spring.kafka.topic.topic1:action}", partitions = { "0", "1", "2" }),
                @TopicPartition(topic = "${spring.kafka.topic.topic2:data}", partitions = "0",
                        partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
        }, groupId = "${spring.kafka.consumer.group-id:gaiaworks}", concurrency = "${spring.kafka.concurrency:2}")
public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
    try {
        logger.info("Received batch [thread: {}, records: {}]", Thread.currentThread().getName(), records);
    } catch (Exception exception) {
        logger.error(exception.getMessage(), exception);
    } finally {
        // commit the whole batch once it has been handled (AckMode.MANUAL)
        acknowledgment.acknowledge();
    }
}
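In practice the batch is usually processed record by record rather than logged as a whole. A sketch of how the listener body above could iterate the batch, assuming String keys and values as configured by the deserializers; processRecord is a hypothetical placeholder for the real business logic:

public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment) {
    try {
        for (ConsumerRecord<String, String> record : records) {
            // topic/partition/offset are handy for idempotency checks and troubleshooting
            logger.info("topic={} partition={} offset={} value={}",
                    record.topic(), record.partition(), record.offset(), record.value());
            processRecord(record);  // hypothetical business handler
        }
    } catch (Exception exception) {
        logger.error(exception.getMessage(), exception);
    } finally {
        // as above, the whole batch is acknowledged in one manual commit
        acknowledgment.acknowledge();
    }
}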
#application.properties
spring.kafka.producer.servers=127.0.0.1:9092
spring.kafka.consumer.servers=127.0.0.1:9092
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.topic.topic1=action
spring.kafka.topic.topic2=data
spring.kafka.consumer.group-id=gaiaworks-group
# Number of consumer threads; keep it no larger than the number of partitions
spring.kafka.concurrency=1
spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.client-id=Leon
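The properties also declare a producer address, but no producer beans appear in this post. A minimal sketch of a matching producer setup for pushing test messages into topic1/topic2, assuming String keys and values (the bean names and the send call are illustrative):

@Bean(name = "hrProducerFactory")
public ProducerFactory<String, String> hrProducerFactory(
        @Value("${spring.kafka.producer.servers:127.0.0.1:9092}") String servers) {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean(name = "hrKafkaTemplate")
public KafkaTemplate<String, String> hrKafkaTemplate(ProducerFactory<String, String> hrProducerFactory) {
    return new KafkaTemplate<>(hrProducerFactory);
}

// usage: hrKafkaTemplate.send("action", "hello");  // topic, value; partition chosen by the default partitioner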

Author: 斓龙
Copyright: Unless otherwise stated, all posts on this blog are licensed under CC BY 4.0. Please credit 斓龙 when reposting!