Flink水位线-详细说明
创始人
2024-02-01 20:49:07
0

文章目录

  • 时间语义
    • Flink 中的时间语义?
    • 哪种时间语义更重要?
  • 1. 水位线(Watermark)
    • 1.1 什么是水位线?
    • 1.2 如何生成水位线?
    • 1.3 水位线的传递
    • 1.4 水位线的计算

💎💎💎💎💎

githubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithub

更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

时间语义

在理解水位线概念之前我们应该先了解时间语义的内容

Flink 中的时间语义?

image-20221117163355711

1.处理时间(Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

2.事件时间(Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

哪种时间语义更重要?

  实际应用中,数据产生的时间处理的时间可能是完全不同的。很长时间收集起来的数据,处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完,产生“背压”(back pressure)。

   通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低,效率达到最高。

1. 水位线(Watermark)

image-20221117165859376

1.1 什么是水位线?

   在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。

​    具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。 而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

image-20221119172614899

1.有序流中的水位线

image-20221118170851128

2.乱序流中的水位线

  这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。

image-20221118171008781

  我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线

image-20221118171221753

  如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

image-20221118171234685

  为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳

image-20221118171246223

3.水位线的特性

⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

⚫ 水位线是基于数据的时间戳生成的

⚫ 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

⚫ 水位线可以通过设置延迟,来保证正确处理乱序数据

⚫ 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

1.2 如何生成水位线?

1.生成水位线的总体原则

  我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,==我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,==我们该怎么做呢?

  一个字,等。由于网络传输的延迟不确定,为了获取所有迟到数据,我们只能等待更长的时间。作为筹划全局的程序员,我们当然不会傻傻地一直等下去。那到底等多久呢?这就需要对相关领域有一定的了解了。比如,如果我们知道当前业务中事件的迟到时间不会超过 5 秒,那就可以将水位线的时间戳设为当前已有数据的最大时间戳减去 5 秒,相当于设置了 5 秒的延迟等待。

2.水位线生成策略(Watermark Strategies)

import com.fang.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class WatermarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.getConfig().setAutoWatermarkInterval(100);//从元素中读取数据SingleOutputStreamOperator stream = env.fromElements(new Event("Marry", "./home", 1000L),new Event("Bob", "./home", 1100L),new Event("Marry", "./home", 1000L),new Event("Bob", "./prod?id=1", 1000L),new Event("Bob", "./home", 3500L),new Event("Bob", "./prod?id=2", 3200L)//有序流的watermark生成
//        ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
//        .withTimestampAssigner(new SerializableTimestampAssigner() {
//            @Override
//            public long extractTimestamp(Event element, long recordTimestamp) {
//                return element.timestamp;
//            }
//        })  //提取时间戳,生成水位线).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));env.execute();}
}

它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

.assignTimestampsAndWatermarks()

  .assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就
是 所 谓 的 “ 水 位 线 生 成 策 略 ”

  WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

3.Flink 内置水位线生成器

(1)有序流

stream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner() 
{@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}})
);

(2)乱序流

  由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)

.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));

1.3 水位线的传递

  水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。这是一种保证,告诉下游任务“只要你接到这个水位线,就代表之后我不会再给你发更早的数据了,你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?

  当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。

image-20221118174258154

1.4 水位线的计算

  水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...