Anolis 8.6 部署 Kafka 3.3.1 安装和测试(二)
创始人
2024-05-24 01:23:36
0

动态初始化Kafka消费者实例

  • 一.Kafka 环境搭建
  • 二.动态初始化消费者
    • 1.Topic定义
    • 2.方法处理器工厂
    • 3.参数解析器(Copy SpringBoot 源码)
    • 4.消费接口和消费实现
    • 5.动态初始化
      • 1.关键类简介
      • 2.动态初始化实现

一.Kafka 环境搭建

参考:Kafka搭建和测试

二.动态初始化消费者

1.Topic定义

动态初始化,即不通过注解和配置文件实现消费者的初始化,定义一个Topic对象,用于设置消费者参数

package com.demo.entity;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author * @date 2023-02-08 15:06* @since 1.8*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Topic {

    // Describes one consumer to be created at runtime instead of through
    // annotations or configuration files. Field order matters: Lombok's
    // @AllArgsConstructor takes its parameters in declaration order.

    /** Identifier of the listener; used to look the listener up again later. */
    private String id;

    /** Name of the Kafka topic to subscribe to. */
    private String topic;

    /** Partition count configured for this consumer. */
    private Integer partitions;

    /** Consumer group id; "test" unless a value is supplied. */
    private String group = "test";

    /** Prefix for the Kafka client id of the consumer(s). */
    private String clientPrefix;
}

2.方法处理器工厂

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.context.ApplicationContext;
import org.springframework.core.convert.TypeDescriptor;
import org.springframework.core.convert.converter.ConditionalGenericConverter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.validation.Validator;import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;/*** @author * @date 2023-02-08 14:18* @since 1.8*/
public class MessageHandlerMethodFactory implements org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory {private ApplicationContext applicationContext;private Validator validator;private List customMethodArgumentResolvers = new ArrayList<>();private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory handlerMethodFactory;public MessageHandlerMethodFactory(Validator validator, ApplicationContext applicationContext) {this.validator = validator;this.applicationContext = applicationContext;}public void setHandlerMethodFactory(org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {this.handlerMethodFactory = kafkaHandlerMethodFactory1;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory getHandlerMethodFactory() {if (this.handlerMethodFactory == null) {this.handlerMethodFactory = createDefaultMessageHandlerMethodFactory();}return this.handlerMethodFactory;}private org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();if (this.validator != null) {defaultFactory.setValidator(this.validator);}defaultFactory.setBeanFactory(this.applicationContext);this.defaultFormattingConversionService.addConverter(new BytesToStringConverter(StandardCharsets.UTF_8));this.defaultFormattingConversionService.addConverter(new BytesToNumberConverter());defaultFactory.setConversionService(this.defaultFormattingConversionService);GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);defaultFactory.setMessageConverter(messageConverter);List customArgumentsResolver =new 
ArrayList<>(Collections.unmodifiableList(this.customMethodArgumentResolvers));// Has to be at the end - look at PayloadMethodArgumentResolver documentationcustomArgumentsResolver.add(new NullAwarePayloadArgumentResolver(messageConverter, this.validator));defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);defaultFactory.afterPropertiesSet();return defaultFactory;}@Overridepublic InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {return getHandlerMethodFactory().createInvocableHandlerMethod(bean, method);}private static class BytesToStringConverter implements Converter {private final Charset charset;BytesToStringConverter(Charset charset) {this.charset = charset;}@Overridepublic String convert(byte[] source) {return new String(source, this.charset);}}private final class BytesToNumberConverter implements ConditionalGenericConverter {BytesToNumberConverter() {}@Override@Nullablepublic Set getConvertibleTypes() {HashSet pairs = new HashSet<>();pairs.add(new ConvertiblePair(byte[].class, long.class));pairs.add(new ConvertiblePair(byte[].class, int.class));pairs.add(new ConvertiblePair(byte[].class, short.class));pairs.add(new ConvertiblePair(byte[].class, byte.class));pairs.add(new ConvertiblePair(byte[].class, Long.class));pairs.add(new ConvertiblePair(byte[].class, Integer.class));pairs.add(new ConvertiblePair(byte[].class, Short.class));pairs.add(new ConvertiblePair(byte[].class, Byte.class));return pairs;}@Override@Nullablepublic Object convert(@Nullable Object source, TypeDescriptor sourceType, TypeDescriptor targetType) {byte[] bytes = (byte[]) source;if (targetType.getType().equals(long.class) || targetType.getType().equals(Long.class)) {Assert.state(bytes.length >= 8, "At least 8 bytes needed to convert a byte[] to a long"); // NOSONARreturn ByteBuffer.wrap(bytes).getLong();}else if (targetType.getType().equals(int.class) || targetType.getType().equals(Integer.class)) {Assert.state(bytes.length >= 4, "At least 4 bytes 
needed to convert a byte[] to an integer"); // NOSONARreturn ByteBuffer.wrap(bytes).getInt();}else if (targetType.getType().equals(short.class) || targetType.getType().equals(Short.class)) {Assert.state(bytes.length >= 2, "At least 2 bytes needed to convert a byte[] to a short"); // NOSONARreturn ByteBuffer.wrap(bytes).getShort();}else if (targetType.getType().equals(byte.class) || targetType.getType().equals(Byte.class)) {Assert.state(bytes.length >= 1, "At least 1 byte needed to convert a byte[] to a byte"); // NOSONARreturn ByteBuffer.wrap(bytes).get();}return null;}@Overridepublic boolean matches(TypeDescriptor sourceType, TypeDescriptor targetType) {if (sourceType.getType().equals(byte[].class)) {Class target = targetType.getType();return target.equals(long.class) || target.equals(int.class) || target.equals(short.class) // NOSONAR|| target.equals(byte.class) || target.equals(Long.class) || target.equals(Integer.class)|| target.equals(Short.class) || target.equals(Byte.class);}else {return false;}}}
}

3.参数解析器(Copy SpringBoot 源码)

此类直接使用 SpringBoot 源码,原实现为私有类

package com.demo.manual;import org.springframework.core.MethodParameter;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
import org.springframework.validation.Validator;import java.util.List;/*** @author * @date 2023-02-08 14:36* @since 1.8*/
public class NullAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {NullAwarePayloadArgumentResolver(MessageConverter messageConverter, Validator validator) {super(messageConverter, validator);}@Overridepublic Object resolveArgument(MethodParameter parameter, Message message) throws Exception { // NOSONARObject resolved = super.resolveArgument(parameter, message);/** Replace KafkaNull list elements with null.*/if (resolved instanceof List) {List list = ((List) resolved);for (int i = 0; i < list.size(); i++) {if (list.get(i) instanceof KafkaNull) {list.set(i, null);}}}return resolved;}@Overrideprotected boolean isEmptyPayload(Object payload) {return payload == null || payload instanceof KafkaNull;}}

4.消费接口和消费实现

当前接口和实现用于做统一的数据处理,可以在实现类内再根据 Topic 调用对应的数据解析方法

接口:

package com.demo.manual;import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 13:46* @since 1.8*/
public interface Handler {

