本地开发环境:WIN10+IDEA
只改##################### 业务逻辑 #####################
之间的代码
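<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>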
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Scanner;public class Hello {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度env.setParallelism(1);//加入自定义数据源DataStreamSource dss = env.addSource(new MySource());//################################### 业务逻辑 ########################################dss.print();//################################### 业务逻辑 ########################################env.execute();}public static class MySource implements SourceFunction {public MySource() {}@Overridepublic void run(SourceContext sc) {Scanner scanner = new Scanner(System.in);while (true) {String str = scanner.nextLine().trim();if (str.equals("STOP")) {break;}if (!str.equals("")) {sc.collect(str);}}scanner.close();}@Overridepublic void cancel() {}}
}
Non-Keyed 窗口(windowAll / countWindowAll)不会并行处理,其所在流的并行度固定为 1
基于时间的窗口
.keyBy(...)
.window(...)
基于事件个数的窗口
.keyBy(...)
.countWindow(...)
基于时间的窗口
.windowAll(...)
基于事件个数的窗口
.countWindowAll(...)
基于时间的滑动窗口
.window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(3)))
基于时间的滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
基于时间的会话窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)))
基于时间的全局窗口
.window(GlobalWindows.create())
基于事件个数的滑动窗口
.countWindow(4,3)
基于事件个数的滚动窗口
.countWindow(4)
窗口函数 | 窗口关闭时,窗口函数就去处理窗口中的每个元素 |
---|---|
ReduceFunction | 增量处理,高效 |
AggregateFunction | 增量处理,高效 |
ProcessWindowFunction | 函数执行前要在内部缓存窗口上所有的元素,低效 |
修改代码模板中
##################### 业务逻辑 #####################
之间的代码
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
// Key the stream by the element itself, collect each key's elements into
// 5-second tumbling processing-time windows, and join the elements of a
// window with commas (incremental reduce), then print the result.
dss.keyBy(s -> s)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((ReduceFunction<String>) (v1, v2) -> v1 + "," + v2)
        .print("输出");
基于时间的滚动窗口
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))//AggregateFunction.aggregate(new AggregateFunction() {//创建累加器@Overridepublic Long createAccumulator() {return 0L;}//累加@Overridepublic Long add(String in, Long acc) {return acc + 1L;}//从累加器获取结果@Overridepublic Long getResult(Long acc) {return acc;}//合并累加器@Overridepublic Long merge(Long a1, Long a2) {return a1 + a2;}}).print("输出");
基于时间的滑动窗口
源码截取
abstract class ProcessAllWindowFunction {abstract void process(ProcessAllWindowFunction.Context var1, //上下文对象Iterable var2, //窗口内的所有输入Collector var3 //收集器);
代码
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// For each non-keyed sliding processing-time window (size 10 s, slide 5 s),
// print the window itself to stdout and emit the string form of its buffered elements.
dss.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> in, Collector<String> out) {
                // Print the window range.
                System.out.println(context.window().toString());
                // Collect the window's elements as a single string.
                out.collect(String.valueOf(in));
            }
        })
        .print("输出");
测试运行截图
上一篇:Linux基本用户操作