Flink RocketMQ Connector实现
创始人
2025-05-31 17:29:49
0

Flink内置了很多Connector,可以满足大部分场景。但是还是有一些场景无法满足,比如RocketMQ。需要消费RocketMQ的消息,需要自定时Source。

一、自定义FlinkRocketMQConsumer

参考FlinkKafkaConsumer:

public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase{}public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFunction implements CheckpointListener, ResultTypeQueryable, CheckpointedFunction {}public abstract class RichParallelSourceFunction extends AbstractRichFunction implements ParallelSourceFunction {}public interface ParallelSourceFunction extends SourceFunction {}public interface SourceFunction extends Function, Serializable {void run(SourceFunction.SourceContext var1) throws Exception;void cancel();@Publicpublic interface SourceContext {void collect(T var1);@PublicEvolvingvoid collectWithTimestamp(T var1, long var2);@PublicEvolvingvoid emitWatermark(Watermark var1);@PublicEvolvingvoid markAsTemporarilyIdle();Object getCheckpointLock();void close();}
}

可以看到,自定义的Source,只需要实现SourceFunction。

创建FlinkRocketMQConsumer,实现SourceFunction,重写run()和cancel()方法

public class FlinkRocketMQConsumer implements SourceFunction {@Overridepublic void run(SourceContext sourceContext) throws Exception {}@Overridepublic void cancel() {}
}

需要准备一个RocketMQ的消费者客户端,在pom中添加如下依赖:

org.apache.rocketmqrocketmq-client4.7.0provided

对于FlinkRocketMQConsumer来说,需要初始化一个consumer,代码如下:

public class FlinkRocketMQConsumer implements SourceFunction {private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");}

这样,在FlinkRocketMQConsumer类加载的时候,就会初始化一个consumer。

另外,还需要对consumer进行初始化,需要知道nameSrvAddr和topic,所以添加一个构造方法,对consumer进行初始化

public class FlinkRocketMQConsumer implements SourceFunction {private String nameSrvAddr;private String topic;  public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {this.nameSrvAddr = nameSrvAddr;this.topic = topic;}...
}

重写run方法和cancel方法

@Override
public void run(SourceContext ctx) throws Exception {consumer.setNamesrvAddr(nameSrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {msgs.forEach(msg -> {ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();// 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭// 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭// 所以,不能让程序走完while (true) {Thread.sleep(10);}
}@Override
public void cancel() {consumer.shutdown();
}

完整代码如下:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import java.nio.charset.Charset;/*** @author Johnson* @version 1.0* @description* @create 2023-03-20 10:02*/
public class FlinkRocketMQConsumer implements SourceFunction {private static final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("0320TopicTestConsumerGroup");private String nameSrvAddr;private String topic;public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {this.nameSrvAddr = nameSrvAddr;this.topic = topic;}@Overridepublic void run(SourceContext ctx) throws Exception {consumer.setNamesrvAddr(nameSrvAddr);consumer.subscribe(topic, "*");consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> {msgs.forEach(msg -> {ctx.collect(new String(msg.getBody(), Charset.forName("UTF-8")));});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();// 需要先走到 consumer.start() 后,才会走 consumer.registerMessageListener 方法,但是这个时候,意味着 run 方法已经走完,ctx已经关闭// 这个时候在 consumer.registerMessageListener 方法中,调用 ctx 会显示已关闭// 所以,不能让程序走完while (true) {Thread.sleep(10);}}@Overridepublic void cancel() {consumer.shutdown();}
}

二、方法调用

package rocketmq;import com.source.FlinkRocketMQConsumer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author Johnson* @version 1.0* @description* @create 2023-03-21 10:30*/
public class FlinkRocketMQConsumerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource rmqDS = env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));rmqDS .print("**********");env.execute("FlinkRocketMQConsumerDemo");}
}

到这来,就可以正常消费RocketMQ中的数据,控制台输出如下。

三、隐患

在FlinkRocketMQConsumer中,为了正常调用SourceContext(ctx),使用可一个线程一直占用,不让run方法结束,目前是可以正常运行,但是能不能经受得起时间检验,会不会给以后埋下隐患,还有待观察。

关于这一点,是否有更好的实现方法,欢迎各位技术大佬留言发表见解。。。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...