mysql cdc 整库迁移 (mysql to mysql)
创始人
2024-03-27 10:54:19
0

技术思想

利用 mysql catalog,mysql cdc,flink jdbc 等技术实现 mysql 整库迁移至下游数据库,这里是示范 mysql to mysql ,其他 sink 组件可自行扩展实现。

通过 flink ParameterTool,可以选择是整库同步还是多表亦或单表同步,可以设置全局并发,源表 mysql 参数,目标表 mysql 参数

通过 sql Connection 实现自动建表逻辑 (mysql 数据类型众多,这里并没有测试所有的类型参数,如担心建表不成功,可手动建表,不影响程序运行)

下游使用 flink jdbc 来实现,语法为 upsert 即幂等写入(重复数据只会写入一次)

使用 mysql catalog 来实现源表元数据的获取

自定义 CustomDebeziumDeserializer 实现 DebeziumDeserializationSchema 接口对数据进行转换

该任务本质上是 单 source 多 sink 任务,不同的表数据不一样可能会有一定的反压

程序测试 生成五百万条数据 五张表 一分钟左右完成,增量数据一万条,可以同步完成

环境 flink 1.16 cdc 2.3.0

refer:
https://nightlies.apache.org/flink/flink-docs-release-1.16/
https://blog.csdn.net/qq_36062467/article/details/128117647
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html

代码

github 地址 https://github.com/SophiaData/Bigdata_Code_Tutorial/blob/master/Code_Tutorial/Flink-demo/

