Flink-窗口概念以及窗口API使用
创始人
2024-02-18 20:33:00
0

6.3 窗口

6.3.1 窗口的概念

  1. 存储桶

在这里插入图片描述

水位线只是用来推动窗口的关闭,但不决定数据分到哪个窗口

6.3.2 窗口的分类

  1. 按照驱动类型分类
  • 时间窗口
  • 计数窗口

在这里插入图片描述

  1. 按照窗口分配数据的规则分类
  • 滚动窗口:参数为窗口的大小

在这里插入图片描述

  • 滑动窗口:参数为窗口大小,以及滑动步长

数据会重叠

运用场景,每个5分钟统计过去一小时的所有的活跃用户

在这里插入图片描述

  • 会话窗口:参数是会话的超时时间
  • 全局窗口

6.3.3 窗口API的概览

  1. 按键分区窗口

经过按键分区后(keyby),数据流会按照key被分为多条逻辑流,就是keyStream

stream.keyBy(...).window(...)
  1. 非按键分区

stream直接开窗,所有数据收集到窗口中,也就是并行度变成1,官方不推荐

stream.windowAll(...)
  1. ​ 窗口API的调用
stream.keyBy().window()//窗口分配器:要分配什么窗口.aggregate()//窗口函数:具体计算操作

6.3.4 窗口分配器

在这里插入图片描述


在这里插入图片描述

直接调用window()方法传入WindowAssigner,返回WindowedStream

在这里插入图片描述

在这里插入图片描述

WindowAssigner是一个抽象类,并且有assignWindows()方法,但是flink有抽象类的实现类,直接用实现类就好,不用自己整一个

  1. 滚动窗口

在这里插入图片描述

主要是根据窗口分类而设置的实现类,细分下面还有时间语义

例如图中的TumblingEventTimeWindows(事件时间)或者TumblingProcessingTimeWindows(处理时间)

在这里插入图片描述

在这里插入图片描述

TumblingEventTimeWindows的静态方法of需要传入一个Time大小,或者Time大小以及Time的偏移量,偏移量一般用在时差里面(伦敦时间和东八区时间)

在这里插入图片描述

Time这个类是flink下的,选包别选错,Time下就有很多方法了,例如.hour取小时

在这里插入图片描述

所以最后滚动事件时间窗口这么写

  1. 滑动窗口
    在这里插入图片描述

其他的窗口还有SlidingEventTimeWindows滑动窗口,这个就有两个参数了,一个窗口大小,一个滑动步长,当然也有三个参数,跟滑动窗口一样可以多一个offset偏移量,也是用在计算时差(伦敦时间和东八区时间)

  1. 会话窗口

在这里插入图片描述

EventTimeSessionWindows是事件事件会话窗口,参数就是会话的时间了

  1. 计数窗口

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

countWindow是计数窗口,传一个参数表示滚动窗口,传两个参数表示滑动窗口,return的是全局窗口的,以及移除器和触发器

6.3.5 窗口函数

  1. 整体介绍

在这里插入图片描述

在这里插入图片描述

  • 一般数据源获取到后做一些map等基本操作,返回的还是DataStream
  • 如果keyby了,就变成了keyedStream,再做聚合操作,返回的DataStream
  • 或者keyby后开窗,经过窗口分配器后,得到WindowedStream,再进行窗口函数,得到返回DataStream
  1. 增量聚合函数

流处理思路做批处理,依旧是来一个处理一个,等到时间点,输出计算好的数据

  • 归约函数ReduceFunction
    在这里插入图片描述

在这里插入图片描述

参数要传入ReduceFunction,和转换算子那一章keyby后用到的聚合函数一样的,同个类,即把集合每一个数据拿出来,然后按照一定的规则不停的规约,最终得到一个唯一规约聚合后的结果

    public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.map(new MapFunction>() {@Overridepublic Tuple2 map(Event value) throws Exception {return Tuple2.of(value.user,1L);}}).keyBy(data->data.f0)//.countWindow(10,2)//滑动计数窗口//.window(EventTimeSessionWindows.withGap(Time.seconds(2)))//事件事件会话窗口//.window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(5)))//滑动事件时间窗口.window(TumblingEventTimeWindows.of(Time.seconds(10)))//滚动事件时间窗口.reduce(new ReduceFunction>() {@Overridepublic Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {return Tuple2.of(value1.f0,value1.f1+value2.f1);}}).print();env.execute();}
}