    /**
     * Handles one record received from Kafka.
     *
     * @param cRecord the consumed record
     */
    void deal(ConsumerRecord cRecord);
}

实现:

package com.demo.manual;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;/*** @author * @date 2023-02-08 11:49* @since 1.8*/
@Slf4j
public class ManualHandler implements Handler{@Overridepublic void deal(ConsumerRecord cRecord) {log.info("  Topic:{} Partition:{} Content:{}",cRecord.topic(),cRecord.partition(),cRecord.value());}
}

5.动态初始化

1.关键类简介

此处通过接口调用,实现创建、暂停和恢复消费;可根据实际应用场景进行设计

关键类 | 说明
KafkaListenerEndpointRegistry | Spring 的 Kafka 监听容器,可以通过 Id 获取 Listener 实例,从而暂停或恢复消费监听
ConcurrentKafkaListenerContainerFactory | Listener 工厂,定义代码可参考上面链接的(2.3 节)
ConsumerAwareListenerErrorHandler | 消费异常处理器,定义代码可参考上面链接的(2.3 节)
ApplicationContext | Spring 的上下文容器,MessageHandlerMethodFactory 初始化用
MethodKafkaListenerEndpoint | Kafka 配置节点,详细逻辑可参考源码

SpringBoot 自动初始化 Kafka 消费者的主要实现类和方法

package org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

