dolphinscheduler-数据质量-源码分析
创始人
2024-02-13 01:52:29
0

数据质量工作流程

数据质量运行流程分为2个部分:在web端进行数据质量检测的流程定义,通过dolphinscheduer进行调度,提交到spark计算引擎;spark端负责解析数据质量模型的参数,通过读取数据、执行转换、输出三个步骤,完成数据质量检测任务,工作流程如下图所示。
在这里插入图片描述

在web端进行定义

数据质量定义如下图所示,这里只定义了一个节点。
在这里插入图片描述以一个空值检测的输入参数为例,这个json文件会以字符串形式提交给spark端

{"name": "$t(null_check)","env": {"type": "batch","config": null},"readers": [{"type": "JDBC","config": {"database": "ops","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","output_table": "ops_ms_alarm","table": "ms_alarm","url": "jdbc:mysql://192.168.3.211:3306/ops?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false"}}],"transformers": [{"type": "sql","config": {"index": 1,"output_table": "total_count","sql": "SELECT COUNT(*) AS total FROM ops_ms_alarm"}},{"type": "sql","config": {"index": 2,"output_table": "null_items","sql": "SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = '') "}},{"type": "sql","config": {"index": 3,"output_table": "null_count","sql": "SELECT COUNT(*) AS nulls FROM null_items"}}],"writers": [{"type": "JDBC","config": {"database": "dolphinscheduler3","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","table": "t_ds_dq_execute_result","url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false","sql": "select 0 as rule_type,'$t(null_check)' as rule_name,0 as process_definition_id,25 as process_instance_id,26 as task_instance_id,null_count.nulls AS statistics_value,total_count.total AS comparison_value,7 AS comparison_type,3 as check_type,0.95 as threshold,3 as operator,1 as failure_strategy,'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' as error_output_path,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count full join total_count"}},{"type": "JDBC","config": {"database": "dolphinscheduler3","password": "***","driver": "com.mysql.cj.jdbc.Driver","user": "root","table": "t_ds_dq_task_statistics_value","url": "jdbc:mysql://192.168.3.212:3306/dolphinscheduler3?characterEncoding=utf-8&allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false","sql": "select 0 as process_definition_id,26 as task_instance_id,1 as rule_id,'ZKTZKDBTRFDKXKQUDNZJVKNX8OIAEVLQ91VT2EXZD3U=' as unique_code,'null_count.nulls'AS statistics_name,null_count.nulls AS statistics_value,'2022-11-16 03:40:32' as data_time,'2022-11-16 03:40:32' as create_time,'2022-11-16 03:40:32' as update_time from null_count"}},{"type": "hdfs_file","config": {"path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测","input_table": "null_items"}}]
}

spark端源码分析

DataQualityApplication程序入口

DataQualityApplication#main

public static void main(String[] args) throws Exception {//...
//从命令行获取参数String dataQualityParameter = args[0];
//   将json参数转为DataQualityConfiguration对象DataQualityConfiguration dataQualityConfiguration = JsonUtils.fromJson(dataQualityParameter,DataQualityConfiguration.class);//...
//构建 SparkRuntimeEnvironment的参数Config对象EnvConfig envConfig = dataQualityConfiguration.getEnvConfig();Config config = new Config(envConfig.getConfig());config.put("type",envConfig.getType());if (Strings.isNullOrEmpty(config.getString(SPARK_APP_NAME))) {config.put(SPARK_APP_NAME,dataQualityConfiguration.getName());}SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config);
//委托给 DataQualityContext执行DataQualityContext dataQualityContext = new DataQualityContext(sparkRuntimeEnvironment,dataQualityConfiguration);dataQualityContext.execute();
}

数据质量配置类

public class DataQualityConfiguration implements IConfig {@JsonProperty("name")private String name; // 名称@JsonProperty("env")private EnvConfig envConfig; // 环境配置@JsonProperty("readers")private List readerConfigs; // reader配置@JsonProperty("transformers")private List transformerConfigs;  // transformer配置@JsonProperty("writers")private List writerConfigs; // writer配置
//...
}

DataQualityContext#execute

从dataQualityConfiguration类中获取 readers、transformers、writers, 委托给SparkBatchExecution

public void execute() throws DataQualityException {
// 将List转为ListList readers = ReaderFactory.getInstance().getReaders(this.sparkRuntimeEnvironment,dataQualityConfiguration.getReaderConfigs());
// 将List转为ListList transformers = TransformerFactory.getInstance().getTransformer(this.sparkRuntimeEnvironment,dataQualityConfiguration.getTransformerConfigs());
// 将List转为ListList writers = WriterFactory.getInstance().getWriters(this.sparkRuntimeEnvironment,dataQualityConfiguration.getWriterConfigs());
// spark 运行环境if (sparkRuntimeEnvironment.isBatch()) {
// 批模式sparkRuntimeEnvironment.getBatchExecution().execute(readers,transformers,writers);} else {
// 流模式, 暂不支持throw new DataQualityException("stream mode is not supported now");}
}

ReaderFactory 类采用了单例和工厂方法的设计模式, 目前支持JDBC和HIVE 的数据源的读取, 对应Reader类HiveReader、JdbcReader
WriterFactory 类采用了单例和工厂方法的设计模式, 目前支持JDBC、HDFS、LOCAL_FILE 的数据源的输出, 对应Writer类 HdfsFileWriter LocalFileWriter JdbcWriter
TransformerFactory 类采用了单例和工厂方法的设计模式,目前仅支持TransformerType.SQL的转换器类型

