在使用kafka发送消息时,producer端需要序列化,在大多数场景中, 需要传输的是与业务规则相关的复杂类型, 这就需要自定义数据结构。 Avro是一种序列化框架, 使用JSON来定义schema, sh cema由原始类型(null, boolean, int, long, float, double, bytes, string) 和复杂类型 (record, enum, array, map, union, fixed) 组成, schema文件以.avsc结尾, 表示avro schema
有2种序列化方式
序列化后的数据号schema和data同时存在的, 如下图、
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer",StringSerializer.class.getName());
props.put("key.deserializer",StringDeserializer.class.getName());
props.put("value.deserializer",StringDeserializer.class.getName());
KafkaSourceBuilder 类提供了两个方法来反序列数据,分别是 setDeserializer 和 setValueOnlyDeserializer
从名字上就应该可以看出这两者的区别,前者是反序列化完整的 ConsumerRecord,后者只反序列化 ConsumerRecord 的 value.
如果你想要获取 kafka 的元数据信息选择实现 KafkaDeserializationSchema 接口就可以了,KafkaDeserializationSchema 接口还有 4 个静态方法,其中的 of 方法就是用来反序列化 ConsumerRecord 的,剩下的 3 个 valueOnly 是用来反序列化 kafka 消息中的 value 的.
KafkaSource source = KafkaSource.builder().setBootstrapServers("ip").setTopics("web-topic").setGroupId("web-group").setStartingOffsets(OffsetsInitializer.earliest())//低版本jdk报错,改成jdk8.setValueOnlyDeserializer(new SimpleStringSchema()).build();