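Flink ships with many built-in connectors that cover most scenarios. Some cases are still not covered, however, and RocketMQ is one of them. To consume messages from RocketMQ, you need to write a custom Source.
Take FlinkKafkaConsumer as a reference: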
    public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {}

    public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
            implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {}

    public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
            implements ParallelSourceFunction<OUT> {}

    public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {}

    public interface SourceFunction<T> extends Function, Serializable {
        void run(SourceFunction.SourceContext<T> var1) throws Exception;

        void cancel();

        @Public
        public interface SourceContext<T> {
            void collect(T var1);

            @PublicEvolving
            void collectWithTimestamp(T var1, long var2);

            @PublicEvolving
            void emitWatermark(Watermark var1);

            @PublicEvolving
            void markAsTemporarilyIdle();

            Object getCheckpointLock();

            void close();
        }
    }
As the hierarchy shows, a custom Source only needs to implement SourceFunction.
Create FlinkRocketMQConsumer, implement SourceFunction, and override the run() and cancel() methods:
    public class FlinkRocketMQConsumer implements SourceFunction<String> {
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
        }

        @Override
        public void cancel() {
        }
    }
A RocketMQ consumer client is also required, so add the following dependency to the pom:
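    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.0</version>
        <scope>provided</scope>
    </dependency>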
FlinkRocketMQConsumer needs to create a consumer, as follows:
    public class FlinkRocketMQConsumer implements SourceFunction<String> {
        private static final DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("0320TopicTestConsumerGroup");
    }
This way, a consumer is created as soon as the FlinkRocketMQConsumer class is loaded.
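One likely reason for keeping the consumer in a static field: SourceFunction extends Serializable, and Flink ships the function instance to the task managers using Java serialization. Static fields are skipped by Java serialization, so a client object that is not Serializable (assumed here to be the case for the RocketMQ consumer) can live in a static field without breaking that step, whereas the same object in an instance field would. The sketch below shows the difference using only JDK classes; FakeClient, StaticHolder, and InstanceHolder are hypothetical stand-ins, not Flink or RocketMQ types.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a client object that is NOT Serializable. */
class FakeClient {
}

/** Holds the client in a static field, as FlinkRocketMQConsumer does. */
class StaticHolder implements Serializable {
    // Static fields are not part of the serialized object state.
    private static final FakeClient CLIENT = new FakeClient();
    private final String topic = "test_rmq";
}

/** Holds the client in an instance field instead. */
class InstanceHolder implements Serializable {
    // Instance fields are serialized, so this field's type must be Serializable.
    private final FakeClient client = new FakeClient();
}

class SerializationSketch {
    /** Returns true if the object can be written with Java serialization. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new StaticHolder()));   // prints true
        System.out.println(canSerialize(new InstanceHolder())); // prints false
    }
}
```

The trade-off is that a static field is shared by every instance of the class in the same JVM, so this pattern would need rethinking for a parallel source.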
The consumer also has to be configured, which requires nameSrvAddr and topic. Add a constructor that stores both so that the consumer can be configured from them:
    public class FlinkRocketMQConsumer implements SourceFunction<String> {
        private String nameSrvAddr;
        private String topic;

        public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
            this.nameSrvAddr = nameSrvAddr;
            this.topic = topic;
        }
        ...
    }
Override the run and cancel methods:
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        consumer.setNamesrvAddr(nameSrvAddr);
        consumer.subscribe(topic, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // Forward every message body to Flink as a UTF-8 string.
            msgs.forEach(msg -> ctx.collect(new String(msg.getBody(), StandardCharsets.UTF_8)));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        // The listener registered above is only invoked after consumer.start(),
        // on the consumer's own threads. If run() returned at this point, the
        // source would be considered finished and ctx would already be closed
        // by the time the listener called ctx.collect(...).
        // So run() must not be allowed to return.
        while (true) {
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        consumer.shutdown();
    }
The complete code is as follows:
    package com.source;

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;

    import java.nio.charset.StandardCharsets;

    /**
     * @author Johnson
     * @version 1.0
     * @description
     * @create 2023-03-20 10:02
     */
    public class FlinkRocketMQConsumer implements SourceFunction<String> {
        // Created once, when the class is loaded.
        private static final DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("0320TopicTestConsumerGroup");

        private final String nameSrvAddr;
        private final String topic;

        public FlinkRocketMQConsumer(String nameSrvAddr, String topic) {
            this.nameSrvAddr = nameSrvAddr;
            this.topic = topic;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            consumer.setNamesrvAddr(nameSrvAddr);
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // Forward every message body to Flink as a UTF-8 string.
                msgs.forEach(msg -> ctx.collect(new String(msg.getBody(), StandardCharsets.UTF_8)));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            consumer.start();
            // The listener registered above is only invoked after consumer.start(),
            // on the consumer's own threads. If run() returned at this point, the
            // source would be considered finished and ctx would already be closed
            // by the time the listener called ctx.collect(...).
            // So run() must not be allowed to return.
            while (true) {
                Thread.sleep(10);
            }
        }

        @Override
        public void cancel() {
            consumer.shutdown();
        }
    }
A demo job that uses the source:

    package rocketmq;

    import com.source.FlinkRocketMQConsumer;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /**
     * @author Johnson
     * @version 1.0
     * @description
     * @create 2023-03-21 10:30
     */
    public class FlinkRocketMQConsumerDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> rmqDS =
                    env.addSource(new FlinkRocketMQConsumer("***:9876", "test_rmq"));
            rmqDS.print("**********");
            env.execute("FlinkRocketMQConsumerDemo");
        }
    }
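At this point the job consumes data from RocketMQ normally, and each message is printed to the console with the ********** prefix.
To keep the SourceContext (ctx) usable, FlinkRocketMQConsumer parks the run() thread in a sleep loop so that the method never returns. This works for now, but whether it will stand the test of time, or whether it plants a hidden problem for later, remains to be seen.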
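One possible direction, offered as a sketch rather than a tested fix: block run() on a latch that cancel() releases instead of polling with sleep, and emit records while holding the object returned by getCheckpointLock(), since the listener runs on a different thread from run(). To stay runnable without Flink or RocketMQ on the classpath, the code below uses simplified stand-ins: SketchContext imitates two methods of SourceContext, and FakePushConsumer is a hypothetical push-style client that delivers messages on its own thread. None of these names are real Flink or RocketMQ APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/** Simplified stand-in for Flink's SourceContext (assumption, not the real interface). */
interface SketchContext<T> {
    void collect(T element);
    Object getCheckpointLock();
}

/** Hypothetical push-style client: invokes the listener on its own thread. */
class FakePushConsumer {
    private final int count;
    private Consumer<String> listener;
    private Thread worker;

    FakePushConsumer(int count) { this.count = count; }

    void registerListener(Consumer<String> listener) { this.listener = listener; }

    void start() {
        worker = new Thread(() -> {
            for (int i = 0; i < count; i++) listener.accept("msg-" + i);
        });
        worker.start();
    }

    void shutdown() throws InterruptedException { worker.join(); }
}

class BlockingSourceSketch {
    private final CountDownLatch cancelled = new CountDownLatch(1);
    private final FakePushConsumer consumer;

    BlockingSourceSketch(FakePushConsumer consumer) { this.consumer = consumer; }

    void run(SketchContext<String> ctx) throws InterruptedException {
        consumer.registerListener(msg -> {
            // Emit under the checkpoint lock because this is not the run() thread.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(msg);
            }
        });
        consumer.start();
        cancelled.await();   // park here until cancel() is called; no polling
        consumer.shutdown(); // release the client before run() returns
    }

    void cancel() { cancelled.countDown(); }

    /** Runs the source on a task thread, waits for n messages, then cancels it. */
    static List<String> runDemo(int n) {
        List<String> out = new ArrayList<>();
        Object lock = new Object();
        CountDownLatch received = new CountDownLatch(n);
        SketchContext<String> ctx = new SketchContext<String>() {
            public void collect(String e) { out.add(e); received.countDown(); }
            public Object getCheckpointLock() { return lock; }
        };
        BlockingSourceSketch source = new BlockingSourceSketch(new FakePushConsumer(n));
        Thread task = new Thread(() -> {
            try { source.run(ctx); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        try {
            task.start();
            received.await();
            source.cancel();
            task.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

In this shape run() returns promptly once cancel() is called, and the client is shut down on the same thread that started it. Whether this maps cleanly onto a real RocketMQ consumer inside a Flink job is an open question that would need testing on a cluster.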
If there is a better way to implement this, comments from fellow engineers are very welcome.