kafka消费者API
Consumer消费数据时的可靠性是很容易保证的,因为数据在kafka中是持久化的,故不用担心数据丢失的问题。
由于consumer在消费过程中可能会出现断电宕机的等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后可以继续消费。
所以,offset的维护是consumer消费数据必须考虑的问题。
依赖
org.apache.kafka kafka-clients 0.11.0.0
KafkaConsumer:创建一个kafka消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsumerRecord:每条数据都要封装成ConsumerRecord对象
public class MyConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//配置信息赋值//连接kafka集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//开启自动提交offsetproperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//自动提交offset的时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//循环不断拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}}}
}
通过生产者生产消息,之后在控制台可以看到:
如果启动消费者之后,控制台一直在kafka的日志,可以在resources目录下新创建logback.xml
文件,添加下面的代码,更改日志级别:
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group1");//重置消费者的offset,默认是latest
/*** 重新消费一个主题的数据需要满足条件:更换一个新的消费者组(或者offset过期),且配置auto.offset.reset=earliest* 配置earliest不等于offset就是0,因为之前的数据可能会被删除,offset就不是从0开始的*/
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
自动提交虽然十分便利,但是由于是基于时间提交的,开发人员难以把握offset提交的时机,配置时间过长容易造成服务等待时间太久,配置时间过短又可能会出现服务异常但offset又成功提交了。因此kafka提供了手动提交offset的API。
如果关闭自动提交offset,在消费者服务启动期间,消费暂时是正常的,消费者每次消费之后offset会更新到服务内存中,但是并没有通知kafka同步更新最新的offset,当重启消费者之后,会从kafka中获取在kafka最新的offset进行消费,这样就会造成重复消费
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手动提交offset的两种方法:commitSync(同步提交)
和commitAsync(异步提交)
。两者的相同点是,都会将本次poll的一批数据最高的偏移量提交;不同点是,commitSync阻塞当前线程,一直到提交成功,并且会自动失败重试(由于不可控因素,会出现提交失败),而commitAsync则没有失败重试机制,也有可能提交失败。
同步提交有offset重试机制,会更加可靠
public class CustomConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//关闭自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}//同步提交 当前线程会阻塞直到offset提交成功consumer.commitSync();}}
}
如果没有consumer.commitSync()
,生产者生产消息后,消费者消费完成后不会通知kafka同步更新offset,当重启消费者服务,会从kafka端的offset重新消费数据,会重复消费
虽然同步提交会更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响,所以在更多的情况下会选择异步提交offset
public class CustomConsumer {public static void main(String[] args) {//创建配置信息Properties properties = new Properties();//连接kafkaproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");//关闭自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//key, value的反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "hll-group");//创建消费者KafkaConsumer consumer = new KafkaConsumer(properties);//订阅主题consumer.subscribe(Collections.singletonList("bigdata"));//拉取数据while (true) {ConsumerRecords records = consumer.poll(100);for (ConsumerRecord record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}//同步提交 当前线程会阻塞直到offset提交成功//consumer.commitSync();//异步提交consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map offsets, Exception exception) {if (exception != null) {System.out.println("提交失败:" + offsets);}}});}}
}
无论同步提交还是异步提交offset,都有可能会造成数据的丢失或者重复消费。先提交offset后消费,可能造成数据的丢失;先消费后提交offset,可能造成数据重复消费
待补充…