Common Flink Sinks (ElasticsearchSink (EsSink), RedisSink, KafkaSink, MysqlSink, FileSink)
创始人
2024-02-17 03:27:19

Writing Flink output to Elasticsearch, Redis, MySQL, Kafka, and files.

Table of contents

    • Configuring the pom file
    • Common entity class
    • KafkaSink
    • ElasticsearchSink (EsSink)
    • RedisSink
    • MysqlSink (JdbcSink)
    • FileSink

First, prepare the relevant environments (Kafka, Elasticsearch, Redis, MySQL) yourself.

Configuring the pom file

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

Common entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
    private String userName;
    private String url;
    private Long timestamp;
}
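For readers not using Lombok, here is a hand-written sketch of roughly what those annotations generate for a class with these three fields (illustrative only; Lombok's actual output also includes equals/hashCode from @Data):

```java
// Hand-written equivalent of the Lombok-annotated entity class.
public class UserEvent {
    private String userName;
    private String url;
    private Long timestamp;

    // @NoArgsConstructor
    public UserEvent() {}

    // @AllArgsConstructor
    public UserEvent(String userName, String url, Long timestamp) {
        this.userName = userName;
        this.url = url;
        this.timestamp = timestamp;
    }

    // @Data generates getters and setters for every field
    public String getUserName() { return userName; }
    public String getUrl() { return url; }
    public Long getTimestamp() { return timestamp; }
    public void setUserName(String userName) { this.userName = userName; }
    public void setUrl(String url) { this.url = url; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    // @ToString
    @Override
    public String toString() {
        return "UserEvent(userName=" + userName + ", url=" + url
                + ", timestamp=" + timestamp + ")";
    }
}
```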

KafkaSink

Write the data to Kafka. Start a Kafka console consumer first, then run the program.

import com.event.UserEvent;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Arrays;

public class KafkaSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> stream = env.fromCollection(Arrays.asList(
                "xiaoming,www.baidu.com,1287538716253",
                "Mr Li,www.baidu.com,1287538710000",
                "Mr Zhang,www.baidu.com,1287538710900"));

        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // Parsing rule: "name,url,timestamp"
                String[] split = value.split(",");
                return new UserEvent(split[0].trim(), split[1].trim(),
                        Long.valueOf(split[2].trim())).toString();
            }
        });

        // Command to start the Kafka console consumer:
        // ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic events
        result.addSink(new FlinkKafkaProducer<>(
                "hadoop01:9092",            // Kafka broker address
                "events",                   // target topic
                new SimpleStringSchema()));

        env.execute();
    }
}
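The map function's parsing rule (split on commas, trim each field, convert the third field to a Long) can be checked in isolation. A minimal sketch, with the class name `CsvParseSketch` invented for illustration:

```java
import java.util.Arrays;

public class CsvParseSketch {
    // Mirrors the MapFunction above: "name,url,ts" -> trimmed fields plus a numeric timestamp.
    public static Object[] parse(String line) {
        String[] split = line.split(",");
        return new Object[] {
                split[0].trim(),
                split[1].trim(),
                Long.valueOf(split[2].trim())
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("xiaoming, www.baidu.com, 1287538716253")));
    }
}
```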

Run result: the mapped records are printed by the Kafka console consumer (screenshot omitted).

ElasticsearchSink(EsSink)

Write the data to Elasticsearch.

Example code


import com.event.UserEvent;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

public class EsSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(Arrays.asList(
                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)));

        // Define the list of Elasticsearch hosts
        List<HttpHost> hosts = Arrays.asList(new HttpHost("hadoop01", 9200));

        // Define the ElasticsearchSinkFunction
        ElasticsearchSinkFunction<UserEvent> elasticsearchSinkFunction =
                new ElasticsearchSinkFunction<UserEvent>() {
                    @Override
                    public void process(UserEvent userEvent, RuntimeContext runtimeContext,
                                        RequestIndexer requestIndexer) {
                        IndexRequest indexRequest = Requests.indexRequest()
                                .index("events")
                                .type("type") // mapping types are deprecated in Elasticsearch 7
                                .source(new HashMap<String, String>() {{
                                    put(userEvent.getUserName(), userEvent.getUrl());
                                }});
                        requestIndexer.add(indexRequest);
                    }
                };

        // Write to Elasticsearch
        userEventDataStreamSource.addSink(
                new ElasticsearchSink.Builder<>(hosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}

Commands (run in Kibana Dev Tools or via curl):

GET _cat/indices
GET _cat/indices/events
GET events/_search

Run result: the events index and its documents are visible (screenshot omitted).

RedisSink

Write the data to Redis.

Example code


import com.event.UserEvent;
import my.test.source.CustomSouce;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<UserEvent> streamSource = env.addSource(new CustomSouce());

        // Create the Jedis connection configuration
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("master")
                .setPort(6379)
                .setTimeout(10000)
                .build();

        // Write to Redis
        streamSource.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    public static class MyRedisMapper implements RedisMapper<UserEvent> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            // Write with HSET; the additional key "events" names the hash to store into
            return new RedisCommandDescription(RedisCommand.HSET, "events");
        }

        @Override
        public String getKeyFromData(UserEvent userEvent) {
            return userEvent.getUserName();
        }

        @Override
        public String getValueFromData(UserEvent userEvent) {
            return userEvent.getUrl();
        }
    }
}
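With RedisCommand.HSET and additional key "events", each record becomes one field in a single Redis hash: the field is getKeyFromData (the user name) and the value is getValueFromData (the url), so a later event for the same user overwrites the earlier one. A toy in-memory model of that layout (plain Java, no Redis involved; the class name `HsetModel` is invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class HsetModel {
    // hash name -> (field -> value), mimicking Redis hashes
    private final Map<String, Map<String, String>> store = new HashMap<>();

    // Same effect as: HSET <hash> <field> <value>
    public void hset(String hash, String field, String value) {
        store.computeIfAbsent(hash, k -> new HashMap<>()).put(field, value);
    }

    // Same effect as: HGET <hash> <field>
    public String hget(String hash, String field) {
        Map<String, String> fields = store.get(hash);
        return fields == null ? null : fields.get(field);
    }
}
```

Under this layout, `HGETALL events` in redis-cli shows one field per distinct user name, holding that user's most recent url.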

Custom source

import com.event.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class CustomSouce implements SourceFunction<UserEvent> {
    // Flag that controls whether data generation keeps running
    private Boolean running = true;

    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random();
        // Pick randomly from fixed data sets
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};
        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()));
            // Emit one click event per second to make the output easy to watch
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Run result: because the source above is an unbounded stream, the data keeps changing (screenshot omitted).

MysqlSink(JdbcSink)

Write the data to MySQL.

Table structure

create table events (
    user_name varchar(20) not null,
    url varchar(100) not null
);

Example code


import com.event.UserEvent;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;

public class MysqlSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A batch of test data
        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(Arrays.asList(
                new UserEvent("zhangsan", "/path?test123", System.currentTimeMillis() - 2000L),
                new UserEvent("zhangsan", "/path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("lisi", "/path?checkParam", System.currentTimeMillis()),
                new UserEvent("bob", "/path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("mary", "/path?checkParam", System.currentTimeMillis()),
                new UserEvent("lisi", "/path?checkParam123", System.currentTimeMillis() - 2000L)));

        userEventDataStreamSource.addSink(JdbcSink.sink(
                // SQL statement to execute
                "INSERT INTO events (user_name, url) VALUES (?, ?)",
                new JdbcStatementBuilder<UserEvent>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserEvent userEvent)
                            throws SQLException {
                        // Fill in the SQL placeholders
                        preparedStatement.setString(1, userEvent.getUserName());
                        preparedStatement.setString(2, userEvent.getUrl());
                    }
                },
                // JDBC connection options
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://hadoop01:3306/mysql")
                        .withUsername("root")
                        .withPassword("123456")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .build()));

        env.execute();
    }
}
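JdbcSink invokes the JdbcStatementBuilder once per record, and JDBC placeholder indices are 1-based and filled left to right. A toy builder illustrating that convention with no database involved (the class name `BindSketch` and its string-substitution approach are invented for illustration; a real PreparedStatement also escapes values):

```java
public class BindSketch {
    // Substitutes '?' placeholders left to right, mirroring the 1-based
    // setString(1, ...), setString(2, ...) calls in the statement builder above.
    public static String bind(String sql, String... params) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        for (char c : sql.toCharArray()) {
            if (c == '?' && i < params.length) {
                out.append('\'').append(params[i++]).append('\'');
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(bind("INSERT INTO events (user_name, url) VALUES (?, ?)",
                "zhangsan", "/path?test123"));
    }
}
```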

After the program finishes, the events table in MySQL contains the new rows (screenshot omitted).

FileSink

Write the data to files (partitioned files are supported).

import com.event.UserEvent;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class FileSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<UserEvent> userEventDataStreamSource = env.fromCollection(Arrays.asList(
                new UserEvent("zhangsan", "path?test123", System.currentTimeMillis() - 2000L),
                new UserEvent("zhangsan", "path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("lisi", "path?checkParam", System.currentTimeMillis()),
                new UserEvent("bob", "path?test", System.currentTimeMillis() + 2000L),
                new UserEvent("mary", "path?checkParam", System.currentTimeMillis()),
                new UserEvent("lisi", "path?checkParam123", System.currentTimeMillis() - 2000L)));

        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .forRowFormat(new Path("./output/"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        // Roll when the part file reaches 1 GiB
                        .withMaxPartSize(1024 * 1024 * 1024)
                        // Roll at the latest every 15 minutes
                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                        // Inactivity interval: roll after 5 minutes without writes,
                        // so inactive files get finalized and archived
                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                        .build())
                .build();

        userEventDataStreamSource
                .map(data -> data.getUserName())
                .addSink(streamingFileSink);

        env.execute();
    }
}
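DefaultRollingPolicy rolls the in-progress part file when any one of the three thresholds is hit: the part exceeds the maximum size, the rollover interval since file creation elapses, or the inactivity interval since the last write elapses. A simplified model of that OR-combination (assumed semantics sketched in an invented class, not the actual Flink implementation):

```java
import java.util.concurrent.TimeUnit;

public class RollDecisionSketch {
    static final long MAX_PART_SIZE = 1024L * 1024 * 1024;           // 1 GiB
    static final long ROLLOVER_MS   = TimeUnit.MINUTES.toMillis(15); // 15 min
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(5);  // 5 min

    // Roll if ANY one of the three thresholds is exceeded.
    public static boolean shouldRoll(long partSizeBytes, long msSinceCreation,
                                     long msSinceLastWrite) {
        return partSizeBytes >= MAX_PART_SIZE
                || msSinceCreation >= ROLLOVER_MS
                || msSinceLastWrite >= INACTIVITY_MS;
    }
}
```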

After the run finishes, several part files appear under the output directory (screenshot omitted).