	/*** 此处为相关源码,仅供参考 寻找带有 @KafkaListener 注解的类并初始化/@Overridepublic Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {if (!this.nonAnnotatedClasses.contains(bean.getClass())) {Class targetClass = AopUtils.getTargetClass(bean);Collection classLevelListeners = findListenerAnnotations(targetClass);final boolean hasClassLevelListeners = !classLevelListeners.isEmpty();final List multiMethods = new ArrayList<>();Map> annotatedMethods = MethodIntrospector.selectMethods(targetClass,(MethodIntrospector.MetadataLookup>) method -> {Set listenerMethods = findListenerAnnotations(method);return (!listenerMethods.isEmpty() ? listenerMethods : null);});if (hasClassLevelListeners) {Set methodsWithHandler = MethodIntrospector.selectMethods(targetClass,(ReflectionUtils.MethodFilter) method ->AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);multiMethods.addAll(methodsWithHandler);}if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {this.nonAnnotatedClasses.add(bean.getClass());this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());}else {// Non-empty set of methodsfor (Map.Entry> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (KafkaListener listener : entry.getValue()) {processKafkaListener(listener, method, bean, beanName);}}this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"+ beanName + "': " + annotatedMethods);}if (hasClassLevelListeners) {processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);}}return bean;}

2.动态初始化实现

package com.demo.controller;import com.demo.entity.Topic;
import com.demo.manual.MessageHandlerMethodFactory;
import com.demo.manual.ManualHandler;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** @author * @date 2023-02-07 13:40* @since 1.8*/
@Slf4j
@RestController
@RequestMapping("/listener")
public class ListenerController {@AutowiredKafkaListenerEndpointRegistry registry;@Autowired@Qualifier("batchTestContainerFactory")ConcurrentKafkaListenerContainerFactory batchTestContainerFactory;@AutowiredConsumerAwareListenerErrorHandler consumerAwareListenerErrorHandler;@AutowiredApplicationContext applicationContext;MessageHandlerMethodFactory factory;@PostConstructprivate void init(){factory = new MessageHandlerMethodFactory(null,applicationContext);}static Map map = new ConcurrentHashMap<>();static {map.put("test_manual_1_id",new Topic("test_manual_1_id","test-topic-new.1",2,"mygroup","test_manual_1_batch"));map.put("test_manual_2_id",new Topic("test_manual_2_id","test-topic-new.2",1,"mygroup","test_manual_2_batch"));}/*** 停止消费 自行选择停止时是否需要从监听容器内移除实例,容器为 Map 实现* Map* @param id*/@GetMapping("/close")public void close(String id){MessageListenerContainer container = registry.unregisterListenerContainer(id);container.destroy();}/*** 开始消费 若果是已注册的则判断是否暂停,暂停则恢复* 如果不存在,则定义一个消费者,注册到容器内并启动* @param id* @throws NoSuchMethodException*/@GetMapping("/open")public void open(String id) throws NoSuchMethodException {MessageListenerContainer container = registry.getListenerContainer(id);if (null!=container){if (!container.isRunning()){container.start();container.resume();}} else {//TODO 新建一个对应 Topic 的实例Topic topic = map.get(id);if (null==topic){return;}ManualHandler bean = new ManualHandler();MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>();endpoint.setMessageHandlerMethodFactory(factory);endpoint.setBean(bean);Method[] methods = bean.getClass().getDeclaredMethods();endpoint.setMethod(checkProxy(methods[0],bean));endpoint.setId(topic.getId());endpoint.setTopics(topic.getTopic());endpoint.setGroupId(topic.getGroup());endpoint.setClientIdPrefix(topic.getClientPrefix());endpoint.setConcurrency(topic.getPartitions());endpoint.setErrorHandler(consumerAwareListenerErrorHandler);registry.registerListenerContainer(endpoint,batchTestContainerFactory);container 
= registry.getListenerContainer(id);container.start();}}/*** Copy Spring 源码* @param methodArg* @param bean* @return*/private Method checkProxy(Method methodArg, Object bean) {Method method = methodArg;if (AopUtils.isJdkDynamicProxy(bean)) {try {// Found a @KafkaListener method on the target class for this JDK proxy ->// is it also present on the proxy itself?method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());Class[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();for (Class iface : proxiedInterfaces) {try {method = iface.getMethod(method.getName(), method.getParameterTypes());break;}catch (@SuppressWarnings("unused") NoSuchMethodException noMethod) {// NOSONAR}}}catch (SecurityException ex) {ReflectionUtils.handleReflectionException(ex);}catch (NoSuchMethodException ex) {throw new IllegalStateException(String.format("@KafkaListener method '%s' found on bean target class '%s', " +"but not found in any interface(s) for bean JDK proxy. Either " +"pull the method up to an interface or switch to subclass (CGLIB) " +"proxies by setting proxy-target-class/proxyTargetClass " +"attribute to 'true'", method.getName(),method.getDeclaringClass().getSimpleName()), ex);}}return method;}
}

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...