结果

(Mary,3)
(Bob,3)
(Alice,4)(Bob,2)
(Alice,4)
(Mary,4)(Alice,3)
(Bob,5)
(Mary,2)(Alice,3)
(Bob,5)
(Mary,2)(Mary,4)
(Alice,3)
(Bob,3)Process finished with exit code 130
数据源一直产生,会一直不停,每隔十秒会输出一段
  • 聚合函数(AggregateFunction)

1)相比于归约函数reduce的特点是,有三个泛型,并且可以更改输出的类型

2)有三个类型,输入类型(In),累加器类型(ACC),输出类型(OUT)

3)重写4个方法createAccumulator() ,add(Event value, Tuple2 accumulator),getResult(Tuple2 accumulator),merge(Tuple2 a, Tuple2 b)

案例1

public class WindowAggregateTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.keyBy(data->data.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunction, String>() {//中间类型:一个是所有类型的和,一个是所有数的个数@Overridepublic Tuple2 createAccumulator() {//创建一个累加器return Tuple2.of(0L,0);//写初始值}@Override//叠加器,参数是传过来的数值,还有当前的状态,返回的也是当前的状态的类型,这个过程主要涉及状态的改变public Tuple2 add(Event value, Tuple2 accumulator) {return Tuple2.of(accumulator.f0+value.timestamp,accumulator.f1+1);//前面是叠加规则,后面是个数}@Override//输出结果,返回类型变化,变成Stringpublic String getResult(Tuple2 accumulator) {//汇总数/个数得到平均数Timestamp timestamp = new Timestamp(accumulator.f0 / accumulator.f1);return timestamp.toString();//转成String输出}@Override//merge合并累加器,一般用于会话窗口//这边可以实现,也可以不实现,下面是实现的写法public Tuple2 merge(Tuple2 a, Tuple2 b) {return Tuple2.of(a.f0+b.f0,a.f1+b.f1);//(和,个数)}}).print();env.execute();}
}

结果

2022-11-22 22:44:19.8032022-11-22 22:44:25.389
2022-11-22 22:44:24.522
2022-11-22 22:44:27.9212022-11-22 22:44:34.588
2022-11-22 22:44:35.089
2022-11-22 22:44:35.2932022-11-22 22:44:44.558
2022-11-22 22:44:44.349
2022-11-22 22:44:45.701
每隔十秒输出某一用户的访问时间戳平均值,意义不大,主要看的是aggregate一步可以实现求平均值用法

案例2

public class WindowAggregateTest {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.keyBy(data->data.user).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new AggregateFunction, String>() {//中间类型:一个是所有类型的和,一个是所有数的个数@Overridepublic Tuple2 createAccumulator() {//创建一个累加器return Tuple2.of(0L,0);//写初始值}@Override//叠加器,参数是传过来的数值,还有当前的状态,返回的也是当前的状态的类型,这个过程主要涉及状态的改变public Tuple2 add(Event value, Tuple2 accumulator) {return Tuple2.of(accumulator.f0+value.timestamp,accumulator.f1+1);//前面是叠加规则,后面是个数}@Override//输出结果,返回类型变化,变成Stringpublic String getResult(Tuple2 accumulator) {//汇总数/个数得到平均数Timestamp timestamp = new Timestamp(accumulator.f0 / accumulator.f1);return timestamp.toString();//转成String输出}@Override//merge合并累加器,一般用于会话窗口//这边可以实现,也可以不实现,下面是实现的写法public Tuple2 merge(Tuple2 a, Tuple2 b) {return Tuple2.of(a.f0+b.f0,a.f1+b.f1);//(和,个数)}}).print();env.execute();}
}

结果

在这里插入图片描述

这边url不对,后续已经改正,但是不影响此次结果查看

  1. 全窗口函数
  • 分析

在这里插入图片描述

ProcessWindowFunction是一个抽象类,继承了富函数类AbstractRichFunction,并且拥有4个泛型,并且最后一个泛型W继承了Window,可以选择TimeWindow作为子类传入

在这里插入图片描述

可以继承这个类后实现process()方法,参数分别是KEY key, Context context, Iterable elements, Collector out,第一个参数key的类型,第二个是上下文,第三个是输入(迭代器),第四个输出

在这里插入图片描述

第二个参数上下文中有很多属性,例如窗口,当前处理时间,当前watermark,以及获取当前的状态还有侧输出流

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

TimeWindow这个类中getStart()和getEnd()用,一般通过上下文调用window返回TimeWindow,然后在调用getStart()和getEnd()用方法,例如Long start = context.window()

  • 代码
public class WindowProcessTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));//使用ProcessWindowFunction计算UVstream.keyBy(data->true).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new UvCountByWindow()).print();env.execute();}//实现自定义的ProcessWindowFunction,输出一条统计信息public static class UvCountByWindow extends ProcessWindowFunction{@Override//参数process(KEY key, Context context上下文, Iterable elements, Collector out)public void process(Boolean aBoolean, Context context, Iterable elements, Collector out) throws Exception {//1.用一个HashSet保存userHashSet userSet = new HashSet<>();//2.从elements遍历数据,放到set中去重for (Event event : elements) {userSet.add(event.user);}Integer uv = userSet.size();//3.结合窗口信息Long start = context.window().getStart();Long end = context.window().getEnd();//4.输出out.collect("窗口"+new Timestamp(start)+"~"+new Timestamp(end)+"  UV值为:"+uv);}}
}
  • 结果
