open 和 close 方法)
getRuntimeContext 方法)

MapFunction 接口继承了 Function 接口。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new MapFunction() {@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}
测试结果
RichMapFunction 抽象类继承了 AbstractRichFunction 抽象类。
AbstractRichFunction 抽象类实现了 RichFunction 接口。
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(1);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new RichMapFunction() {@Overridepublic void open(Configuration parameters) {System.out.println("生命周期开始");}@Overridepublic void close() {System.out.println("生命周期结束");}@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}
测试结果
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class H1 {public static void main(String[] args) throws Exception {//创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(2);//获取数据源DataStreamSource dss = env.fromElements(1, 2, 3);//普通函数dss.map(new RichMapFunction() {@Overridepublic void open(Configuration parameters) {System.out.println("生命周期开始");//获取运行时上下文RuntimeContext context = getRuntimeContext();System.out.println("子任务索引:" + context.getIndexOfThisSubtask());}@Overridepublic void close() {System.out.println("生命周期结束");}@Overridepublic Integer map(Integer i) {return i * i;}}).print();//执行env.execute();}
}
并行度设置为 2 时的测试结果:
package org.apache.flink.api.common.functions;import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.Configuration;

/**
 * Base contract for "rich" functions: extends {@link Function} with lifecycle methods and access
 * to the function's runtime context.
 */
@Public
public interface RichFunction extends Function {

    // ----------------------------- lifecycle -----------------------------

    /** Lifecycle method of the function (opening). */
    void open(Configuration parameters) throws Exception;

    /** Lifecycle method of the function (closing). */
    void close() throws Exception;

    // -------------------------- runtime context --------------------------

    /**
     * Returns the runtime context of the function. The context carries information such as the
     * parallelism, job ID, task name and subtask index.
     */
    RuntimeContext getRuntimeContext();

    /**
     * Sets the runtime context of the function. The framework calls this when it creates a
     * parallel instance of the function.
     */
    void setRuntimeContext(RuntimeContext t);
}
/**
 * RuntimeContext 包含函数的运行时上下文信息。
 * 函数的每个并行实例都有 1 个 context 对象,通过访问该对象可获取静态信息、累加器、广播变量和状态。
 */
@Public
public interface RuntimeContext {JobID getJobId();String getTaskName();int getIndexOfThisSubtask();int getAttemptNumber();String getTaskNameWithSubtasks();// ------------------------------------ 累加器 ------------------------------------------- void addAccumulator(String name, Accumulator accumulator); Accumulator getAccumulator(String name);@PublicEvolvingIntCounter getIntCounter(String name);@PublicEvolvingLongCounter getLongCounter(String name);@PublicEvolvingDoubleCounter getDoubleCounter(String name);@PublicEvolvingHistogram getHistogram(String name);// ---------------------------------- 广播变量 -------------------------------------------@PublicEvolvingboolean hasBroadcastVariable(String name);