大数据(9f)Flink双流JOIN
创始人
2024-02-19 07:35:39
0

文章目录

      • 概述
      • 开发环境
      • 使用状态列表实现 INNER JOIN(双流connect后CoProcessFunction)
      • 基于间隔的JOIN(Interval Join)
      • 基于窗口的JOIN(Window Join)

概述

Flink双流JOIN可用算子或SQL实现,FlinkSQL的JOIN在另一篇讲
算子JOIN中较常用的是intervalJoin

开发环境

WIN10+IDEA

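<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.6</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>2.0.3</slf4j.version>
    <log4j.version>2.17.2</log4j.version>
    <lombok.version>1.18.24</lombok.version>
</properties>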

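<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
</dependencies>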

使用状态列表实现 INNER JOIN(双流connect后CoProcessFunction)

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;public class Hello {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//创建双流DataStreamSource> d1 = env.fromElements(Tuple2.of("a", 2L),Tuple2.of("a", 3L),Tuple2.of("b", 5L));DataStreamSource> d2 = env.fromElements(Tuple2.of("a", "A"),Tuple2.of("b", "B"),Tuple2.of("c", "C"));//双流KeyByKeyedStream, String> kd1 = d1.keyBy(t -> t.f0);KeyedStream, String> kd2 = d2.keyBy(t -> t.f0);//connectConnectedStreams, Tuple2> c = kd1.connect(kd2);//CoProcessFunctionc.process(new CoProcessFunction, Tuple2, String>() {ListState> l1;ListState> l2;@Overridepublic void open(Configuration parameters) {RuntimeContext r = getRuntimeContext();l1 = r.getListState(new ListStateDescriptor<>("L1", Types.TUPLE(Types.STRING, Types.LONG)));l2 = r.getListState(new ListStateDescriptor<>("L2", Types.TUPLE(Types.STRING, Types.STRING)));}@Overridepublic void processElement1(Tuple2 value, Context ctx, Collector out) throws Exception {l1.add(value);for (Tuple2 value2 : l2.get()) {out.collect(value + "==>" + value2);}}@Overridepublic void processElement2(Tuple2 value, Context ctx, Collector out) throws Exception {l2.add(value);for (Tuple2 value1 : l1.get()) {out.collect(value1 + "==>" + value);}}}).print();//流环境执行env.execute();}
}

基于间隔的JOIN(Interval Join)

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class Hello {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//创建双流和时间时间水位线策略SingleOutputStreamOperator d1 = env.fromElements(new U("a", 3 * 1000L),new U("b", 8 * 1000L),new U("c", 13 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.timestamp));SingleOutputStreamOperator d2 = env.fromElements(new U("a", 4 * 1000L),new U("b", 6 * 1000L),new U("b", 7 * 1000L),new U("c", 10 * 1000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.timestamp));//键控流KeyedStream k1 = d1.keyBy(u -> u.id);KeyedStream k2 = d2.keyBy(u -> u.id);//基于间隔进行联合k1.intervalJoin(k2).between(Time.seconds(-2L), Time.seconds(1L)).process(new ProcessJoinFunction() {@Overridepublic void processElement(U left, U right, Context ctx, Collector out) {out.collect(left + " ==> " + right);}}).print();//流环境执行env.execute();}@Data@AllArgsConstructorpublic static class U {String id;Long timestamp;}
}

结果
Hello.U(id=a, timestamp=3000) ==> Hello.U(id=a, timestamp=4000)
Hello.U(id=b, timestamp=8000) ==> Hello.U(id=b, timestamp=6000)
Hello.U(id=b, timestamp=8000) ==> Hello.U(id=b, timestamp=7000)

双流JOIN是双向的,下面两种写法匹配到的数据对是相同的(只是processElement中left和right的角色互换)

k1.intervalJoin(k2).between(Time.seconds(-2L), Time.seconds(1L))
k2.intervalJoin(k1).between(Time.seconds(-1L), Time.seconds(2L))

基于窗口的JOIN(Window Join)

窗口JOIN包括滚动窗口、滑动窗口、会话窗口

滚动窗口JOIN

滑动窗口JOIN

会话窗口JOIN

语法

stream.join(otherStream).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)

下面只展示滚动窗口JOIN

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;public class Hello {public static void main(String[] args) throws Exception {//创建执行环境,设置并行度StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);//创建双流和时间时间水位线策略SingleOutputStreamOperator d1 = env.fromElements(new U("a", 2000L),new U("b", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.timestamp));SingleOutputStreamOperator d2 = env.fromElements(new U("a", 3999L),new U("b", 3999L),new U("b", 5999L)).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps().withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.timestamp));//窗口JOINd1.join(d2).where(u -> u.id).equalTo(u -> u.id).window(TumblingEventTimeWindows.of(Time.seconds(2))).apply((JoinFunction) (first, second) -> first + " ==> " + second).print();//流环境执行env.execute();}@Data@AllArgsConstructorpublic static class U {String id;Long timestamp;}
}

结果
Hello.U(id=a, timestamp=2000) ==> Hello.U(id=a, timestamp=3999)
Hello.U(id=b, timestamp=4000) ==> Hello.U(id=b, timestamp=5999)
(4000和3999不在同一个滚动窗口,4000和5999在同一个滚动窗口)

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...