Flink-水位线的设置以及传递
创始人
2024-02-01 19:12:52
0

6.2 水位线

6.2.1 概述

  1. 分类
  • 有序流

image.png

  • 无序流
    image.png
    判断的时间延迟
  1. 延迟时间判定

6.2.2 水位线的设置

  1. 分析

image.png
DataStream下的assignTimstampsAndWatermarks方法,返回SingleOutputStreamOperator本质还是个算子,传入的参数是WatermarkStrategy的生成策略

image.png
但是WatermarkStrategy是一个接口

  • 有序流

image.png

因此调用静态方法forMonotonousTimeStamps后new AscendingTimestampsWatermarks返回WatermarkGernerator
image.png

AscendingTimestampsWatermarks这个继承自BoundOutOfOrdernessWatermarks

image.png
image.png
image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口

image.png

然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy,参数是new SerializableTimestampAssigner的对象,重写extractTimestamp方法,这个方法作用是怎么样从数据里面提取时间戳

image.png

  • 乱序流

image.png
因此调用静态方法forBoundedOutOfOrderness(参数为最大乱序程度,也就是延迟时间)后new BoundOutOfOrdernessWatermarks返回WatermarkGernerator

image.png

BoundOutOfOrdernessWatermarks这个类有onEvent和onPeriodicEmit这两方法,因为实现了WatermarkGenerator这个接口(跟上面一样了)

image.png

后面也跟有序一样,然后在调用接口中的默认方法withTimestampAssigner得到返回WatermarkStrategy

  1. 完整代码
public class WatermarkTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.输入SingleOutputStreamOperator stream = env.fromElements(new Event("Mary", "./home", 1000L),new Event("Bob", "./cart", 2000L),new Event("Alice", "./prod?id=100", 3000L),new Event("Bob", "./prod?id=1", 3300L),new Event("Alice", "./prod?id=200", 3000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3800L),new Event("Bob", "./prod?id=3", 4200L))//                //有序流的watermark生成
//                //forMonotonousTimestamps前指定泛型
//                .assignTimestampsAndWatermarks(WatermarkStrategy
//                        .forMonotonousTimestamps()//得到WatermarkGenerator
//                        .withTimestampAssigner(new SerializableTimestampAssigner() {//返回WatermarkStrategy
//                            @Override
//                            //参数是当前传过来的数据element,另一个传出的recordTimestamp是时间戳
//                            public long extractTimestamp(Event element, long recordTimestamp) {
//                                return element.timestamp;
//                            }
//                        })
//                ).assignTimestampsAndWatermarks(WatermarkStrategy//forMonotonousTimestamps前指定泛型//forMonotonousTimestamps参数是最大乱序时间.forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));env.execute();}
}

6.2.3 自定义水位线

  1. 分析

image.png

或者直接new 一个接口WatermarkStrategy重写createWatermarkGenerator的watermark生成器的方法(生成WatermarkGenerator)以及createTimeStampAssigner提取时间戳分配器的方法(生成TimeStampAssigner)创建watermark

image.png

image.png

image.png

image.png

WatermarkGenerator是个接口,有两个方法分别是onEvent方法,主要目标是要发出一个WatermarkOutput,另一个是onperiodicEmit方法,表示周期性的生成,周期性生成时间默认是2秒,env调用getConfig后调用setAutoWatermarkInterval后可以更改周期性生成时间

image.png
image.png

WatermarkOutput也是一个接口,调用emitWatermark就能发出一个watermark,

image.png

image.png

除了WatermarkGenerator接口还有TimeStampAssigner也是个接口,里面只有一个方法叫做extractTimestamp,目的是从当前数据提取时间戳,同时也会作为WatermarkGenerator这个接口中onEvent方法中传入的参数eventTimestamp时间戳(见上上上上上上图)

  1. 代码
  • 正常水位线
// 自定义水位线的产生
public class CustomWatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}//内部静态类public static class CustomWatermarkStrategy implements WatermarkStrategy {@Override//createTimestampAssigner方法生成TimeStampAssignerpublic TimestampAssigner createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner() {@Override//extractTimestamp,目的是从当前数据提取时间戳public long extractTimestamp(Event element, long recordTimestamp){return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Override//createWatermarkGenerator生成WatermarkGeneratorpublic WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomPeriodicGenerator();}}//CustomPeriodicGenerator实现WatermarkGenerator接口,并重写方法public static class CustomPeriodicGenerator implements WatermarkGenerator {private Long delayTime = 5000L; // 延迟时间private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 观察到的最大时间戳@Override//更新当前时间戳,这边不发送水位线,目的是保存时间戳public void onEvent(Event event, long eventTimestamp, WatermarkOutputoutput) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认 200ms 调用一次//-1毫秒都是为了贴切窗口闭合的时候左闭右开设计output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}
  • 断点水位线

在onevent根据条件触发,onPeriodicEmit这个方法中就不用做了

    public class CustomPunctuatedGenerator implements WatermarkGenerator {@Overridepublic void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的 itemId 时,才发出水位线if (r.user.equals("Mary")) {output.emitWatermark(new Watermark(r.timestamp - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在 onEvent 方法中发射了水位线}}
  • 在自定义数据源中发送水位线

使用 collectWithTimestamp 方法将数据发送出去,原来直接out.collect()的

image.png

参数是当前数据还有当前数据的时间戳,跟水位线生成中extractTimestamp(Event element, long recordTimestamp)这个类似,也是一个数据是什么,一个时间戳是啥

然后发送水位线,用emitWatermark方法生成

public class CustomWatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.addSource(new ClickSourceWithWatermark()).print();env.execute();}// 泛型是数据源中的类型public static class ClickSourceWithWatermark implements SourceFunction{private boolean running = true;@Overridepublic void run(SourceFunction.SourceContext sourceContext) throws Exception {Random random = new Random();String[] userArr = {"Mary", "Bob", "Alice"};String[] urlArr = {"./home", "./cart", "./prod?id=1"};while (running) {long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳String username = userArr[random.nextInt(userArr.length)];String url = urlArr[random.nextInt(urlArr.length)];Event event = new Event(username, url, currTs);// 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段sourceContext.collectWithTimestamp(event, event.timestamp);// 发送水位线sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));Thread.sleep(1000L);}}@Overridepublic void cancel() {running = false;}}
}

6.2.4 水位线的传递

针对多个分区,上游需要告诉下游水位线情况,采用的是广播的方式给所有下游子任务

但是上游如果也是并行的,向下传输的水位线可能有多个,以上游发过来最小的时钟为准,并且下游会有一个分区专门保存上游发过来的水位线最小的数据

image.png

image.png

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...