/** (@SophiaData) (@date 2022/12/1 16:02). */
public class CustomDebeziumDeserializerimplements DebeziumDeserializationSchema> {private static final Logger LOG = LoggerFactory.getLogger(CustomDebeziumDeserializer.class);private final Map physicalConverterMap =Maps.newConcurrentMap();CustomDebeziumDeserializer(Map tableRowTypeMap) {for (String tableName : tableRowTypeMap.keySet()) {RowType rowType = tableRowTypeMap.get(tableName);DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);this.physicalConverterMap.put(tableName, physicalConverter);}}@Overridepublic void deserialize(SourceRecord record, Collector> out)throws Exception {Envelope.Operation op = Envelope.operationFor(record);Struct value = (Struct) record.value();Schema valueSchema = record.valueSchema();Struct source = value.getStruct("source");String tableName = source.get("table").toString();DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {Row insert = extractAfterRow(value, valueSchema, physicalConverter);insert.setKind(RowKind.INSERT);out.collect(Tuple2.of(tableName, insert));} else if (op == Envelope.Operation.DELETE) {Row delete = extractBeforeRow(value, valueSchema, physicalConverter);delete.setKind(RowKind.DELETE);out.collect(Tuple2.of(tableName, delete));} else if (op == Envelope.Operation.UPDATE) {Row before = extractBeforeRow(value, valueSchema, physicalConverter);before.setKind(RowKind.UPDATE_BEFORE);out.collect(Tuple2.of(tableName, before));Row after = extractAfterRow(value, valueSchema, physicalConverter);after.setKind(RowKind.UPDATE_AFTER);out.collect(Tuple2.of(tableName, after));} else {LOG.warn(" Unexpected statement: {}", value);}}private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)throws Exception {Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();Struct after = value.getStruct(Envelope.FieldName.AFTER);return (Row) physicalConverter.convert(after, afterSchema);}private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)throws Exception {Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();Struct before = value.getStruct(Envelope.FieldName.BEFORE);return (Row) physicalConverter.convert(before, beforeSchema);}@Overridepublic TypeInformation> getProducedType() {return TypeInformation.of(new TypeHint>() {});}public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {switch (type.getTypeRoot()) {case NULL:return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {return null;}};case BOOLEAN:return convertToBoolean();case TINYINT:return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {return Byte.parseByte(dbzObj.toString());}};case SMALLINT:return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {return Short.parseShort(dbzObj.toString());}};case INTEGER:case INTERVAL_YEAR_MONTH:return convertToInt();case BIGINT:case INTERVAL_DAY_TIME:return convertToLong();case DATE:return convertToDate();case TIME_WITHOUT_TIME_ZONE:return convertToTime();case TIMESTAMP_WITHOUT_TIME_ZONE:return convertToTimestamp(ZoneId.of("UTC"));case TIMESTAMP_WITH_LOCAL_TIME_ZONE:return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));case FLOAT:return convertToFloat();case DOUBLE:return convertToDouble();case CHAR:case VARCHAR:return convertToString();case BINARY:case VARBINARY:return convertToBinary();case DECIMAL:return createDecimalConverter((DecimalType) type);case ROW:return createRowConverter((RowType) type);case ARRAY:case MAP:case MULTISET:case RAW:default:throw new UnsupportedOperationException("Unsupported type: " + type);}}private static DeserializationRuntimeConverter convertToBoolean() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Boolean) {return dbzObj;} else if (dbzObj instanceof Byte) {return (byte) dbzObj == 1;} else if (dbzObj instanceof Short) {return (short) dbzObj == 1;} else {return Boolean.parseBoolean(dbzObj.toString());}}};}private static DeserializationRuntimeConverter convertToInt() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Integer) {return dbzObj;} else if (dbzObj instanceof Long) {return ((Long) dbzObj).intValue();} else {return Integer.parseInt(dbzObj.toString());}}};}private static DeserializationRuntimeConverter convertToLong() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Integer) {return ((Integer) dbzObj).longValue();} else if (dbzObj instanceof Long) {return dbzObj;} else {return Long.parseLong(dbzObj.toString());}}};}private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {final int precision = decimalType.getPrecision();final int scale = decimalType.getScale();return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {BigDecimal bigDecimal;if (dbzObj instanceof byte[]) {// decimal.handling.mode=precisebigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);} else if (dbzObj instanceof String) {// decimal.handling.mode=stringbigDecimal = new BigDecimal((String) dbzObj);} else if (dbzObj instanceof Double) {// decimal.handling.mode=doublebigDecimal = BigDecimal.valueOf((Double) dbzObj);} else {if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {SpecialValueDecimal decimal =VariableScaleDecimal.toLogical((Struct) dbzObj);bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);} else {// fallback to stringbigDecimal = new BigDecimal(dbzObj.toString());}}return DecimalData.fromBigDecimal(bigDecimal, precision, scale);}};}private static DeserializationRuntimeConverter convertToDouble() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Float) {return ((Float) dbzObj).doubleValue();} else if (dbzObj instanceof Double) {return dbzObj;} else {return Double.parseDouble(dbzObj.toString());}}};}private static DeserializationRuntimeConverter convertToFloat() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Float) {return dbzObj;} else if (dbzObj instanceof Double) {return ((Double) dbzObj).floatValue();} else {return Float.parseFloat(dbzObj.toString());}}};}private static DeserializationRuntimeConverter convertToDate() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();}};}private static DeserializationRuntimeConverter convertToTime() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Long) {switch (schema.name()) {case MicroTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000);case NanoTime.SCHEMA_NAME:return (int) ((long) dbzObj / 1000_000);}} else if (dbzObj instanceof Integer) {return dbzObj;}// get number of milliseconds of the dayreturn TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;}};}private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof Long) {switch (schema.name()) {case Timestamp.SCHEMA_NAME:return TimestampData.fromEpochMillis((Long) dbzObj);case MicroTimestamp.SCHEMA_NAME:long micro = (long) dbzObj;return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));case NanoTimestamp.SCHEMA_NAME:long nano = (long) dbzObj;return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));}}LocalDateTime localDateTime =TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);return TimestampData.fromLocalDateTime(localDateTime);}};}private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof String) {String str = (String) dbzObj;// TIMESTAMP_LTZ type is encoded in string typeInstant instant = Instant.parse(str);return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(instant, serverTimeZone));}throw new IllegalArgumentException("Unable to convert to TimestampData from unexpected value '"+ dbzObj+ "' of type "+ dbzObj.getClass().getName());}};}private static DeserializationRuntimeConverter convertToString() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {return StringData.fromString(dbzObj.toString());}};}private static DeserializationRuntimeConverter convertToBinary() {return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) {if (dbzObj instanceof byte[]) {return dbzObj;} else if (dbzObj instanceof ByteBuffer) {ByteBuffer byteBuffer = (ByteBuffer) dbzObj;byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);return bytes;} else {throw new UnsupportedOperationException("Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());}}};}private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {final DeserializationRuntimeConverter[] fieldConverters =rowType.getFields().stream().map(RowType.RowField::getType).map(CustomDebeziumDeserializer::createNotNullConverter).toArray(DeserializationRuntimeConverter[]::new);final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);return new DeserializationRuntimeConverter() {private static final long serialVersionUID = 1L;@Overridepublic Object convert(Object dbzObj, Schema schema) throws Exception {Struct struct = (Struct) dbzObj;int arity = fieldNames.length;Row row = new Row(arity);for (int i = 0; i < arity; i++) {String fieldName = fieldNames[i];Field field = schema.field(fieldName);if (field == null) {row.setField(i, null);} else {Object fieldValue = struct.getWithoutDefault(fieldName);Schema fieldSchema = schema.field(fieldName).schema();Object convertedField =convertField(fieldConverters[i], fieldValue, fieldSchema);row.setField(i, convertedField);}}return row;}};}private static Object convertField(DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)throws Exception {if (fieldValue == null) {return null;} else {return fieldConverter.convert(fieldValue, fieldSchema);}}
}

效果

在这里插入图片描述

在这里插入图片描述

最后

代码和测试可能不充分,仅供参考,欢迎提出意见。

欢迎访问博客 https://sophiadata.github.io/Bigdata_Blog_Website/learning/overview

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...