KeyState example
public class KeyStateDemo {
    /**
     * Use the keyed ValueState to track the maximum value in the data (in practice just use maxBy).
     * Steps:
     * 1. Declare a state field to hold the maximum value:
     *    private transient ValueState<Long> maxValueState;
     * 2. Create a state descriptor:
     *    ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
     * 3. Obtain the state from the descriptor:
     *    maxValueState = getRuntimeContext().getState(descriptor);
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<String, Integer>> tupleDs = env.fromElements(
                Tuple2.of("北京", 3),
                Tuple2.of("上海", 6),
                Tuple2.of("北京", 8),
                Tuple2.of("重庆", 9),
                Tuple2.of("天津", 6),
                Tuple2.of("北京", 3),
                Tuple2.of("上海", 22));

        // In day-to-day development just use the built-in maxBy
        SingleOutputStreamOperator<Tuple2<String, Integer>> result1 = tupleDs.keyBy(t -> t.f0).maxBy(1);

        // Hand-rolled version of the same aggregation using ValueState
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> result2 = tupleDs
                .keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>() {
                    // 1. Declare a state field to hold the maximum value
                    private ValueState<Integer> maxValueState;

                    // Initialize the state
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // 2. Create the state descriptor
                        ValueStateDescriptor<Integer> stateDescriptor =
                                new ValueStateDescriptor<>("maxValueState", Integer.class);
                        // 3. Obtain/initialize the state from the descriptor
                        maxValueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    // Use the state
                    @Override
                    public Tuple3<String, Integer, Integer> map(Tuple2<String, Integer> value) throws Exception {
                        Integer currentValue = value.f1;
                        Integer historyValue = maxValueState.value();
                        if (historyValue == null || currentValue > historyValue) {
                            historyValue = currentValue;
                        }
                        maxValueState.update(historyValue);
                        return Tuple3.of(value.f0, currentValue, historyValue);
                    }
                });

        result1.print();
        result2.printToErr();
        env.execute();
    }
}
Simulating a KafkaSource (not confident with this yet, just getting familiar for now)
public class OperateStateDemo {
    /**
     * Requirement: use ListState to store an offset, simulating how a Kafka source maintains its offset.
     */
    public static void main(String[] args) throws Exception {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1); // parallelism 1

        // Checkpoint and restart strategy, copied as boilerplate for now
        env.enableCheckpointing(1000); // checkpoint every 1 s
        env.setStateBackend(new FsStateBackend("file:///F:/ckp"));
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Fixed-delay restart strategy: on failure restart twice, 3 s apart; after that the job fails
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 3000));

        // TODO source
        DataStreamSource<String> ds = env.addSource(new MyKafkaSource()).setParallelism(1);

        // TODO transformation
        // TODO sink
        ds.print();
        env.execute();
    }

    // Use operator ListState to simulate a KafkaSource maintaining its offset
    public static class MyKafkaSource extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        // 1. Declare the ListState
        private ListState<Long> offsetState = null;
        private Long offset = 0L;
        private Boolean flag = true;

        // 2. Initialize/create the ListState
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("offsetState", Long.class);
            offsetState = context.getOperatorStateStore().getListState(stateDescriptor);
        }

        // 3. Use the state
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (flag) {
                Iterator<Long> iterator = offsetState.get().iterator();
                if (iterator.hasNext()) {
                    offset = iterator.next();
                }
                offset += 1;
                int subtaskId = getRuntimeContext().getIndexOfThisSubtask();
                ctx.collect("subTaskId " + subtaskId + ", current offset = " + offset);
                Thread.sleep(1000);
                if (offset % 5 == 0) {
                    throw new RuntimeException("a bug occurred");
                }
            }
        }

        // 4. Persist the state
        // Called periodically: snapshots the in-memory state into the checkpoint directory
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            offsetState.clear(); // clear the old value, then store the latest offset for the checkpoint
            offsetState.add(offset);
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }
}
0. Flink's JobManager creates a CheckpointCoordinator.
Note:
As covered earlier, a checkpoint is a global snapshot of all operators in a Flink job at a single point in time.
The place where that snapshot is stored is called the state backend.
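A minimal sketch of picking a state backend in code, using the legacy Flink 1.13 API that the OperateStateDemo above already uses (FsStateBackend); the checkpoint paths are placeholders, and only one of the three calls would be kept in a real job:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: state and snapshots both live in JVM memory (dev/testing only)
        env.setStateBackend(new MemoryStateBackend());

        // Option 2: state on the JVM heap, snapshots written to a durable file system (local dir or HDFS)
        env.setStateBackend(new FsStateBackend("file:///F:/ckp"));

        // Option 3: state in embedded RocksDB on local disk, snapshots written to a file system;
        // needs the flink-statebackend-rocksdb dependency shown below
        env.setStateBackend(new RocksDBStateBackend("file:///F:/ckp"));
    }
}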
To use RocksDB, add the dependency.
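A sketch of the Maven dependency, reusing the version properties from the pom below:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>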
Restart strategy types
Default restart strategy
If checkpointing is enabled and no restart strategy is configured, the job restarts indefinitely and recovers automatically by default; this rides out small hiccups but can hide real bugs.
No restart strategy
Any failure is thrown immediately and the job fails.
env.setRestartStrategy(RestartStrategies.noRestart());
Fixed-delay restart strategy (commonly used in development)
If the job fails, restart up to 3 times with a 5 s delay between attempts:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        3,                            // maximum number of restart attempts
        Time.of(5, TimeUnit.SECONDS)  // delay between attempts
));
Failure-rate restart strategy
Restart automatically as long as the job fails no more than 3 times within any 5-minute window, waiting 3 s between restarts:
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,                            // max failures within the measuring interval
        Time.of(5, TimeUnit.MINUTES), // measuring interval
        Time.of(3, TimeUnit.SECONDS)  // delay between restarts
));
Manual restart
Triggered from the Flink web UI.
Since Flink 1.11 the Blink planner is the default. Table API dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
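With these dependencies on the classpath, the table environment is created the same way the demos below do it; the Blink planner is spelled out explicitly here even though it is already the default since 1.11:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableEnvDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Blink planner in streaming mode
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
    }
}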
Full project dependencies (pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.atguigu</groupId>
    <artifactId>Flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>Flink-demo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version></dependency>
        <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
public class SqlDemo1 {
    public static void main(String[] args) throws Exception {
        demo3();
    }

    /**
     * Convert a DataStream into a Table and a View, then run SQL statistics on them.
     */
    public static void demo1() throws Exception {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // TODO source
        DataStreamSource<Order> orderA = env.fromElements(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2),
                new Order(3L, "ali", 3),
                new Order(1L, "tom", 4));
        DataStreamSource<Order> orderB = env.fromElements(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "rubber", 2),
                new Order(3L, "ali", 3),
                new Order(1L, "tom", 4));

        // TODO transform: register the streams as a Table and a View
        Table tableA = tenv.fromDataStream(orderA, $("user"), $("product"), $("amount"));
        tableA.printSchema();
        System.out.println(tableA);
        tenv.createTemporaryView("tableB", orderB, $("user"), $("product"), $("amount"));

        String sql = "select * from " + tableA + " where amount > 2 "
                + "union select * from tableB where amount > 1";
        Table resultTable = tenv.sqlQuery(sql);
        resultTable.printSchema();
        System.out.println(resultTable);

        // Convert the Table back to a DataStream
        // Append mode: results are only ever appended to the result stream
        // DataStream<Order> resultDS = tenv.toAppendStream(resultTable, Order.class);
        // Retract mode: each record carries a flag on top of the previous results, true = add/update, false = retract/delete
        DataStream<Tuple2<Boolean, Order>> resultDS = tenv.toRetractStream(resultTable, Order.class);
        resultDS.print();

        // TODO execute
        env.execute();
    }

    /**
     * WordCount implemented with both SQL and the Table API.
     */
    public static void demo2() throws Exception {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // TODO source
        DataStreamSource<WC> wordsDs = env.fromElements(
                new WC("hello", 1L),
                new WC("word", 1L),
                new WC("hello", 1L));

        /* Variant 1: SQL on a temporary view
        tenv.createTemporaryView("t_words", wordsDs, $("word"), $("frequency"));
        String sql = "select word,sum(frequency) as frequency from t_words group by word";
        Table table = tenv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, WC>> ds = tenv.toRetractStream(table, WC.class);
        ds.print();
        */
        /* Variant 2: SQL on an inline Table
        Table table = tenv.fromDataStream(wordsDs, $("word"), $("frequency"));
        String sql = "select word,sum(frequency) as frequency from " + table + " group by word";
        Table table1 = tenv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, WC>> ds = tenv.toRetractStream(table1, WC.class);
        ds.print();
        */
        // Variant 3: Table API
        Table table = tenv.fromDataStream(wordsDs, $("word"), $("frequency"));
        Table select = table.groupBy($("word"))
                .select($("word"), $("frequency").sum().as("frequency"));
        DataStream<Tuple2<Boolean, Row>> ds = tenv.toRetractStream(select, Row.class);
        ds.print();

        // TODO execute
        env.execute();
    }

    /**
     * Use Flink SQL to compute, per user, the order count, the maximum order amount and the
     * minimum order amount over 5-second windows, i.e. every 5 s aggregate the last 5 s of orders per user.
     *
     * Watermark + event time + window, implemented in SQL.
     */
    public static void demo3() throws Exception {
        // TODO env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<TOrder> ds = env.addSource(new RichSourceFunction<TOrder>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<TOrder> sourceContext) throws Exception {
                Random random = new Random();
                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss");
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(2);
                    int money = random.nextInt(101);
                    // event time up to 4 s in the past to simulate out-of-order events
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    TOrder tOrder = new TOrder(orderId, userId, money, eventTime, df.format(new Date(eventTime)));
                    sourceContext.collect(tOrder);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        SingleOutputStreamOperator<TOrder> wDS = ds.assignTimestampsAndWatermarks(
                WatermarkStrategy.<TOrder>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((o, l) -> o.eventTime));

        tenv.createTemporaryView("t_order", wDS,
                $("orderId"), $("userId"), $("money"), $("eventTime").rowtime(), $("timeString"));

        String sql = "select userId,count(orderId) as orderCount,max(money) as maxMoney,min(money) as minMoney "
                + "from t_order "
                + "group by userId,tumble(eventTime,INTERVAL '5' SECOND)";
        Table table = tenv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> rDS = tenv.toRetractStream(table, Row.class);
        rDS.print();

        // TODO execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class TOrder {
        public String orderId;
        public Integer userId;
        public Integer money;
        public Long eventTime;
        public String timeString;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class WC {
        public String word;
        public Long frequency;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Order {
        private Long user;
        private String product;
        private Integer amount;
    }
}
Flink SQL reading from and writing to Kafka directly
public class SqlKafka {
    /**
     * Flink SQL reading from and writing to Kafka directly.
     * Sample input records:
     * {"user_id":"1","page_id":"1","status":"success"}
     * {"user_id":"1","page_id":"1","status":"success"}
     * {"user_id":"1","page_id":"1","status":"success"}
     * {"user_id":"1","page_id":"1","status":"fail"}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);

        // TODO Source: a table backed by the input Kafka topic
        String inputSQL = "CREATE TABLE input_kafka (`user_id` BIGINT,`page_id` BIGINT,`status` STRING) " +
                "with (" +
                "'connector'='kafka','topic'='input_kafka','properties.bootstrap.servers'='hadoop102:9092'," +
                "'properties.group.id'='testGroup','scan.startup.mode'='latest-offset','format'='json')";
        TableResult inputTable = tenv.executeSql(inputSQL);

        // TODO Transformation: keep only the successful events
        String sql = "select * from input_kafka where status='success'";
        Table etl = tenv.sqlQuery(sql);
        // tenv.toRetractStream(etl, Row.class).print();

        // TODO Sink: a table backed by the output Kafka topic
        TableResult outputTable = tenv.executeSql(
                "CREATE TABLE output_kafka (`user_id` BIGINT,`page_id` BIGINT,`status` STRING) " +
                "with (" +
                "'connector'='kafka','topic'='output_kafka','properties.bootstrap.servers'='hadoop102:9092'," +
                "'sink.partitioner'='round-robin','format'='json')");
        tenv.executeSql("insert into output_kafka select * from " + etl);

        env.execute();
    }
}
Double Eleven real-time dashboard (big screen)
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.stream.Collectors;

public class BigScreemDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Double>> orderDS = env.addSource(new MySource());

        // TODO transform: pre-aggregate the sales total per category, emitted every second
        SingleOutputStreamOperator<CategoryPojo> tempAggResult = orderDS
                .keyBy(t -> t.f0)
                // a 1-day window starting at 00:00:00 local time (the -8 h offset shifts from UTC)
                .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
                // custom trigger: fire every second
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
                // simple aggregation:
                // .sum()
                // custom aggregation:
                .aggregate(new PriceAggregate(), new WindowResult());

        tempAggResult.keyBy(CategoryPojo::getDateTime)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .process(new FinalResultWindowProcess());

        env.execute();
    }

    private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        // Initialize the accumulator
        @Override
        public Double createAccumulator() {
            return 0D;
        }

        // Add one record to the accumulator
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return value.f1 + accumulator;
        }

        // Return the accumulated result
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        // Merge the results of the individual subtasks
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    // Custom window function that decides how the window result is collected
    private static class WindowResult implements WindowFunction<Double, CategoryPojo, String, TimeWindow> {
        private FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(String category, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            long currentTimeMillis = System.currentTimeMillis();
            String dateTime = df.format(currentTimeMillis);
            Double totalPrice = input.iterator().next();
            out.collect(new CategoryPojo(category, totalPrice, dateTime));
        }
    }

    private static class FinalResultWindowProcess extends ProcessWindowFunction<CategoryPojo, Object, String, TimeWindow> {
        /**
         * dataTime (the key below) is the current second;
         * elements holds the running sales total of every category as of this second.
         */
        @Override
        public void process(String dataTime,
                            Context context,
                            Iterable<CategoryPojo> elements,
                            Collector
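The notes break off mid-signature here. A minimal sketch of how this final window function is usually finished in this exercise (total across all categories plus the top 3 categories by sales), assuming CategoryPojo is a Lombok @Data POJO with category/totalPrice/dateTime fields and that the function's output type is Object:

// Hypothetical completion sketch, not the original notes' code.
@Override
public void process(String dataTime,
                    Context context,
                    Iterable<CategoryPojo> elements,
                    Collector<Object> out) throws Exception {
    double total = 0D; // total sales across all categories for this second
    // keep the 3 categories with the highest totals (smallest of the three at the head)
    PriorityQueue<CategoryPojo> top3 = new PriorityQueue<>(3,
            (c1, c2) -> Double.compare(c1.getTotalPrice(), c2.getTotalPrice()));
    for (CategoryPojo element : elements) {
        total += element.getTotalPrice();
        if (top3.size() < 3) {
            top3.add(element);
        } else if (element.getTotalPrice() > top3.peek().getTotalPrice()) {
            top3.poll();
            top3.add(element);
        }
    }
    // round the grand total to 2 decimal places for display
    double roundedTotal = new BigDecimal(total).setScale(2, RoundingMode.HALF_UP).doubleValue();
    List<String> top3Desc = top3.stream()
            .sorted((c1, c2) -> Double.compare(c2.getTotalPrice(), c1.getTotalPrice()))
            .map(c -> c.getCategory() + ":" + c.getTotalPrice())
            .collect(Collectors.toList());
    out.collect("time: " + dataTime + ", total: " + roundedTotal + ", top3: " + top3Desc);
}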
Automatic positive review after a timeout
public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // <userId, orderId, order creation time>
        DataStreamSource<Tuple3<String, String, Long>> orderDS = env.addSource(new MySource());
        long interval = 5000L; // timeout in milliseconds
        // key by user, then use a custom KeyedProcessFunction with timers to detect
        // timed-out orders and give them an automatic positive review
        orderDS.keyBy(t -> t.f0).process(new TimeProcessFunction(interval));
        env.execute();
    }

    public static class MySource implements SourceFunction<Tuple3<String, String, Long>> {
        private Boolean flag = true;

        @Override
        public void run(SourceContext<Tuple3<String, String, Long>> sourceContext) throws Exception {
            Random random = new Random();
            while (flag) {
                String userId = random.nextInt(5) + "";
                String orderId = UUID.randomUUID().toString();
                long currentTimeMillis = System.currentTimeMillis();
                sourceContext.collect(Tuple3.of(userId, orderId, currentTimeMillis));
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    private static class TimeProcessFunction extends KeyedProcessFunction<String, Tuple3<String, String, Long>, Object> {
        // orderId -> order creation time
        private MapState<String, Long> mapState = null;
        private Long interval;

        public TimeProcessFunction(long interval) {
            this.interval = interval;
        }

        // Initialize the MapState
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> mapStateDescriptor =
                    new MapStateDescriptor<>("mapState", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public void processElement(Tuple3<String, String, Long> value,
                                   KeyedProcessFunction<String, Tuple3<String, String, Long>, Object>.Context context,
                                   Collector
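The notes break off here as well. A minimal sketch, under the assumptions above (MapState<String, Long> keyed by orderId, output type Object, java.util.Iterator and java.util.Map imported), of how processElement and onTimer typically complete this exercise; the check of whether the user already left a review is skipped, and every timed-out order simply gets an automatic positive review:

// Hypothetical completion sketch, not the original notes' code.
@Override
public void processElement(Tuple3<String, String, Long> value,
                           Context context,
                           Collector<Object> out) throws Exception {
    String orderId = value.f1;
    Long createTime = value.f2;
    // remember the order and schedule a timer for createTime + interval
    mapState.put(orderId, createTime);
    context.timerService().registerProcessingTimeTimer(createTime + interval);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
    // when the timer fires, every order whose deadline has passed gets an automatic positive review
    Iterator<Map.Entry<String, Long>> iterator = mapState.iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Long> entry = iterator.next();
        if (entry.getValue() + interval <= timestamp) {
            out.collect("order " + entry.getKey() + " timed out, automatic positive review submitted");
            iterator.remove(); // drop the order from state once handled
        }
    }
}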
This logic is not finished yet.
public class BoardCastDemo {
    /**
     * There is an event stream (a user behaviour log) that contains a userId but no user details,
     * and a config/rule stream that contains the detailed user information.
     * The two streams must be joined so every log event is enriched with the user's details,
     * e.g. (userId, details, action).
     * Because the config/rule stream (the user info) is small, it can be broadcast as state.
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource eventDs = env.addSource(new MySource());
        DataStreamSource
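The notes stop here. A minimal sketch of the broadcast-state pattern this demo is heading toward; the element types and sample data (log events as Tuple3<userId, eventTime, action>, user info as a Map<userId, userName>) are hypothetical and only for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event stream: (userId, eventTime, action) -- hypothetical sample data
        DataStream<Tuple3<String, Long, String>> eventDs = env.fromElements(
                Tuple3.of("u1", 1000L, "click"),
                Tuple3.of("u2", 2000L, "buy"));

        // Config/rule stream: userId -> userName -- hypothetical sample data
        Map<String, String> users = new HashMap<>();
        users.put("u1", "Tom");
        users.put("u2", "Jack");
        DataStream<Map<String, String>> userDs = env.fromElements(users);

        // 1. Describe the broadcast state and broadcast the (small) config stream
        MapStateDescriptor<String, Map<String, String>> descriptor = new MapStateDescriptor<>(
                "userInfo", Types.STRING, Types.MAP(Types.STRING, Types.STRING));
        BroadcastStream<Map<String, String>> broadcastDs = userDs.broadcast(descriptor);

        // 2. Connect the event stream with the broadcast stream and enrich each event
        eventDs.connect(broadcastDs)
                .process(new BroadcastProcessFunction<Tuple3<String, Long, String>, Map<String, String>,
                        Tuple4<String, Long, String, String>>() {
                    @Override
                    public void processElement(Tuple3<String, Long, String> event,
                                               ReadOnlyContext ctx,
                                               Collector<Tuple4<String, Long, String, String>> out) throws Exception {
                        // read the broadcast state and look up the user's details
                        Map<String, String> userInfo = ctx.getBroadcastState(descriptor).get("user");
                        String userName = userInfo == null ? "unknown" : userInfo.getOrDefault(event.f0, "unknown");
                        out.collect(Tuple4.of(event.f0, event.f1, event.f2, userName));
                    }

                    @Override
                    public void processBroadcastElement(Map<String, String> value,
                                                        Context ctx,
                                                        Collector<Tuple4<String, Long, String, String>> out) throws Exception {
                        // update the broadcast state whenever new config arrives
                        ctx.getBroadcastState(descriptor).put("user", value);
                    }
                }).print();

        env.execute();
    }
}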