大数据(9f)Flink富函数RichFunction
创始人
2024-02-08 08:31:58
0

文章目录

  • 1、概述
  • 2、示例
    • 2.1、普通函数
    • 2.2、富函数
      • 2.2.1、获取富函数的运行时上下文
  • 3、源码截取
    • 3.1、RichFunction
    • 3.2、RuntimeContext

1、概述

Rich Function,译名富函数,和普通函数相比,多了:
生命周期(openclose方法)
获取函数的运行时上下文(getRuntimeContext方法)
本文版本
Flink:1.14.6
Java:1.8
Scala:2.12

2、示例

2.1、普通函数

MapFunction接口 继承了 Function接口

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new MapFunction() {@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}

测试结果

2.2、富函数

RichMapFunction抽象类 继承了 AbstractRichFunction抽象类
AbstractRichFunction抽象类 实现了 RichFunction接口

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new RichMapFunction() {@Overridepublic void open(Configuration parameters) {System.out.println("生命周期开始");}@Overridepublic void close() {System.out.println("生命周期结束");}@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}

测试结果

2.2.1、获取富函数的运行时上下文

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(2);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new RichMapFunction() {@Overridepublic void open(Configuration parameters) {System.out.println("生命周期开始");//获取运行时上下文RuntimeContext context = getRuntimeContext();System.out.println("子任务索引:" + context.getIndexOfThisSubtask());}@Overridepublic void close() {System.out.println("生命周期结束");}@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}

并行度设置为2,测试结果

3、源码截取

3.1、RichFunction

package org.apache.flink.api.common.functions;import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;@Public
public interface RichFunction extends Function {/** 函数的生命周期 */void open(Configuration parameters) throws Exception;void close() throws Exception;/** 获取函数运行时上下文对象,对象信息包含:并行度、作业ID、任务名、子任务索引… */RuntimeContext getRuntimeContext();/** 设置函数的运行时上下文。在创建函数的并行实例时,此方法被框架调用 */void setRuntimeContext(RuntimeContext t);
}

3.2、RuntimeContext

/*** RuntimeContext 包含 函数的运行时上下文信息* 函数的每个并行实例都有1个context对象,通过访问对象,可获取 静态信息、累加器、广播变量、状态*/
@Public
public interface RuntimeContext {JobID getJobId();String getTaskName();int getIndexOfThisSubtask();int getAttemptNumber();String getTaskNameWithSubtasks();// ------------------------------------ 累加器 ------------------------------------------- void addAccumulator(String name, Accumulator accumulator); Accumulator getAccumulator(String name);@PublicEvolvingIntCounter getIntCounter(String name);@PublicEvolvingLongCounter getLongCounter(String name);@PublicEvolvingDoubleCounter getDoubleCounter(String name);@PublicEvolvingHistogram getHistogram(String name);// ---------------------------------- 广播变量 -------------------------------------------@PublicEvolvingboolean hasBroadcastVariable(String name); List getBroadcastVariable(String name); C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer initializer);// -------------------------- 访问【状态】的方法 --------------------------------@PublicEvolving ValueState getState(ValueStateDescriptor stateProperties);@PublicEvolving ListState getListState(ListStateDescriptor stateProperties);@PublicEvolving ReducingState getReducingState(ReducingStateDescriptor stateProperties);@PublicEvolving AggregatingState getAggregatingState(AggregatingStateDescriptor stateProperties);@PublicEvolving MapState getMapState(MapStateDescriptor stateProperties);
}

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...