springboot-starter如何整合阿里云datahub呢?
创始人
2024-04-11 07:38:51
0

转自:

springboot-starter如何整合阿里云datahub呢?

下文笔者讲述springboot整合datahub的方法分享,如下所示

Datahub简介说明

DataHub的功能:1.与大数据解决方案中Kafka具有相同的角色同时还提供数据队列功能2.DataHub还可与阿里云其它上下游产品对接其一个交换的功能,称之为数据交换

DataHub 简介

datahub对外提供开发者生产和消费的sdk在springboot中,我们也可用使用自定义starter的方式加载sdk
实现思路:1.引入相应的starter器2.application.yml中加入相应的配置信息3.编写相应的代码

引入相应的starter器

cry-starters-projectscn.com.cry.starters2022-1.0.0

启动客户端

配置阿里云DataHub的endpoint以及AK信息

aliyun:datahub:# 开启功能havingValue: true#是否为私有云isPrivate: falseaccessId: xxxaccessKey: xxxendpoint: xxx#连接DataHub客户端超时时间conn-timeout: 10000

获取DataHub客户端

DatahubClient datahubClient=DataHubTemplate.getDataHubClient();

写数据

public int write(@RequestParam("id") Integer shardId) {List datas = new ArrayList<>();for (int i = 0; i < 10; i++) {Student s = new Student();s.setAge(i);s.setName("name-" + i);s.setAddress("address-" + i);datas.add(s);}int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);return successNumbers;
}
 上述代码说明:projectName为my_testtopicName为studentshardId 为N的hub里写数据且返回插入成功的条数

读数据

读数据开发的逻辑类似RabbitMq的starter
使用@DataHubListener和@DataHubHandler处理器注解进行使用

@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {@DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)public void handler(Message message) {System.out.println("读取到shardId=0的消息");System.out.println(message.getData());System.out.println(message.getCreateTsime());System.out.println(message.getSize());System.out.println(message.getConfig());System.out.println(message.getMessageId());}
}
以上代码通过LATEST游标的方式监听 project=my_testtopicName=studentshardId=0最终使用Message的包装类获取dataHub实时写入的数据 
此处可设置多种游标类型例:根据最新的系统时间、最早录入的序号等

核心代码

需一个DataHubClient增强类
在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId
根据游标规则来读取当前的cursor进行数据的读取