窗口2022-11-23 00:00:20.0~2022-11-23 00:00:30.0  UV值为:2
窗口2022-11-23 00:00:30.0~2022-11-23 00:00:40.0  UV值为:3
窗口2022-11-23 00:00:40.0~2022-11-23 00:00:50.0  UV值为:3
  1. 两种函数结合
  • 概述

增量函数看不到窗口信息,全窗口是将数据攒起来后进行批处理,因此是调用增量函数中add方法,窗口结束将getResult方法结果以参数形式作为element(第三个参数)输出给到全窗口函数的process()方法

在这里插入图片描述

  • 代码
public class UvCountExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print();//使用AggregateFunction和ProcessWindowFunction结合计算UVstream.keyBy(data -> true).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new UvAgg(),new UvCountResult()).print();env.execute();}//自定义实现AggregateFunction,目的在于增量计算uv值public static class UvAgg implements AggregateFunction,Long>{@Overridepublic HashSet createAccumulator() {return new HashSet<>();}@Overridepublic HashSet add(Event value, HashSet accumulator) {accumulator.add(value.user);return accumulator;}@Overridepublic Long getResult(HashSet accumulator) {return (long)accumulator.size();}@Overridepublic HashSet merge(HashSet a, HashSet b) {return null;}}//自定义实现ProcessWindowFunction,包装窗口信息输出public static class UvCountResult extends ProcessWindowFunction{@Overridepublic void process(Boolean aBoolean, ProcessWindowFunction.Context context, Iterable elements, Collector out) throws Exception {//3.结合窗口信息Long start = context.window().getStart();Long end = context.window().getEnd();Long uv = elements.iterator().next();//4.输出out.collect("窗口"+new Timestamp(start)+"~"+new Timestamp(end)+"  UV值为:"+uv);}}
}
  • 结果
Event{user='Mary', url='./cart', timestamp=2022-11-23 21:02:49.478}
Event{user='Alice', url='./home', timestamp=2022-11-23 21:02:50.496}
窗口2022-11-23 21:02:40.0~2022-11-23 21:02:50.0  UV值为:1
Event{user='Mary', url='./home', timestamp=2022-11-23 21:02:51.504}
Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 21:02:52.513}
Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 21:02:53.588}
Event{user='Alice', url='./cart', timestamp=2022-11-23 21:02:54.602}
Event{user='Bob', url='./prod?id=100', timestamp=2022-11-23 21:02:55.615}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:56.627}
Event{user='Mary', url='./home', timestamp=2022-11-23 21:02:57.627}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:58.634}
Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 21:02:59.644}
Event{user='Alice', url='./home', timestamp=2022-11-23 21:03:00.65}
窗口2022-11-23 21:02:50.0~2022-11-23 21:03:00.0  UV值为:3
  1. 统计热门url案例
  • 代码
