欢迎来到飞鸟慕鱼博客,开始您的技术之旅!
当前位置: 首页知识笔记正文

KafkaJava四Spring配置Kafka消费者提交Offset的策略

终极管理员 知识笔记 123阅读
一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

自动提交Offset 消费者将消息拉取下来以后未被消费者消费前直接自动提交offset。自动提交可能丢失数据比如消息在被消费者消费前已经提交了offset有可能消息拉取下来以后消费者挂了手动提交Offset 消费者在消费消息时/后再提交offset在消费者中实现手动提交Offset分为手动同步提交、手动异步提交什么是Offset 参考文章Linux【Kafka三】组件介绍 二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.Arrays;import java.util.Properties;/** * Description: kafka消费者消费消息,自动提交offset * Author: lvxiaobu * Date: 2023-10-24 16:26 **/public class MyConsumerAutoSubmitOffset {    private  final static String CONSUMER_GROUP_NAME  GROUP1;    private  final static String TOPIC_NAME  topic0921;    public static void main(String[] args) {        Properties props  new Properties();        // 一、设置参数        // 配置kafka地址//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,//                192.168.151.28:9092); // 单机配置        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,                192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置        // 配置消息 键值的序列化规则        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        // 配置消费者组        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);        // 设置消费者offset的提交方式        // 自动提交默认配置        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);        // 自动提交offset的时间间隔        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);        // 二、创建消费者        KafkaConsumer<String,String> consumer  new KafkaConsumer<String,String>(props);        // 三、消费者订阅主题        consumer.subscribe(Arrays.asList(TOPIC_NAME));        // 四、拉取消息开始消费        while (true){            // 从kafka集群中拉取消息            ConsumerRecords<String, String> records  consumer.poll(Duration.ofMillis(1000));            // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时            // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)            for (ConsumerRecord<String, String> record : records) {                System.out.println(接收到的消息: 分区:   record.partition()  , offset:   record.offset()                 , key值:   record.key()   , value值: record.value());            }        }    }}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式        // 自动提交默认配置        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);        // 自动提交offset的时间间隔        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
三、手动提交策略

3.1、手动同步提交策略

        手动同步提交会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息固一般使用同步提交在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.Arrays;import java.util.Properties;/** * Description: kafka消费者消费消息,手动同步提交offset * Author: lvxiaobu * Date: 2023-10-24 16:26 **/public class MyConsumerMauSubmitOffset {    private  final static String CONSUMER_GROUP_NAME  GROUP1;    private  final static String TOPIC_NAME  topic0921;    public static void main(String[] args) {        Properties props  new Properties();        // 一、设置参数        // 配置kafka地址//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,//                192.168.151.28:9092); // 单机配置        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,                192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置        // 配置消息 键值的序列化规则        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        // 配置消费者组        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);        // 设置消费者offset的提交方式        // 手动提交offset        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);        // 自动提交offset的时间间隔:此时不再需要设置该值//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);        // 二、创建消费者        KafkaConsumer<String,String> consumer  new KafkaConsumer<String,String>(props);        // 三、消费者订阅主题        consumer.subscribe(Arrays.asList(TOPIC_NAME));        // 四、拉取消息开始消费        while (true){            // 从kafka集群中拉取消息            ConsumerRecords<String, String> records  consumer.poll(Duration.ofMillis(1000));            //  业务逻辑处理            for (ConsumerRecord<String, String> record : records) {                System.out.println(接收到的消息: 分区:   record.partition()  , offset:   record.offset()                 , key值:   record.key()   , value值: record.value());            }            // 当for循环业务逻辑处理结束以后,再手动提交offset            // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。            // 一般使用同步提交在同步提交后不再做其他逻辑处理            consumer.commitAsync();            // do anything        }    }}

3.2、手动异步提交策略

        异步提交不会在提交offset代码处阻塞即消费者提交了offset后不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法供kafka集群回调来告诉消费者提交offset的结果。

package com.demo.lxb.kafka;import com.alibaba.fastjson.JSONObject;import org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration;import java.util.Arrays;import java.util.Map;import java.util.Properties;/** * Description: kafka消费者消费消息,手动异步提交offset * Author: lvxiaobu * Date: 2023-10-24 16:26 **/public class MyConsumerMauSubmitOffset2 {    private  final static String CONSUMER_GROUP_NAME  GROUP1;    private  final static String TOPIC_NAME  topic0921;    public static void main(String[] args) {        Properties props  new Properties();        // 一、设置参数        // 配置kafka地址//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,//                192.168.151.28:9092); // 单机配置        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,                192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094); // 集群配置        // 配置消息 键值的序列化规则        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());        // 配置消费者组        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);        // 设置消费者offset的提交方式        // 手动提交offset        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);        // 自动提交offset的时间间隔:此时不再需要设置该值//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);        // 二、创建消费者        KafkaConsumer<String,String> consumer  new KafkaConsumer<String,String>(props);        // 三、消费者订阅主题        consumer.subscribe(Arrays.asList(TOPIC_NAME));        // 四、拉取消息开始消费        while (true){            // 从kafka集群中拉取消息            ConsumerRecords<String, String> records  consumer.poll(Duration.ofMillis(1000));                     for (ConsumerRecord<String, String> record : records) {                System.out.println(接收到的消息: 分区:   record.partition()  , offset:   record.offset()                 , key值:   record.key()   , value值: record.value());            }            // 异步提交,不影响后续的内容。            // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果            consumer.commitAsync(new OffsetCommitCallback() {                Override                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {                    if(e ! null){                        // 可将提交失败的消息记录到日志                        System.out.println(记录提交offset失败的消息到日志);                        System.out.println(消费者提交offset抛出异常:  Arrays.toString(e.getStackTrace()));                        System.out.println(消费者提交offset异常的消息信息:  JSONObject.toJSONString(map));                    }                }            });            // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。            //do anything        }    }}

标签:
声明:无特别说明,转载请标明本文来源!