【kafka】十五、kafka消费者API
创始人
2024-02-20 09:08:50
0

kafka消费者API

Consumer消费数据时的可靠性是很容易保证的,因为数据在kafka中是持久化的,故不用担心数据丢失的问题。

由于consumer在消费过程中可能会出现断电宕机的等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后可以继续消费。

所以,offset的维护是consumer消费数据必须考虑的问题。

依赖

org.apache.kafkakafka-clients0.11.0.0

1.自动提交offset

KafkaConsumer:创建一个kafka消费者对象,用来消费数据

ConsumerConfig:获取所需的一系列配置参数

ConsumerRecord:每条数据都要封装成ConsumerRecord对象

public class MyConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//配置信息赋值//连接kafka集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//开启自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交offset的时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//循环不断拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}}}
}

通过生产者生产消息,之后在控制台可以看到:

image-20220302222101359

如果启动消费者之后,控制台一直在kafka的日志,可以在resources目录下新创建logback.xml文件,添加下面的代码,更改日志级别:


2.重置offset

//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group1");//重置消费者的offset,默认是latest
/*** 重新消费一个主题的数据需要满足条件:更换一个新的消费者组(或者offset过期),且配置auto.offset.reset=earliest* 配置earliest不等于offset就是0,因为之前的数据可能会被删除,offset就不是从0开始的*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

3.手动提交offset

自动提交虽然十分便利,但是由于是基于时间提交的,开发人员难以把握offset提交的时机,配置时间过长容易造成服务等待时间太久,配置时间过短又可能会出现服务异常但offset又成功提交了。因此kafka提供了手动提交offset的API。

如果关闭自动提交offset,在消费者服务启动期间,消费暂时是正常的,消费者每次消费之后offset会更新到服务内存中,但是并没有通知kafka同步更新最新的offset,当重启消费者之后,会从kafka中获取在kafka最新的offset进行消费,这样就会造成重复消费

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交offset的两种方法:commitSync(同步提交)commitAsync(异步提交)。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由于不可控因素,会出现提交失败),而commitAsync则没有失败重试机制,也有可能提交失败。

3.1 同步提交offset

同步提交有offset重试机制,会更加可靠

public class CustomConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//关闭自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}//同步提交 当前线程会阻塞直到offset提交成功consumer.commitSync();}}
}

如果没有consumer.commitSync(),生产者生产消息后,消费者消费完成后不会通知kafka同步更新offset,当重启消费者服务,会从kafka端的offset重新消费数据,会重复消费

3.2异步提交offset

虽然同步提交会更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响,所以在更多的情况下会选择异步提交offset

public class CustomConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//关闭自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}//同步提交 当前线程会阻塞直到offset提交成功//consumer.commitSync();//异步提交consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map offsets, Exception exception) {if (exception != null) {System.out.println("提交失败:" + offsets);}}});}}
}

无论同步提交还是异步提交offset,都有可能会造成数据的丢失或者重复消费。先提交offset后消费,可能造成数据的丢失;先消费后提交offset,可能造成数据重复消费

4.自定义存储offset

待补充…

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...