ChunJun JDBC Polling Incremental Updates: Source Code Analysis
Founder
2024-05-24 20:42:05


    • Restore (resume-from-checkpoint) column and increment column
    • Updating state (StartLocation) while reading data
    • Interval polling and incremental updates

  • Version: ChunJun 1.12

Restore column and increment column

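  • In "ChunJun Task Submission: Source Code Analysis" we saw that:
    1. When the main method of the Flink application runs, it first builds the Flink source DataStream.
    2. The source is built by calling createSource() in JdbcSourceFactory.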
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {
    // code...

    @Override
    public DataStream<RowData> createSource() {
        initColumnInfo();
        initRestoreConfig();
        initPollingConfig();
        initSplitConfig();
        initIncrementConfig();
        JdbcInputFormatBuilder builder = getBuilder();
        // code ...
    }

    // code...
}
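  • Start with the restore configuration, initialized by initRestoreConfig(). If a restore column name is configured, its name, index, and type are written into jdbcConf. If that name does not match any configured column, an IllegalArgumentException is thrown.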
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {
    // code...

    protected void initRestoreConfig() {
        String name = syncConf.getRestore().getRestoreColumnName();
        if (StringUtils.isNotBlank(name)) {
            FieldConf fieldConf = FieldConf.getSameNameMetaColumn(jdbcConf.getColumn(), name);
            if (fieldConf != null) {
                jdbcConf.setRestoreColumn(name);
                jdbcConf.setRestoreColumnIndex(fieldConf.getIndex());
                jdbcConf.setRestoreColumnType(fieldConf.getType());
                restoreKeyUtil = jdbcDialect.initKeyUtil(fieldConf.getName(), fieldConf.getType());
            } else {
                throw new IllegalArgumentException("unknown restore column name: " + name);
            }
        }
    }

    // code...
}
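To make the three outcomes of initRestoreConfig() concrete (blank name: nothing happens; known name: name/index/type copied; unknown name: fail fast), here is a minimal standalone sketch. RestoreField and RestoreConfigSketch are hypothetical stand-ins, not ChunJun classes, and the exact-match lookup is an assumption about what FieldConf.getSameNameMetaColumn() does.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ChunJun's FieldConf: only the attributes
// that initRestoreConfig() copies into jdbcConf.
final class RestoreField {
    final String name;
    final int index;
    final String type;

    RestoreField(String name, int index, String type) {
        this.name = name;
        this.index = index;
        this.type = type;
    }
}

// Hypothetical holder for the restore-related jdbcConf properties.
final class RestoreConfigSketch {
    String restoreColumn;        // stays null when no restore column is configured
    int restoreColumnIndex = -1; // -1 means "no restore column" (nextRecordInternal() checks > -1)
    String restoreColumnType;

    void init(String restoreColumnName, List<RestoreField> columns) {
        // Roughly StringUtils.isNotBlank(): blank means "restore not configured".
        if (restoreColumnName == null || restoreColumnName.trim().isEmpty()) {
            return;
        }
        // Assumption: an exact, case-sensitive name match.
        for (RestoreField f : columns) {
            if (Objects.equals(f.name, restoreColumnName)) {
                restoreColumn = f.name;
                restoreColumnIndex = f.index;
                restoreColumnType = f.type;
                return;
            }
        }
        // Same failure mode as the original: an unknown name is a config error.
        throw new IllegalArgumentException("unknown restore column name: " + restoreColumnName);
    }
}
```

Failing fast on an unknown name means a typo in the restore column is caught at job start rather than silently disabling resume.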
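  • Next, the incremental-update configuration is initialized by initIncrementConfig():
    1. If an increment column is configured, its name, index, and type are resolved. The configured value is either a column name or, if it is purely numeric, the column's position in the column list.
    2. That name, index, and type are then written to both the increment settings and the restore settings, overwriting whatever initRestoreConfig() set.
    3. The call to initStartLocation() additionally initializes and validates the start-location configuration.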
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public abstract class JdbcSourceFactory extends SourceFactory {
    // code...

    /** Initializes the configuration of incremental or interval-polling jobs */
    private void initIncrementConfig() {
        String increColumn = jdbcConf.getIncreColumn();
        // A non-blank increment column means the job is an incremental or interval-polling job
        if (StringUtils.isNotBlank(increColumn)) {
            List<FieldConf> fieldConfList = jdbcConf.getColumn();
            String type = null;
            String name = null;
            int index = -1;
            // A purely numeric value is the position of the increment column within `column`
            if (NumberUtils.isNumber(increColumn)) {
                int idx = Integer.parseInt(increColumn);
                if (idx > fieldConfList.size() - 1) {
                    throw new ChunJunRuntimeException(
                            String.format(
                                    "config error : incrementColumn must less than column.size() when increColumn is number, column = %s, size = %s, increColumn = %s",
                                    GsonUtil.GSON.toJson(fieldConfList),
                                    fieldConfList.size(),
                                    increColumn));
                }
                FieldConf fieldColumn = fieldConfList.get(idx);
                type = fieldColumn.getType();
                name = fieldColumn.getName();
                index = fieldColumn.getIndex();
            } else {
                for (FieldConf field : fieldConfList) {
                    if (Objects.equals(increColumn, field.getName())) {
                        type = field.getType();
                        name = field.getName();
                        index = field.getIndex();
                        break;
                    }
                }
            }
            if (type == null || name == null) {
                throw new IllegalArgumentException(
                        String.format(
                                "config error : increColumn's name or type is null, column = %s, increColumn = %s",
                                GsonUtil.GSON.toJson(fieldConfList), increColumn));
            }

            jdbcConf.setIncrement(true);
            jdbcConf.setIncreColumn(name);
            jdbcConf.setIncreColumnType(type);
            jdbcConf.setIncreColumnIndex(index);

            jdbcConf.setRestoreColumn(name);
            jdbcConf.setRestoreColumnType(type);
            jdbcConf.setRestoreColumnIndex(index);

            incrementKeyUtil = jdbcDialect.initKeyUtil(name, type);
            restoreKeyUtil = incrementKeyUtil;
            initStartLocation();

            if (StringUtils.isBlank(jdbcConf.getSplitPk())) {
                jdbcConf.setSplitPk(name);
                splitKeyUtil = incrementKeyUtil;
            }
        }
    }

    // code...
}
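  • In other words, once incremental updates are configured, the restore feature uses the increment column as its restore column. In addition, if no splitPk is configured, the increment column also becomes the split key.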
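The resolution rules of initIncrementConfig() described above (name or position, restore column overwritten, splitPk fallback) can be sketched in isolation. IncreField and IncrementConfigSketch are hypothetical stand-ins for FieldConf and jdbcConf; the key utils and initStartLocation() are left out, and a plain digit check replaces NumberUtils.isNumber(), which accepts more formats.

```java
import java.util.List;
import java.util.Objects;

// Hypothetical column descriptor standing in for ChunJun's FieldConf.
final class IncreField {
    final String name;
    final int index;
    final String type;

    IncreField(String name, int index, String type) {
        this.name = name;
        this.index = index;
        this.type = type;
    }
}

// Hypothetical holder for the jdbcConf properties that initIncrementConfig() writes.
final class IncrementConfigSketch {
    boolean increment;
    String increColumn;
    String increColumnType;
    int increColumnIndex = -1;
    String restoreColumn;
    String restoreColumnType;
    int restoreColumnIndex = -1;
    String splitPk; // may be preset by the user before init() runs

    void init(String configured, List<IncreField> columns) {
        if (configured == null || configured.trim().isEmpty()) {
            return; // no increment column: neither an incremental nor a polling job
        }
        IncreField match = null;
        // Simplification: only plain digit strings are treated as a position.
        if (configured.matches("\\d+")) {
            int idx = Integer.parseInt(configured);
            if (idx > columns.size() - 1) {
                throw new IllegalArgumentException(
                        "increColumn position " + idx + " is out of range for " + columns.size() + " columns");
            }
            match = columns.get(idx);
        } else {
            for (IncreField f : columns) {
                if (Objects.equals(configured, f.name)) {
                    match = f;
                    break;
                }
            }
        }
        if (match == null) {
            throw new IllegalArgumentException("unknown increColumn: " + configured);
        }
        increment = true;
        increColumn = match.name;
        increColumnType = match.type;
        increColumnIndex = match.index;
        // The restore column is overwritten by the increment column.
        restoreColumn = match.name;
        restoreColumnType = match.type;
        restoreColumnIndex = match.index;
        // Without an explicit split key, the increment column doubles as one.
        if (splitPk == null || splitPk.trim().isEmpty()) {
            splitPk = match.name;
        }
    }
}
```

Note that, as in the original, the stored index is the column's own index attribute, not the configured position.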

Updating state (StartLocation) while reading data

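  • When ChunJun creates the Flink source, it first calls createSource() in SourceFactory, and createSource() uses DtInputFormatSourceFunction to create the source DataStream.
  • Start with DtInputFormatSourceFunction, an implementation of Flink's SourceFunction (it extends InputFormatSourceFunction):
    • When a Flink application runs, data sources are read through a SourceFunction.
    • The SourceFunction's open method is called when it is initialized.
    • Its run method is called once it starts processing data.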
package com.dtstack.chunjun.source;

// import ...

public class DtInputFormatSourceFunction<OUT> extends InputFormatSourceFunction<OUT>
        implements CheckpointedFunction {
    // code ...

    @Override
    public void open(Configuration parameters) {
        // code ...
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        Exception tryException = null;
        try {
            Counter completedSplitsCounter =
                    getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag
                while (isRunning && !format.reachedEnd()) {
                    synchronized (ctx.getCheckpointLock()) {
                        nextElement = format.nextRecord(nextElement);
                        if (nextElement != null) {
                            ctx.collect(nextElement);
                        }
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } catch (Exception exception) {
            tryException = exception;
            LOG.error("Exception happened, start to close format", exception);
        } finally {
            isRunning = false;
            gracefulClose();
            if (null != tryException) {
                throw tryException;
            }
        }
    }

    // code ...
}
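Stripped of the checkpoint lock, metrics, and cancellation, the control flow of run() is a nested loop: open a split, pull records until reachedEnd(), close, move to the next split. The sketch below reproduces only that shape; SplitFormat, ListFormat, and RunLoopSketch are hypothetical types, not Flink or ChunJun APIs, and the sketch checks hasNext() before the first split, whereas the original assumes at least one split exists.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, minimal version of the calls run() makes on the input format.
interface SplitFormat<S, T> {
    void open(S split);
    boolean reachedEnd();
    T nextRecord();
    void close();
}

// Hypothetical format whose "split" is simply a list of strings.
final class ListFormat implements SplitFormat<List<String>, String> {
    private Iterator<String> it;

    public void open(List<String> split) { it = split.iterator(); }
    public boolean reachedEnd() { return !it.hasNext(); }
    public String nextRecord() { return it.next(); }
    public void close() { it = null; }
}

final class RunLoopSketch {
    // Same loop structure as DtInputFormatSourceFunction.run();
    // collected.add(...) plays the role of ctx.collect(...).
    static <S, T> List<T> run(SplitFormat<S, T> format, Iterator<S> splits) {
        List<T> collected = new ArrayList<>();
        boolean isRunning = splits.hasNext();
        while (isRunning) {
            format.open(splits.next());
            while (!format.reachedEnd()) {
                T next = format.nextRecord();
                if (next != null) { // null records (e.g. dirty data) are skipped
                    collected.add(next);
                }
            }
            format.close();
            isRunning = splits.hasNext();
        }
        return collected;
    }
}
```

The inner loop only ends when reachedEnd() returns true, which is the hook the polling logic in JdbcInputFormat relies on.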
  • Each record is produced by calling format.nextRecord(nextElement):
package com.dtstack.chunjun.source.format;

// import ...

public abstract class BaseRichInputFormat extends RichInputFormat<RowData, InputSplit> {
    // code...

    @Override
    public RowData nextRecord(RowData rowData) {
        if (byteRateLimiter != null) {
            byteRateLimiter.acquire();
        }
        RowData internalRow = null;
        try {
            internalRow = nextRecordInternal(rowData);
        } catch (ReadRecordException e) {
            dirtyManager.collect(e.getRowData(), e, null);
        }
        if (internalRow != null) {
            updateDuration();
            if (numReadCounter != null) {
                numReadCounter.add(1);
            }
            if (bytesReadCounter != null) {
                bytesReadCounter.add(rowSizeCalculator.getObjectSize(internalRow));
            }
        }

        return internalRow;
    }
}
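nextRecord() is a template method: the connector supplies nextRecordInternal(), while the base class turns parse failures into dirty data (returning null instead of failing the job) and updates counters only for successful rows. A minimal sketch of that pattern, with hypothetical names (BadRecordException, CountingFormatSketch) and the rate limiter omitted:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checked exception carrying the raw row that failed,
// standing in for ChunJun's ReadRecordException.
final class BadRecordException extends Exception {
    final String raw;

    BadRecordException(String raw, Throwable cause) {
        super(cause);
        this.raw = raw;
    }
}

abstract class CountingFormatSketch {
    long numRead;   // plays the role of numReadCounter
    long bytesRead; // plays the role of bytesReadCounter
    final List<String> dirty = new ArrayList<>(); // plays the role of dirtyManager

    // Connector-specific part, like JdbcInputFormat.nextRecordInternal().
    protected abstract String nextRecordInternal() throws BadRecordException;

    // Like BaseRichInputFormat.nextRecord(): a bad row is recorded as dirty
    // data and yields null; counters move only for rows that were read.
    final String nextRecord() {
        String row = null;
        try {
            row = nextRecordInternal();
        } catch (BadRecordException e) {
            dirty.add(e.raw);
        }
        if (row != null) {
            numRead++;
            bytesRead += row.length(); // crude size estimate; ChunJun uses rowSizeCalculator
        }
        return row;
    }
}
```

Because run() only calls ctx.collect() for non-null elements, a dirty row is simply dropped from the stream.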
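  • That in turn calls nextRecordInternal() in the connector plugin's implementation class, for example JdbcInputFormat (make sure you are looking at the class in the plugin code, not a similarly named one elsewhere):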
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public class JdbcInputFormat extends BaseRichInputFormat {
    // code ...

    @Override
    public RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
        if (!hasNext) {
            return null;
        }
        try {
            @SuppressWarnings("unchecked")
            RowData finalRowData = rowConverter.toInternal(resultSet);
            if (needUpdateEndLocation) {
                BigInteger location =
                        incrementKeyUtil.getLocationValueFromRs(
                                resultSet, jdbcConf.getIncreColumnIndex() + 1);
                endLocationAccumulator.add(location);
                LOG.debug("update endLocationAccumulator, current Location = {}", location);
            }
            if (jdbcConf.getRestoreColumnIndex() > -1) {
                state = resultSet.getObject(jdbcConf.getRestoreColumnIndex() + 1);
            }
            return finalRowData;
        } catch (Exception se) {
            throw new ReadRecordException("", se, 0, rowData);
        } finally {
            try {
                hasNext = resultSet.next();
            } catch (SQLException e) {
                LOG.error("can not read next record", e);
                hasNext = false;
            }
        }
    }

    // code ...
}
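  • Here the value of the current row's restore column (RestoreColumn, which after initIncrementConfig() is the same as the increment column) is read from the ResultSet and stored in the state variable. The "+ 1" in getRestoreColumnIndex() + 1 only converts the 0-based column index to JDBC's 1-based column numbering; the value itself is stored unchanged. Separately, when needUpdateEndLocation is set, the increment column's value is added to endLocationAccumulator.
  • The next polling query reads the state variable and uses it as startLocation.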
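The state update amounts to "after each row, remember its restore-column value", with an index shift because JDBC columns are numbered from 1. The sketch below shows just that, using a hypothetical RowCursor (not java.sql.ResultSet) that mimics 1-based getObject(int). It assumes rows arrive in ascending order of the restore column; otherwise the last row read is not necessarily the largest value.

```java
import java.util.List;

// Hypothetical row cursor with JDBC-style 1-based column access,
// mirroring how java.sql.ResultSet.getObject(int) numbers columns.
final class RowCursor {
    private final List<Object[]> rows;
    private int pos = -1;

    RowCursor(List<Object[]> rows) {
        this.rows = rows;
    }

    boolean next() {
        return ++pos < rows.size();
    }

    Object getObject(int columnIndex) {
        return rows.get(pos)[columnIndex - 1]; // column 1 is array slot 0
    }
}

final class StateTrackingSketch {
    final int restoreColumnIndex; // 0-based position, as stored in jdbcConf
    Object state;                 // last seen restore value, used later as startLocation

    StateTrackingSketch(int restoreColumnIndex) {
        this.restoreColumnIndex = restoreColumnIndex;
    }

    // Reads every row and remembers the restore-column value of each one.
    // "+ 1" converts the 0-based index to a 1-based column number;
    // the value itself is not modified.
    int drain(RowCursor rs) {
        int count = 0;
        while (rs.next()) {
            if (restoreColumnIndex > -1) {
                state = rs.getObject(restoreColumnIndex + 1);
            }
            count++;
        }
        return count;
    }
}
```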

Interval polling and incremental updates

  • Go back to the run method of DtInputFormatSourceFunction shown above.
  • Inside its while loop, format.reachedEnd() is called repeatedly:
package com.dtstack.chunjun.connector.jdbc.source;

// import ...

public class JdbcInputFormat extends BaseRichInputFormat {
    // code ...

    @Override
    public boolean reachedEnd() {
        if (hasNext) {
            return false;
        } else {
            if (currentJdbcInputSplit.isPolling()) {
                try {
                    TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval());
                    // On each polling round, check whether the database connection is still
                    // alive (3-second timeout) and reconnect automatically if it is not
                    if (!dbConn.isValid(3)) {
                        dbConn = getConnection();
                        // If it is still unusable after reconnecting, treat the database
                        // as unavailable and fail the job
                        if (!dbConn.isValid(3)) {
                            String message =
                                    String.format(
                                            "cannot connect to %s, username = %s, please check %s is available.",
                                            jdbcConf.getJdbcUrl(),
                                            jdbcConf.getUsername(),
                                            jdbcDialect.dialectName());
                            throw new ChunJunRuntimeException(message);
                        }
                    }
                    dbConn.setAutoCommit(true);
                    JdbcUtil.closeDbResources(resultSet, null, null, false);
                    queryForPolling(restoreKeyUtil.transToLocationValue(state).toString());
                    return false;
                } catch (InterruptedException e) {
                    LOG.warn(
                            "interrupted while waiting for polling, e = {}",
                            ExceptionUtil.getErrorMessage(e));
                } catch (SQLException e) {
                    JdbcUtil.closeDbResources(resultSet, ps, null, false);
                    String message =
                            String.format(
                                    "error to execute sql = %s, startLocation = %s, e = %s",
                                    jdbcConf.getQuerySql(),
                                    state,
                                    ExceptionUtil.getErrorMessage(e));
                    throw new ChunJunRuntimeException(message, e);
                }
            }
            return true;
        }
    }

    // code ...

    protected void queryForPolling(String startLocation) throws SQLException {
        // Log at INFO about once every five minutes:
        // (current time - task start time) % 300 s <= one polling interval
        if ((System.currentTimeMillis() - startTime) % 300000 <= jdbcConf.getPollingInterval()) {
            LOG.info("polling startLocation = {}", startLocation);
        } else {
            LOG.debug("polling startLocation = {}", startLocation);
        }
        incrementKeyUtil.setPsWithLocationStr(ps, 1, startLocation);
        resultSet = ps.executeQuery();
        hasNext = resultSet.next();
    }

    // code ...
}
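  • The check if (currentJdbcInputSplit.isPolling()) corresponds to the polling option and decides whether to poll at all.
  • TimeUnit.MILLISECONDS.sleep(jdbcConf.getPollingInterval()) corresponds to the pollingInterval option (in milliseconds), the wait between two polls.
  • queryForPolling(restoreKeyUtil.transToLocationValue(state).toString()) takes the state variable and uses it as startLocation.
  • queryForPolling then binds startLocation to the precompiled PreparedStatement and executes the query. Because reachedEnd() returns false after every successful poll, the inner loop of run() never exits in polling mode (unless the sleep is interrupted), so the source keeps running and keeps picking up new rows.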
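Putting the pieces together, one polling round is: sleep for pollingInterval, re-query with startLocation = state, and advance state as rows are read. The sketch below simulates this against an in-memory list instead of a database, so PollingSketch and its methods are hypothetical; the strict "greater than" filter stands in for the polling SQL and is an assumption, not something shown in the source above. It also reproduces the five-minute INFO-logging predicate from queryForPolling().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

final class PollingSketch {
    // Hypothetical in-memory "table": only the increment column's values.
    private final List<Long> table = new ArrayList<>();
    private final long pollingIntervalMs;
    // Plays the role of JdbcInputFormat.state: the startLocation of the next poll.
    long state;

    PollingSketch(long pollingIntervalMs, long initialLocation) {
        this.pollingIntervalMs = pollingIntervalMs;
        this.state = initialLocation;
    }

    void insert(long value) {
        table.add(value);
    }

    // Stand-in for a statement like "... WHERE col > ? ORDER BY col"
    // with ? bound to startLocation (strict ">" is an assumption).
    private List<Long> queryForPolling(long startLocation) {
        List<Long> rows = new ArrayList<>();
        for (long v : table) {
            if (v > startLocation) {
                rows.add(v);
            }
        }
        Collections.sort(rows);
        return rows;
    }

    // One round in the order reachedEnd() uses: sleep, re-query from state,
    // then advance state row by row as nextRecordInternal() does.
    List<Long> pollOnce() {
        try {
            TimeUnit.MILLISECONDS.sleep(pollingIntervalMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // ChunJun instead lets reachedEnd() return true
            return Collections.emptyList();
        }
        List<Long> rows = queryForPolling(state);
        for (long v : rows) {
            state = v;
        }
        return rows;
    }

    // Same predicate as queryForPolling(): INFO roughly once every 5 minutes.
    static boolean logAtInfo(long now, long startTime, long pollingIntervalMs) {
        return (now - startTime) % 300_000L <= pollingIntervalMs;
    }
}
```

Each poll returns only rows newer than the previous state, which is what turns a repeated full query into an incremental read.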
