Writing a custom real-time streaming source for Flink is straightforward for systems such as Kafka or MQ, and batch reads from MySQL or Redis are also simple.
But if you want to load data on a schedule and feed it into a Flink streaming job as a source, for example fetching a batch of records from MySQL or Redis at a fixed interval, you can implement it roughly as follows.
Note that starting with Flink 1.15.0, most Flink artifacts no longer need a Scala version suffix; the Scala dependency is bundled internally.
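For example, the same dependency would be declared like this in the two versions used here (a minimal sketch for illustration, not taken from the project's pom):

<!-- Flink 1.12.7: artifact IDs still carry the Scala binary version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>1.12.7</version>
</dependency>

<!-- Flink 1.15.0 and later: no Scala suffix -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>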
The pom file below was used with two Flink versions, 1.16.0 and 1.12.7 (Scala 2.12):
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ye</groupId>
    <artifactId>flink-study</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases><enabled>false</enabled></releases>
            <snapshots><enabled>true</enabled></snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:flink-shaded-force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ye.DataStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals><goal>shade</goal></goals>
                                    </pluginExecutionFilter>
                                    <action><ignore/></action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action><ignore/></action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
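One detail worth noticing: the shade plugin's mainClass is still com.ye.DataStreamJob from the quickstart archetype; to package and submit the example below as a fat jar you would either point it at the job class used here or specify the entry class when submitting the job.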
The custom source below uses a java.util.Timer scheduled task (a thread pool created via Executors would work just as well; see the sketch after the class). Every five seconds it generates a random string and hands it to the source.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;

public class TimerSinkRich extends RichSourceFunction<String> {

    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean flag = true;
    private Timer timer;
    private TimerTask timerTask;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        timerTask = new TimerTask() {
            @Override
            public void run() {
                // This is where you could obtain a MySQL/Redis connection and query a batch of data
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) (random.nextInt(26) + 97);
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        // The initial delay and the period could also be passed in via a constructor
        timer.schedule(timerTask, 1000, 5000);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (flag) {
            if (queue.size() > 0) {
                ctx.collect(queue.remove());
            }
        }
    }

    @Override
    public void cancel() {
        if (null != timer) timer.cancel();
        if (null != timerTask) timerTask.cancel();
        // When the job is cancelled, Flink by default spends about 30 s (this may differ between Flink
        // versions) trying to shut the source down; if it cannot be shut down, the TaskManager cannot
        // release the slot and the cancellation ultimately fails
        if (queue.size() <= 0) flag = false;
    }
}
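As mentioned above, a thread pool can replace the Timer. Below is a minimal sketch of how open() and cancel() might look with a single-threaded ScheduledExecutorService; the scheduler field is illustrative, the rest of the class (queue, flag, run()) stays exactly as in TimerSinkRich, and the extra imports are java.util.concurrent.Executors, ScheduledExecutorService and TimeUnit.

// Alternative to Timer/TimerTask: a scheduled thread pool; only open() and cancel() change
private ScheduledExecutorService scheduler;

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    scheduler = Executors.newSingleThreadScheduledExecutor();
    // Same work as the TimerTask above: enqueue a random string every 5 seconds after a 1-second delay
    scheduler.scheduleAtFixedRate(() -> {
        Random random = new Random();
        StringBuilder str = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            str.append((char) (random.nextInt(26) + 97));
        }
        queue.add(str.toString());
    }, 1, 5, TimeUnit.SECONDS);
}

@Override
public void cancel() {
    if (scheduler != null) scheduler.shutdownNow();
    flag = false;
}

The job class that uses the source is the same either way: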
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimerSinkStreamJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        DataStreamSource<String> streamSource = executionEnvironment.addSource(new TimerSinkRich());
        streamSource.print();

        executionEnvironment.execute("TimerSinkStreamJob: print data from the timed source");
    }
}
The local test succeeds.
The job starts up successfully.
Cancelling the job succeeds,
and the slot is released successfully as well.
Startup succeeds here too,
and cancelling the job is of course no problem either; the slot is again released normally.
Of course, you can also skip the open() method entirely and set everything up inside run():
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;

public class DiySinkRich extends RichSourceFunction<String> {

    private TimerTask timerTask;
    private Timer timer;
    private volatile boolean flag = true;
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    @Override
    public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
        timerTask = new TimerTask() {
            @Override
            public void run() {
                // Generate a random 10-character lowercase string and put it on the queue
                Random random = new Random();
                StringBuilder str = new StringBuilder();
                for (int i = 0; i < 10; i++) {
                    char ranLowLetter = (char) (random.nextInt(26) + 97);
                    str.append(ranLowLetter);
                }
                queue.add(str.toString());
            }
        };
        timer = new Timer();
        timer.schedule(timerTask, 1000, 5000);

        while (flag) {
            if (queue.size() > 0) {
                ctx.collect(queue.remove());
            }
        }
    }

    @Override
    public void cancel() {
        if (timer != null) timer.cancel();
        if (timerTask != null) timerTask.cancel();
        if (queue.size() == 0) flag = false;
    }
}
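In either variant, the body of the TimerTask is where you would query MySQL or Redis instead of generating a random string, as the comment in open() suggests. A minimal sketch assuming plain JDBC and a hypothetical table t_message with a content column (URL, user and password are placeholders; add the java.sql imports):

timerTask = new TimerTask() {
    @Override
    public void run() {
        String url = "jdbc:mysql://localhost:3306/test";   // placeholder connection settings
        try (Connection conn = DriverManager.getConnection(url, "user", "password");
             PreparedStatement ps = conn.prepareStatement("SELECT content FROM t_message");
             ResultSet rs = ps.executeQuery()) {
            // Enqueue every row; run(SourceContext) will pick them up and emit them downstream
            while (rs.next()) {
                queue.add(rs.getString("content"));
            }
        } catch (SQLException e) {
            e.printStackTrace();   // log and keep the timer running on errors
        }
    }
};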
That is all it takes to build a simple Flink source that loads data on a timer.