reactor是一个基于reactive streams的响应式编程框架。 在了解project reactor 项目之前, 你需要熟悉观察者模式(订阅发布模式)和reactive streams。
只有了解了这些内容,才能更好的开始project reactor的学习。你可以通过看
观察者模式之我见 和
一篇入门reactive streams背压响应式编程
了简单学习这两个知识点。
学习reactor的总步骤和前置条件
学习reactor的时候建议:
如果你是为了面试,当我没说。
reactor正如所有的发布订阅模式一样,符合reactive streams规范。 所以reactor也包含有publisher, subscriber, subscription, processor, operator等概念。
Flux和Mono就是reactor实现的publisher,他们可以接受被其他的订阅器所订阅,产生数据并且把数据推送给订阅器。 同时他们还集成了一些对数据流的操作,比如map, filter等。
Flux是一个包含0到N个元素的数据流,Mono是一个包含0或者1个元素的数据流。
总体上理解了reactor的流程,才能不被琐事的概念迷失了方向。其实整个reactor就一个订阅发布模式。
Flux和Mono是整个系统默认的publisher,目的是为了简化publisher自定义的工作。
Flux和Mono集成了很多的操作符,用来减少我们自定义subscriber和processor的工作量。
因为操作符的存在,我们对数据源和元素的操作就不需要自己定义自己的processor和subscriber了,直接使用操作符的组合即可完成工作.
除非不得已,否则不要试图自定义subscriber和processor。
理解了发布订阅模式和publisher的作用,就理解了flux和mono。Flux和mono为了满足需求,有大量的产生数据的方法,
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之数据源的产生
在基本流程中,已经提到了reactor为了减少自定义subscriber和processor的工作量,集成了很多的操作符。
首先应该大概理解操作符的作用和应用场景,大概知道有哪些种类的操作符即可。
用到的时候不妨翻阅官方文档,常用的不用记,因为经常会用到。不常用的更不用记忆,因为记了也用不到。
因为篇幅问题,我把这部分内容单独进行了整理,详见reactor之操作符
subscribe 操作符用来订阅流中的元素。
当流中的元素没有被订阅的时候,所有的操作都不会触发,只有当流中的元素被订阅的时候,所有的操作才会触发。
常用的subscribe接口如下
Flux.subscribe();
/*** @param consumer 消费者接口,用来消费流中的元素* */
Flux.subscribe(Consumer super T> consumer);/*** @param consumer 消费者接口,用来消费流中的元素* @param errorConsumer 错误消费者接口,用来消费流中的错误*/
Flux.subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer);/*** @param consumer 消费者接口,用来消费流中的元素* @param errorConsumer 错误消费者接口,用来消费流中的错误* @param completeConsumer 完成消费者接口,用来消费流中的完成*/
Flux.subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer, Runnable completeConsumer);/*** @param consumer 消费者接口,用来消费流中的元素* @param errorConsumer 错误消费者接口,用来消费流中的错误* @param completeConsumer 完成消费者接口,用来消费流中的完成* @param subscriptionConsumer 订阅消费者接口,用来消费流中的订阅*/
Flux.subscribe(Consumer super T> consumer, Consumer super Throwable> errorConsumer, Runnable completeConsumer, Consumer super Subscription> subscriptionConsumer)
Reactor也可以被认为是 并发无关(concurrency agnostic)的。意思就是, 它并不强制要求任何并发模型。
更进一步,它将选择权交给开发者。不过,它还是提供了一些方便 进行并发执行的库。
Reactor 提供了两种在响应式链中调整调度器 Scheduler 的方法:publishOn 和 subscribeOn。
它们都接受一个 Scheduler 作为参数,从而可以改变调度器。
但是 publishOn 在链中出现的位置 是有讲究的,而 subscribeOn 则无所谓。
publishOn 它会 改变后续的操作符的执行所在线程 。而 subscribeOn 则会改变下游操作符的调度器。
在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler。
Flux.just(1, 2, 3).publishOn(Schedulers.parallel()) //指定在parallel线程池中执行.map(i -> {System.out.println("map1: " + Thread.currentThread().getName());return i;}).publishOn(Schedulers.elastic()) // 指定下游的执行线程.map(i -> {System.out.println("map2: " + Thread.currentThread().getName());return i;}).subscribeOn(Schedulers.single()).subscribe(i -> System.out.println("subscribe: " + Thread.currentThread().getName()));
此外一些操作符会使用指定的调度器。
Flux.interval(Duration.ofSeconds(1), Schedulers.single()).subscribe(System.out::println);
Processor 是一个实现了 Publisher 和 Subscriber 接口的对象,它可以用来连接 Publisher 和 Subscriber。
多数情况下,你应该进行避免使用 Processor,它们较难正确使用,主要用于一些特殊场景下。
比起直接使用 Reactor 的 Processors,更好的方式是通过调用一次 sink() 来得到 Processor 的 Sink。
FluxProcessor processor = DirectProcessor.create();
processor.subscribe(System.out::println);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
Sink 是一个接口,它定义了一些方法,用来向 Processor 发送数据。
UnicastProcessor processor = UnicastProcessor.create();
FluxSink sink = processor.sink();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
Hooks算是一个工具类,设定好以后,对后面的Flux和Mono都会回调Hooks设置的方法,类似操作系统的钩子。
本部分算是reactor中比较高级的部分,建议在开始上手用reactor做项目前,大概知道有这么一个概念即可。
做了一两个项目以后,再回头来看看hooks是做什么的即可
我把这部分的内容进行了拆分,详见:reactor之Hooks
当从命令式编程风格切换到响应式编程风格的时候,一个技术上最大的挑战就是线程处理。
在命令式编程风格中,我们可以通过 ThreadLocal 来传递数据,
但是在响应式编程风格中,我们无法通过 ThreadLocal 来传递数据。
因为线程是由 Reactor 来管理的,我们无法控制线程的创建和销毁。
Context 就是用来解决这个问题的。Context 是一个接口,它定义了一些方法,用来获取和设置数据。
这部分内容相对也比较难以理解,建议把学习和理解放在后面,总之你需要用到类似多线程环境中的ThreadLocal类的时候,再来学习这部分不迟。
String key = "key";
Mono r = Mono.just("hello").flatMap(s -> Mono.subscriberContext().map(ctx -> s + " " + ctx.get(key))).subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world
Context 是一个类似于 Map(这种数据结构)的接口:它存储键值(key-value)对,你需要通过 key 来获取值:
String key = "key";
Flux r = Flux.just("hello").flatMap(s -> Mono.subscriberContext().subscriberContext(ctx -> ctx.put(key, "world"));.map(ctx -> s + " " + ctx.get(key)))
String key = "key"
Flux r = Flux.just("hello").flatMap(s -> Mono.subscriberContext().map(ctx -> s + " " + ctx.get(key))).subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world