结合json 可以看出 一个空值检测的reader、tranformer、 writer情况
1个reader : 读取源表数据

3个tranformer: total_count 行总数 、null_items 空值项(行数据) 、null_count (空值数),计算sql 如下
– SELECT COUNT() AS total FROM ops_ms_alarm
– SELECT * FROM ops_ms_alarm WHERE (alarm_time is null or alarm_time = ‘’)
– SELECT COUNT(
) AS nulls FROM null_items

3个writer:
第一个是jdbc writer, 将比较值、统计值 输出t_ds_dq_execute_result 数据质量执行结果表,

SELECT//...null_count.nulls AS statistics_value,total_count.total AS comparison_value,//...'hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测' AS error_output_path,//...
FROMnull_countFULL JOIN total_count

第二个是jdbc writer,将statistics_value写入到表 t_ds_dq_task_statistics_value

SELECT//...//...'null_count.nulls' AS statistics_name,null_count.nulls AS statistics_value,//...
FROMnull_count

第3个是hdfs writer,将空值项写入到hdfs 文件目录

{"type": "hdfs_file","config": {"path": "hdfs://xmaster:9000/user/hadoop/data_quality_error_data/0_25_211-ops-ms_alarm-空值检测","input_table": "null_items"}
}

目前 DolphinScheduler占不支持实时数据的质量检测。

SparkBatchExecution#execute

public class SparkBatchExecution implements Execution {private final SparkRuntimeEnvironment environment;public SparkBatchExecution(SparkRuntimeEnvironment environment) throws ConfigRuntimeException {this.environment = environment;}@Overridepublic void execute(List readers, List transformers, List writers) {
// 为每一个reader注册输入临时表readers.forEach(reader -> registerInputTempView(reader, environment));if (!readers.isEmpty()) {
// 取readers列表的第一个reader读取数据集合, reader的实现类有HiveReader、JdbcReaderDataset ds = readers.get(0).read(environment);for (BatchTransformer tf:transformers) {
// 执行转换ds = executeTransformer(environment, tf, ds);
// 将转换后结果写到临时表registerTransformTempView(tf, ds);}for (BatchWriter sink: writers) {
// 执行将转换结果由writer输出, writer的实现类有JdbcWriter、LocalFileWriter、HdfsFileWriterexecuteWriter(environment, sink, ds);}}
// 结束environment.sparkSession().stop();}
}

SparkBatchExecution#registerInputTempView

//注册输入临时表, 临时表表名为OUTPUT_TABLE的名字private void registerInputTempView(BatchReader reader, SparkRuntimeEnvironment environment) {Config conf = reader.getConfig();if (Boolean.TRUE.equals(conf.has(OUTPUT_TABLE))) {// ops_ms_alarmString tableName = conf.getString(OUTPUT_TABLE);        registerTempView(tableName, reader.read(environment));} else {throw new ConfigRuntimeException("[" + reader.getClass().getName() + "] must be registered as dataset, please set \"output_table\" config");}}

调用 Dataset.createOrReplaceTempView方法

private void registerTempView(String tableName, Dataset ds) {if (ds != null) {ds.createOrReplaceTempView(tableName);} else {throw new ConfigRuntimeException("dataset is null, can not createOrReplaceTempView");}
}

执行转换executeTransformer

private Dataset executeTransformer(SparkRuntimeEnvironment environment, BatchTransformer transformer, Dataset dataset) {Config config = transformer.getConfig();Dataset inputDataset;Dataset outputDataset = null;if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {
// 从INPUT_TABLE获取表名String[] tableNames = config.getString(INPUT_TABLE).split(",");// outputDataset合并了inputDataset数据集合for (String sourceTableName: tableNames) {inputDataset = environment.sparkSession().read().table(sourceTableName);if (outputDataset == null) {outputDataset = inputDataset;} else {outputDataset = outputDataset.union(inputDataset);}}} else {
//  配置文件无INPUT_TABLEoutputDataset = dataset;}
// 如果配置文件中配置了TMP_TABLE, 将outputDataset 注册到TempViewif (Boolean.TRUE.equals(config.has(TMP_TABLE))) {if (outputDataset == null) {outputDataset = dataset;}String tableName = config.getString(TMP_TABLE);registerTempView(tableName, outputDataset);}
//  转换器进行转换return transformer.transform(outputDataset, environment);
}

SqlTransformer#transform 最终是使用spark-sql进行处理, 所以核心还是这个sql语句,sql 需要在web端生成好,参加前面的json文件。

public class SqlTransformer implements BatchTransformer {private final Config config;public SqlTransformer(Config config) {this.config = config;}
//...@Overridepublic Dataset transform(Dataset data, SparkRuntimeEnvironment env) {return env.sparkSession().sql(config.getString(SQL));}
}

将数据输出到指定的位置executeWriter

private void executeWriter(SparkRuntimeEnvironment environment, BatchWriter writer, Dataset ds) {Config config = writer.getConfig();Dataset inputDataSet = ds;if (Boolean.TRUE.equals(config.has(INPUT_TABLE))) {String sourceTableName = config.getString(INPUT_TABLE);inputDataSet = environment.sparkSession().read().table(sourceTableName);}writer.write(inputDataSet, environment);
}

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...