大数据(9e)图解Flink窗口
创始人
2024-02-04 02:56:28
0

文章目录

  • 1、代码模板
    • 1.1、pom.xml
    • 1.2、log4j.properties
    • 1.3、Java模板
  • 2、按键分区(Keyed)、非按键分区(Non-Keyed)
    • 2.1、Keyed
    • 2.2、Non-Keyed
  • 3、窗口的分类
    • 3.1、基于时间的窗口
    • 3.2、基于事件个数的窗口
  • 4、窗口函数
  • 5、示例代码
    • 5.1、ReduceFunction
    • 5.2、AggregateFunction
    • 5.3、ProcessWindowFunction

1、代码模板

本地开发环境:WIN10+IDEA
只改##################### 业务逻辑 #####################之间的代码

1.1、pom.xml


881.14.62.122.0.32.17.2


org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}org.slf4jslf4j-api${slf4j.version}org.slf4jslf4j-log4j12${slf4j.version}org.apache.logging.log4jlog4j-to-slf4j${log4j.version}

1.2、log4j.properties

log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

1.3、Java模板

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;public class Hello {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(1);//加入自定义数据源DataStreamSource dss = env.addSource(new MySource());//################################### 业务逻辑 ########################################dss.print();//################################### 业务逻辑 ########################################env.execute();}public static class MySource implements SourceFunction {public MySource() {}@Overridepublic void run(SourceContext sc) {Scanner scanner = new Scanner(System.in);while (true) {String str = scanner.nextLine().trim();if (str.equals("STOP")) {break;}if (!str.equals("")) {sc.collect(str);}}scanner.close();}@Overridepublic void cancel() {}}
}

2、按键分区(Keyed)、非按键分区(Non-Keyed)

Non-Keyed的窗口的流的并行度=1

2.1、Keyed

基于时间的窗口

.keyBy(...)
.window(...)

基于事件个数的窗口

.keyBy(...)
.countWindow(...)

2.2、Non-Keyed

基于时间的窗口

.windowAll(...)

基于事件个数的窗口

.countWindowAll(...)

3、窗口的分类

  • 将 无界限的 数据 切分为 有界限的 数据
  • https://yellow520.blog.csdn.net/article/details/121288240

3.1、基于时间的窗口

基于时间滑动窗口

.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))

基于时间滚动窗口

.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))

基于时间会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))

基于时间的全局窗口

.window(GlobalWindows.create())

3.2、基于事件个数的窗口

基于事件个数滑动窗口

.countWindow(4,3)

基于事件个数滚动窗口

.countWindow(4)

4、窗口函数

窗口函数窗口关闭时,窗口函数就去处理窗口中的每个元素
ReduceFunction增量处理,高效
AggregateFunction增量处理,高效
ProcessWindowFunction函数执行前要在内部缓存窗口上所有的元素,低效

5、示例代码

修改代码模板中##################### 业务逻辑 #####################之间的代码

5.1、ReduceFunction

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.keyBy(s -> s).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).reduce((ReduceFunction) (v1, v2) -> v1 + "," + v2).print("输出");

基于时间滚动窗口

5.2、AggregateFunction

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//AggregateFunction.aggregate(new AggregateFunction() {//创建累加器@Overridepublic Long createAccumulator() {return 0L;}//累加@Overridepublic Long add(String in, Long acc) {return acc + 1L;}//从累加器获取结果@Overridepublic Long getResult(Long acc) {return acc;}//合并累加器@Overridepublic Long merge(Long a1, Long a2) {return a1 + a2;}}).print("输出");

基于时间滑动窗口

5.3、ProcessWindowFunction

源码截取

abstract class ProcessAllWindowFunction {abstract void process(ProcessAllWindowFunction.Context var1,  //上下文对象Iterable var2,                                  //窗口内的所有输入Collector var3                                 //收集器);

代码

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable in, Collector out) {//打印窗口范围System.out.println(context.window().toString());//在窗口内,收集元素out.collect(String.valueOf(in));}}).print("输出");

测试运行截图

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...