public class UrlCountViewExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//1.自定义Source输入SingleOutputStreamOperator stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ZERO)//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("input");//统计每个url的访问量stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(10))).aggregate(new UrlViewCountAgg(),new UrlViewCountResult()).print();env.execute();}//增量聚合,来一条数据就加1private static class UrlViewCountAgg implements AggregateFunction {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Event value, Long accumulator) {return accumulator+1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return null;}}//包装窗口信息,然后输出UrlViewCount//输入就是增量函数的输出private static class UrlViewCountResult extends ProcessWindowFunction{@Overridepublic void process(String url,Context context, Iterable elements, Collector out) throws Exception {//3.结合窗口信息Long start = context.window().getStart();Long end = context.window().getEnd();Long count = elements.iterator().next();//4.输出out.collect(new UrlViewCount(url,count,start,end));}}
}
  • 结果
input> Event{user='Bob', url='./home', timestamp=2022-11-23 22:02:55.388}
input> Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 22:02:56.409}
input> Event{user='Mary', url='./prod?id=100', timestamp=2022-11-23 22:02:57.411}
input> Event{user='Alice', url='./prod?id=100', timestamp=2022-11-23 22:02:58.418}
input> Event{user='Alice', url='./fav', timestamp=2022-11-23 22:02:59.419}
input> Event{user='Bob', url='./home', timestamp=2022-11-23 22:03:00.42}
UrlViewCount{url='./home', count=1, windowStart=2022-11-23 22:02:50.0, windowEnd=2022-11-22 22:03:00.0}
UrlViewCount{url='./prod?id=100', count=3, windowStart=2022-11-23 22:02:50.0, windowEnd=2022-11-23 22:03:00.0}最后一条input不算入窗口

6.3.6 其他API

  1. 触发器(Trigger)
  • 用法
stream.keyBy(...).window(...).trigger(new MyTrigger)
  • 分析

在这里插入图片描述

Trigger是一个抽象类

在这里插入图片描述

在这里插入图片描述

CountTrigger是实现类,使用onEventTime()方法返回TriggerResult类,即触发的结果

在这里插入图片描述

TriggerResult是一个枚举类,有CONTINUE(不动),FIRE_AND_PURGE,FIRE(发射到下游),PURGE(把窗口清空关闭)

后面听不懂了,毁灭吧

  1. 移除器(Evictor)
stream.keyBy(...).window(...).evictor(new MyEvictor())
  1. 允许延迟
  • 窗口延迟关闭
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1))).allowedLateness(Time.minutes(1))
  • 测输出流

大招:把迟到的数据放到测输出流

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(outputTag)
  • 代码测试
public class LateDataTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator stream = env.socketTextStream("hadoop2",7777).map(new MapFunction() {@Overridepublic Event map(String value) throws Exception {String[] fileds = value.split(",");return new Event(fileds[0].trim(),fileds[1].trim(),Long.valueOf(fileds[2].trim()));}}).assignTimestampsAndWatermarks(WatermarkStrategy//乱序事件为0,相当于升序.forBoundedOutOfOrderness(Duration.ofSeconds(2))//得到WatermarkGenerator.withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream.print("input");//定义一个输出标签//使用匿名类定义测输出流标签定义出来OutputTag late = new OutputTag("late"){};//统计每个url的访问量SingleOutputStreamOperator result = stream.keyBy(data -> data.url).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.minutes(1))//允许一分钟的延迟.sideOutputLateData(late)//测输出流//可以传入两个参数,一个AggregateFunction<,另一个ProcessWindowFunction.aggregate(new UrlCountViewExample.UrlViewCountAgg(),new UrlCountViewExample.UrlViewCountResult());result.print("result");result.getSideOutput(late).print("late");env.execute();}
}

结果

在这里插入图片描述

在这里插入图片描述

当输入Bob,./prod?id=10,12000的时候,0-10的窗口才会关闭(依据水位线10),并计算前面4条的属于0-10窗口结果,

当再次输入Bob,./prod?id=20,8000,Bob,./prod?id=10,9000,由于设置了allowedLatenes窗口延迟一分钟,因此仍然可以叠加计算并输出结果

即使Bob,./prod?id=10,70000后,关闭的是10-20秒的窗口,如果后面有20-30也会关闭,即使水位线在68秒,继续输入Mary,./home,6500,也会继续叠加计算

只有当输入Bob,./prod?id=10,72000的时候,水位线猜到了70,那么窗口时间根据水位线以及延迟的1分钟,即0-10的窗口才是真正关闭掉了

窗口关闭,此时再输入数据Bob,./cart,7000,就会被放到测输出流中

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...