public class DataHubClientWrapper implements InitializingBean, DisposableBean {@Autowiredprivate AliyunAccountProperties properties;@Autowiredprivate ApplicationContext context;private DatahubClient datahubClient;public DataHubClientWrapper() {}/*** 执行销毁方法** @throws Exception*/@Overridepublic void destroy() throws Exception {WorkerResourceExecutor.shutdown();}@Overridepublic void afterPropertiesSet() throws Exception {/*** 创建DataHubClient*/this.datahubClient = DataHubClientFactory.create(properties);/*** 打印Banner*/BannerUtil.printBanner();/*** 赋值Template的静态对象dataHubClient*/DataHubTemplate.setDataHubClient(datahubClient);/*** 初始化Worker线程*/WorkerResourceExecutor.initWorkerResource(context);/*** 启动Worker线程*/WorkerResourceExecutor.start();}
}//写数据
//构建了一个类似RedisDataTemplate的模板类
//封装了write的逻辑
//调用时只需要用DataHubTemplate.write调用public class DataHubTemplate {private static DatahubClient dataHubClient;private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);/*** 默认不开启重试机制** @param projectName* @param topicName* @param datas* @param shardId* @return*/public static int write(String projectName, String topicName, List datas, Integer shardId) {return write(projectName, topicName, datas, shardId, false);}/*** 往指定的projectName以及topic和shard下面写数据** @param projectName* @param topicName* @param datas* @param shardId* @param retry* @return*/private static int write(String projectName, String topicName, List datas, Integer shardId, boolean retry) {RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();List recordEntries = new ArrayList<>();for (Object o : datas) {RecordEntry entry = new RecordEntry();Map data = BeanUtil.beanToMap(o);TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);for (String key : data.keySet()) {tupleRecordData.setField(key, data.get(key));}entry.setRecordData(tupleRecordData);entry.setShardId(String.valueOf(shardId));recordEntries.add(entry);}PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);int failedRecordCount = result.getFailedRecordCount();if (failedRecordCount > 0 && retry) {retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);}return datas.size() - failedRecordCount;}/*** @param client* @param records* @param retryTimes* @param project* @param topic*/private static void retry(DatahubClient client, List records, int retryTimes, String project, String topic) {boolean suc = false;List failedRecords = records;while (retryTimes != 0) {logger.info("the time to send message has [{}] records failed, is starting retry", records.size());retryTimes = retryTimes - 1;PutRecordsResult result = client.putRecords(project, topic, failedRecords);int failedNum = result.getFailedRecordCount();if (failedNum > 0) {failedRecords = result.getFailedRecords();continue;}suc = true;break;}if (!suc) {logger.error("DataHub send message retry failure");}}public static DatahubClient getDataHubClient() {return dataHubClient;}public static void setDataHubClient(DatahubClient dataHubClient) {DataHubTemplate.dataHubClient = dataHubClient;}
}//读数据
//需要在Spring启动时开启一个监听线程DataListenerWorkerThread
//执行一个死循环不停轮询DataHub下的对应通道public class DataListenerWorkerThread extends Thread {private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);private volatile boolean init = false;private DatahubConfig config;private String workerKey;private int recordLimits;private int sleep;private RecordSchema recordSchema;private RecordHandler recordHandler;private CursorHandler cursorHandler;public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {this.config = new DatahubConfig(projectName, topicName, shardId);this.workerKey = projectName + "-" + topicName + "-" + shardId;this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);this.recordLimits = recordLimits;this.sleep = sleep;this.setName("DataHub-Worker");this.setDaemon(true);}@Overridepublic void run() {initRecordSchema();String cursor = cursorHandler.positioningCursor(config);for (; ; ) {try {GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);if (result.getRecordCount() <= 0) {// 无数据,sleep后读取Thread.sleep(sleep);continue;}List> dataMap = recordHandler.convert2List(result.getRecords());logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());// 拿到下一个游标cursor = cursorHandler.nextCursor(result);//执行方法WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);} catch (InvalidParameterException ex) {//非法游标或游标已过期,建议重新定位后开始消费cursor = cursorHandler.resetCursor(config);logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());} catch (DatahubClientException e) {logger.error("DataHubException:{}", e.getErrorMessage());this.interrupt();} catch (InterruptedException e) {logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());} catch (Exception e) {this.interrupt();logger.error("receive DataHub records cry.exception:{}", e, e);}}}/*** 终止*/public void shutdown() {if (!interrupted()) {interrupt();}}/*** 初始化topic字段以及recordSchema*/private void initRecordSchema() {try {if (!init) {recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();List fields = recordSchema.getFields();this.recordHandler = new RecordHandler(fields);init = true;}} catch (Exception e) {logger.error("initRecordSchema error:{}", e, e);}}
}//read的时候结合了注解开发
//通过定义类注解DataHubListener和方法注解DataHubHandler内置属性
//来动态的控制需要在哪些方法中处理监听到的数据的逻辑:DataHubHandler@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {/*** 话题名称** @return*/String topicName();/*** shardId** @return*/int shardId();/*** 最大数据量限制** @return*/int recordLimit() default 1000;/*** 游标类型** @return*/CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST;/*** 若未监听到数据添加,休眠时间 ms** @return*/int sleep() default 10000;/*** 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量** @return*/String startTime() default "";/*** 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数** @return*/int sequenceOffset() default 0;
}DataHubListener@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {String projectName();
}//启动SpringBootStarter的EnableConfigurationProperties注解
//使用配置文件来控制default-bean的开启或关闭启动类
@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {/*** 初始化dataHub装饰bean** @return*/@Beanpublic DataHubClientWrapper dataHubWrapper() {return new DataHubClientWrapper();}}//属性配置类
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{/*** http://xxx.aliyuncs.com*/private String endpoint;/*** account*/private String accessId;/*** password*/private String accessKey;/*** private cloud || public cloud*/private boolean isPrivate;/*** unit: ms*/private Integer connTimeout = 10000;
} 

最后记得要做成一个starter
在resources下新建一个META-INF文件夹
新建一个spring.factories文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration= \cry.starter.datahub.DataHubClientAutoConfiguration

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...