转自:
springboot-starter如何整合阿里云datahub呢?
下文笔者讲述springboot整合datahub的方法分享,如下所示
Datahub简介说明
DataHub的功能:1. 在大数据解决方案中扮演与Kafka相同的角色,同时还提供数据队列功能;2. DataHub还可与阿里云其它上下游产品对接,起到数据交换的作用,称之为数据交换。
DataHub 简介
DataHub对外提供供开发者生产和消费数据的SDK;在SpringBoot中,我们也可以使用自定义starter的方式加载该SDK。
实现思路:1. 引入相应的starter;2. 在application.yml中加入相应的配置信息;3. 编写相应的代码。
引入相应的starter
<dependency>
    <groupId>cn.com.cry.starters</groupId>
    <artifactId>cry-starters-projects</artifactId>
    <version>2022-1.0.0</version>
</dependency>
启动客户端
配置阿里云DataHub的endpoint以及AK信息
aliyun:
  datahub:
    # 开启功能
    havingValue: true
    # 是否为私有云
    isPrivate: false
    accessId: xxx
    accessKey: xxx
    endpoint: xxx
    # 连接DataHub客户端超时时间
    conn-timeout: 10000
获取DataHub客户端
DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
写数据
/**
 * Demo endpoint: builds 10 sample Student records and writes them to DataHub
 * project "my_test", topic "student", on the given shard.
 *
 * @param shardId target shard id (request parameter "id")
 * @return the number of records written successfully
 */
public int write(@RequestParam("id") Integer shardId) {
    // NOTE(review): the generic parameter was stripped by HTML extraction
    // ("Listdatas" in the article); restored as List<Student> from the loop body.
    List<Student> datas = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        Student s = new Student();
        s.setAge(i);
        s.setName("name-" + i);
        s.setAddress("address-" + i);
        datas.add(s);
    }
    // Write to project=my_test, topic=student on the requested shard;
    // DataHubTemplate.write returns the count of successfully inserted records.
    int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
    return successNumbers;
}
上述代码说明:projectName为my_test,topicName为student,向shardId为N的hub里写数据,且返回插入成功的条数。
读数据
读数据开发的逻辑类似RabbitMq的starter
使用@DataHubListener和@DataHubHandler处理器注解进行使用
@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl {

    /**
     * Listens on topic "student", shard 0, starting from the LATEST cursor,
     * and prints every field of each received message to stdout.
     */
    @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
    public void handler(Message message) {
        System.out.println("读取到shardId=0的消息");
        System.out.println(message.getData());
        // NOTE(review): "getCreateTsime" looks like a typo for getCreateTime —
        // confirm against the project's Message wrapper class before renaming.
        System.out.println(message.getCreateTsime());
        System.out.println(message.getSize());
        System.out.println(message.getConfig());
        System.out.println(message.getMessageId());
    }
}
以上代码通过LATEST游标的方式监听project=my_test、topicName=student、shardId=0,最终使用Message包装类获取DataHub实时写入的数据。此处可设置多种游标类型,例如:根据最新的系统时间、最早录入的序号等。
核心代码
需一个DataHubClient增强类
在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId
根据游标规则来读取当前的cursor进行数据的读取
public class DataHubClientWrapper implements InitializingBean, DisposableBean {@Autowiredprivate AliyunAccountProperties properties;@Autowiredprivate ApplicationContext context;private DatahubClient datahubClient;public DataHubClientWrapper() {}/*** 执行销毁方法** @throws Exception*/@Overridepublic void destroy() throws Exception {WorkerResourceExecutor.shutdown();}@Overridepublic void afterPropertiesSet() throws Exception {/*** 创建DataHubClient*/this.datahubClient = DataHubClientFactory.create(properties);/*** 打印Banner*/BannerUtil.printBanner();/*** 赋值Template的静态对象dataHubClient*/DataHubTemplate.setDataHubClient(datahubClient);/*** 初始化Worker线程*/WorkerResourceExecutor.initWorkerResource(context);/*** 启动Worker线程*/WorkerResourceExecutor.start();} }//写数据 //构建了一个类似RedisDataTemplate的模板类 //封装了write的逻辑 //调用时只需要用DataHubTemplate.write调用public class DataHubTemplate {private static DatahubClient dataHubClient;private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class);/*** 默认不开启重试机制** @param projectName* @param topicName* @param datas* @param shardId* @return*/public static int write(String projectName, String topicName, List> datas, Integer shardId) {return write(projectName, topicName, datas, shardId, false);}/*** 往指定的projectName以及topic和shard下面写数据** @param projectName* @param topicName* @param datas* @param shardId* @param retry* @return*/private static int write(String projectName, String topicName, List> datas, Integer shardId, boolean retry) {RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();ListrecordEntries = new ArrayList<>();for (Object o : datas) {RecordEntry entry = new RecordEntry();Map data = BeanUtil.beanToMap(o);TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);for (String key : data.keySet()) {tupleRecordData.setField(key, data.get(key));}entry.setRecordData(tupleRecordData);entry.setShardId(String.valueOf(shardId));recordEntries.add(entry);}PutRecordsResult result = 
dataHubClient.putRecords(projectName, topicName, recordEntries);int failedRecordCount = result.getFailedRecordCount();if (failedRecordCount > 0 && retry) {retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);}return datas.size() - failedRecordCount;}/*** @param client* @param records* @param retryTimes* @param project* @param topic*/private static void retry(DatahubClient client, List records, int retryTimes, String project, String topic) {boolean suc = false;List failedRecords = records;while (retryTimes != 0) {logger.info("the time to send message has [{}] records failed, is starting retry", records.size());retryTimes = retryTimes - 1;PutRecordsResult result = client.putRecords(project, topic, failedRecords);int failedNum = result.getFailedRecordCount();if (failedNum > 0) {failedRecords = result.getFailedRecords();continue;}suc = true;break;}if (!suc) {logger.error("DataHub send message retry failure");}}public static DatahubClient getDataHubClient() {return dataHubClient;}public static void setDataHubClient(DatahubClient dataHubClient) {DataHubTemplate.dataHubClient = dataHubClient;} }//读数据 //需要在Spring启动时开启一个监听线程DataListenerWorkerThread //执行一个死循环不停轮询DataHub下的对应通道public class DataListenerWorkerThread extends Thread {private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);private volatile boolean init = false;private DatahubConfig config;private String workerKey;private int recordLimits;private int sleep;private RecordSchema recordSchema;private RecordHandler recordHandler;private CursorHandler cursorHandler;public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {this.config = new DatahubConfig(projectName, topicName, shardId);this.workerKey = projectName + "-" + topicName + "-" + shardId;this.cursorHandler = new CursorHandler(cursorType, 
sequenceOffset, startTime, redisTemplate, workerKey);this.recordLimits = recordLimits;this.sleep = sleep;this.setName("DataHub-Worker");this.setDaemon(true);}@Overridepublic void run() {initRecordSchema();String cursor = cursorHandler.positioningCursor(config);for (; ; ) {try {GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);if (result.getRecordCount() <= 0) {// 无数据,sleep后读取Thread.sleep(sleep);continue;}List
最后记得要做成一个starter
在resources下新建一个META-INF文件夹
新建一个spring.factories文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cry.starter.datahub.DataHubClientAutoConfiguration