原文:
译者:兼职翻译刘晓鹏
在这篇文章中,我们继续响应 式编程(Reactive Programming)这个系列,我们主要通过实际的代码实例来解释某些概念。最终的结果应该能够让你更好的了解到Reactive的不同之处。这里的 例子非常抽象,但是这样可以给你一个思考API和编程风格的方式,并开始感觉到它的不同之处。我们将看到Reactive中的诸多元素,并学习如何控制数 据流,如果有必要的话,将会通过后台线程中进行处理。
建立项目
我们将使用Reactor库来说解释我们的观点。该代码可以很容易地用其他工具编写。如果你想玩转这些代码,弄清楚它的工作原理而不仅仅是复制粘贴,在GitHub有可运行的示例可以参考。 开始可以在 中建立一个空白项目,并通过Maven添加对Reactor Core的依赖。 1 2 3 4 5 | <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.0.0.BUILD-SNAPSHOT</version> </dependency> |
使用Gradle非常相似:
1 | compile 'io.projectreactor:reactor-core:3.0.0.BUILD-SNAPSHOT' |
现在我们开始编写一些代码。
什么使它正常运转?
Reactive的基本模块是 由一系列事件组成的,有两个主要的参与者,一个发布者和一个订阅者。它也可以称其为一个序列“流”,因为它就是这样一个东西。如果我们需要,我们会用一个 小写的s来代表“流”,因为java 8中有一个java.util.Stream,这两者是不同的,所以不要把这两者混淆了。我们主要讨论发布者和订阅者(也就是Reactive Streams)。 我们在示例中使用了 Reactor库,所以我们将一直使用其中的标示符,发布者叫做Flux(它实现了Reactive Streams的Publisher接口)。RxJava库非常类似,包含了很多并行特性,不过在RxJava中,我们将会讨论Observable ,但代码会很相似。(Reactor 2.0将其称之为Stream,这与我们需要讨论的Java 8中Stream容易混淆,所以我们只使用Reactor2.5中的新代码)。
生成器
一个Flux是一个发布者,是一组特定的POJO类型事件的集合,所以它支持泛型,例如Flux<T> 是一个T类型的发布者。 Flux 提供一些静态方便的方法来从多个数据源创建自身的实例。例如,从一个数组创建一个Flux: 我们只要生成了一个Flux,现在就可以用它来完成一些功能。事实上,你只能做两件事情:运算(转换,或者与其他序列相结合),订阅(这是一个发布者)。
单值序列
你经常会遇到一种序列,你知道 它包含只有1或0个元素,例如,一个根据标识符查找实体的存储方法。Reactor有一个Mono类型,表示一个单值或空的Flux。Mono的API 与Flux非常类似,但是更为集中,因为并不是所有的操作对单值序列都是有意义的。RxJava也有一个类似的类(版本1.x)叫做Single,还有另 一个叫Completable的空序列。Reactor用Mono<Void>表示空序列。
操作数
Flux有很多方法,几乎所有的方法都是做运算操作的。在这里我们不会看到这些方法,因为有更好的地方去查看(例如javadocs)。我们只需要知道通过一个操作能获取到什么,它可以为你做什么。 例如,查询一个Flux的内部事件并记录到标准输出,你可以调用.log()方法,或者你可以用map()来进行转换: 1 2 3 4 5 | Flux<String> flux = Flux.just( "red" , "white" , "blue" ); Flux<String> upper = flux .log() .map(value -> value.toUpperCase()); |
1 2 3 4 5 | Stream<String> stream = Streams.of( "red" , "white" , "blue" ); Stream<String> upper = stream.map(value -> { System.out.println(value); return value.toUpperCase(); }); |
根据我们对Flux所做的观 察可以知道:没有数据处理,它只是一个执行计划。但是,Flux和Stream之间有很重要的差别,这种差别使得Stream API不适合于Reactive的使用场景。Flux还有有多操作数,其中大部分只是为了方便,其真正的区别是在消费数据时如何查找下一部分我们需要的数 据。
提示:Sebastien Deleuz的Reactive Types是一篇非常有用的博客,他通过他们定义的类型,描述了流(Stream)与响应式(Reactive)API之间的差异,以及如何使用它们。 Flux和Stream之间更多细节上差异的在这篇文章中都有高亮显示。
订阅者
为了获取数据流,你必须通过 Flux的某个subscribe()方法来进行订阅。只有这些方法能使数据流动 。背后的过程是通过在序列上声明操作链(如果有的话),并请求发布者开 始创建数据来实现的。在我们一直在使用的示例中,这意味着底层的字符串集合是在进行迭代的。在更复杂的示例中可能会触发从文件系统中读取一个文件,或者从 一个数据库中拉取,或者调用HTTP服务。
下面是调用subscribe()的实际示例:
1 2 3 4 | Flux.just( "red" , "white" , "blue" ) .log() .map(value -> value.toUpperCase()) .subscribe(); |
输出结果为:
09:17:59.665[main] INFO reactor.core.publisher.FluxLog- onSubscribe()09:17:59.666[main] INFO reactor.core.publisher.FluxLog- request(unbounded)09:17:59.666[main] INFO reactor.core.publisher.FluxLog- onNext(red)09:17:59.667[main] INFO reactor.core.publisher.FluxLog- onNext(white)09:17:59.667[main] INFO reactor.core.publisher.FluxLog- onNext(blue)09:17:59.667[main] INFO reactor.core.publisher.FluxLog- onComplete() |
这样我们可以看到无参数方法 subscribe()的效果,即请求发送者发送所有数据?-?只有一request()方法记录,并且它是“无界”的。我们也可以看到每一项的回调:发 布后(onNext()),序列结束(onComplete()),以及原始的订阅(onSubscribe())。如果你需要,你可以通过Flux的 doOn*() 方法来监听这些事情。这些都是自身的操作,不需要订阅,所以,它们不会引起任何数据流动。
subscribe() 方法是被重载的,通过其他一些你给定的选项可以用来控制不同的行为。一个重要且方便的subscribe()的重载方式是将回调函数作为其参数。第一个参 数是一个Consumer,它可以为每一项提供回调方法,你也可以选择添加一个Consumer来处理异常,或者在序列完成后执行一个Runnable。 例如,为每一项提供回调: 1 2 3 4 | Flux.just( "red" , "white" , "blue" ) .log() .map(value -> value.toUpperCase()) .subscribe(System.out::println); |
输出结果如下:
09:56:12.680[main] INFO reactor.core.publisher.FluxLog- onSubscribe()09:56:12.682[main] INFO reactor.core.publisher.FluxLog- request(unbounded)09:56:12.682[main] INFO reactor.core.publisher.FluxLog- onNext(red)RED09:56:12.682[main] INFO reactor.core.publisher.FluxLog- onNext(white)WHITE09:56:12.682[main] INFO reactor.core.publisher.FluxLog- onNext(blue)BLUE09:56:12.682[main] INFO reactor.core.publisher.FluxLog- onComplete() |
我们可以用各种方式来控制数据流,使它们“有界”。通过原生的API进行控制时,可以从一个 Subscriber中获取Subscription。这相当于简短方法subscribe()的冗长形式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | .subscribe(newSubscriber<String>(){ publicvoid onSubscribe(Subscription s){ s.request(Long.MAX_VALUE); } @Override publicvoid onNext(String t){ System.out.println(t); } @Override publicvoid onError(Throwable t){ } @Override publicvoid onComplete(){ } }) |
如果要控制流,例如,在一次最多消耗2项,你可以更智能地使用Subscription:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | .subscribe(newSubscriber<String>(){ privatelong count =0; privateSubscription subscription; @Override publicvoid onSubscribe(Subscription subscription){ this .subscription = subscription; subscription.request(2); } @Override publicvoid onNext(String t){ count++; if (count>=2){ count =0; subscription.request(2); } } ... |
该Subscriber即为一次“批处理”两项。这是一个非常常见的情形,所以你可能会考虑提取到一个方便的类,使代码更加具备可读性。输出结果如下:
09:47:13.562[main] INFO reactor.core.publisher.FluxLog- onSubscribe()09:47:13.564[main] INFO reactor.core.publisher.FluxLog- request(2)09:47:13.564[main] INFO reactor.core.publisher.FluxLog- onNext(red)09:47:13.565[main] INFO reactor.core.publisher.FluxLog- onNext(white)09:47:13.565[main] INFO reactor.core.publisher.FluxLog- request(2)09:47:13.565[main] INFO reactor.core.publisher.FluxLog- onNext(blue)09:47:13.565[main] INFO reactor.core.publisher.FluxLog- onComplete() |
事实上,订阅者的批量处理是非常常见的一种情况,所以在Flux里已经有了方便可用方法。上面的批量处理的示例也可以这样来实现:
1 2 3 4 | Flux.just( "red" , "white" , "blue" ) .log() .map(value -> value.toUpperCase()) .subscribe( null ,2); |
(注意:subscribe()的调用需要有一个请求限制)。输出结果如下:
10:25:43.739[main] INFO reactor.core.publisher.FluxLog- onSubscribe()10:25:43.740[main] INFO reactor.core.publisher.FluxLog- request(2)10:25:43.740[main] INFO reactor.core.publisher.FluxLog- onNext(red)10:25:43.741[main] INFO reactor.core.publisher.FluxLog- onNext(white)10:25:43.741[main] INFO reactor.core.publisher.FluxLog- request(2)10:25:43.741[main] INFO reactor.core.publisher.FluxLog- onNext(blue)10:25:43.741[main] INFO reactor.core.publisher.FluxLog- onComplete() |
提示:一个处理序列的库(例如 Sping Reactive Web)能够处理订阅。一种好的方式是把这些问题沉淀到栈中,因为它可以让你的代码从业务逻辑中分离出来,使其更具可读性,更容易测试和维护。所以,通常 来说,如果能避免订阅一个序列是一件好事,或者至少把该代码放入到处理层,与业务逻辑分离开来。
线程、调度和后台处理
以上所有的记录都有一个有趣的 特点,那就是它们都是在主线程上执行,即主线程是subscribe()的调用者。这强调了一个重要的观点:Reactor能大量减少线程,因为这样能让 你获取最好的性能。如果在过去的5年中你一直争论线程和线程池与异步执行的问题,这可能是一个令人惊讶的声明,因为这是试图从你的服务中挤出更多的“果 汁”。不过这是真的:没有任何线程的切换,即使优化后的JVM对线程的处理非常有效,但是在单线程做计算还是会更快。Reactor已经处理了所有异步处 理的关键因素,它假设你是知道自己在做什么。 Flux提供了一些配置方法控制线程边界。例如,你可以通过Flux.subscribeOn()来配置订阅在一个后台线程中执行: 1 2 3 4 5 | Flux.just( "red" , "white" , "blue" ) .log() .map(String::toUpperCase) .subscribeOn(Schedulers.parallel()) .subscribe( null ,2) |
输出结果如下:
13:43:41.279[parallel-1-1] INFO reactor.core.publisher.FluxLog- onSubscribe()13:43:41.280[parallel-1-1] INFO reactor.core.publisher.FluxLog- request(2)13:43:41.281[parallel-1-1] INFO reactor.core.publisher.FluxLog- onNext(red)13:43:41.281[parallel-1-1] INFO reactor.core.publisher.FluxLog- onNext(white)13:43:41.281[parallel-1-1] INFO reactor.core.publisher.FluxLog- request(2)13:43:41.281[parallel-1-1] INFO reactor.core.publisher.FluxLog- onNext(blue)13:43:41.281[parallel-1-1] INFO reactor.core.publisher.FluxLog- onComplete() |
提示:如果你自己编写这段代码,或者复制粘贴,请记住要在jvm退出之前等待处理停止。
请注意,订阅和所有的处理都发 生在后台一个叫“parallel-1-1”线程中——这是因为我们要求订阅Flux的订阅者在后台运行。如果项目是CPU密集的型话,这将是很好的(但 是实际上,在后台线程中执行是没有意义的,因为你需要关注上下文的切换,但是这样就无法更快的获取结果)。您可能还希望能够执行I / O密集型任务,这有可能会阻塞。在这种情况下,您希望尽快完成任务,而不需要阻塞调用方。一个线程池是你的最好朋友,你可以通过 Schedulers.parallel()获取线程池。要将任务的每一项分布到单独的线程(最多到线程池的限制)中执行,我们需要将它们分成独立的发布 者,并通过一个后台进程来询问每个发布者的结果。我们可以通过一个叫做flatmap()的方法来实现,将每一项映射到一个发布者(可能是不同类型的), 然后返回到一个新类型的序列: 1 2 3 4 5 6 7 8 9 | Flux.just( "red" , "white" , "blue" ) .log() .flatMap(value -> Mono.just(value.toUpperCase()) .subscribeOn(Schedulers.parallel()), 2) .subscribe(value ->{ log.info( "Consumed: " + value); }) |
注意,这里使用 flatMap()将项分解到一个“子”发布者中,在“子”发布者中我们可以控制订阅的每一项而不是整个序列。Reactor内建的默认行为是尽可能长的 维持在一个线程中,如果我们想要在后台线程中处理特殊的项或者对项进行分组,我们就需要显示的声明。事实上,这是一项公认的强制进行并行处理的技巧之一 (更多细节问题见Reactive Gems)。
输出结果如下:
1 2 3 4 5 6 7 8 9 10 | 15:24:36.596[main] INFO reactor.core.publisher.FluxLog- onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17) 15:24:36.610[main] INFO reactor.core.publisher.FluxLog- request(2) 15:24:36.610[main] INFO reactor.core.publisher.FluxLog- onNext(red) 15:24:36.613[main] INFO reactor.core.publisher.FluxLog- onNext(white) 15:24:36.613[parallel-1-1] INFO com.example.FluxFeaturesTests-Consumed: RED 15:24:36.613[parallel-1-1] INFO reactor.core.publisher.FluxLog- request(1) 15:24:36.613[parallel-1-1] INFO reactor.core.publisher.FluxLog- onNext(blue) 15:24:36.613[parallel-1-1] INFO reactor.core.publisher.FluxLog- onComplete() 15:24:36.614[parallel-3-1] INFO com.example.FluxFeaturesTests-Consumed: BLUE 15:24:36.617[parallel-2-1] INFO com.example.FluxFeaturesTests-Consumed: WHITE |
注意,现在有多个线程来消费 数据项,flatmap()是隐式的进行并发处理,只要有可用的项,就要确保任何给定的时间内都有2个数据项在处理。我们可以看到许多 request(1),这是因为系统试图在管道中保持2个数据项,通常它们不会在同一时间点上完成。事实上,Reactor的尝试是非常聪明的,它会从上 游发布者中预取数据项,以便消除订阅者的等待时间(我们在这没有看到,是因为这里的数据项太少?-?我们只处理3项)。
提示:(“red”、“white”、“blue”)三项可能太少,很难证明后台有多个线程在执行,所以生成更多数据可能会更合适。例如,你可以用一个随机数生成器来实现。 Flux也有一个同样的publishOn()方法,但是它是通过监听器(如onNext()或消费者(consumer)回调),而不是订阅者自身来实现的: 1 2 3 4 5 6 7 8 | Flux.just( "red" , "white" , "blue" ) .log() .map(String::toUpperCase) .subscribeOn(Schedulers.newParallel( "sub" )) .publishOn(Schedulers.newParallel( "pub" ),2) .subscribe(value ->{ log.info( "Consumed: " + value); }); |
15:12:09.750[sub-1-1] INFO reactor.core.publisher.FluxLog- onSubscribe()15:12:09.758[sub-1-1] INFO reactor.core.publisher.FluxLog- request(2)15:12:09.759[sub-1-1] INFO reactor.core.publisher.FluxLog- onNext(red)15:12:09.759[sub-1-1] INFO reactor.core.publisher.FluxLog- onNext(white)15:12:09.770[pub-1-1] INFO com.example.FluxFeaturesTests-Consumed: RED15:12:09.771[pub-1-1] INFO com.example.FluxFeaturesTests-Consumed: WHITE15:12:09.777[sub-1-1] INFO reactor.core.publisher.FluxLog- request(2)15:12:09.777[sub-1-1] INFO reactor.core.publisher.FluxLog- onNext(blue)15:12:09.777[sub-1-1] INFO reactor.core.publisher.FluxLog- onComplete()15:12:09.783[pub-1-1] INFO com.example.FluxFeaturesTests-Consumed: BLUE |
请注意,消费回调(记为 “Consumed:…”)是在发布者线程pub-1-1上的。如果你调用subscribeOn(),你可以看在pub-1-1线程上看到所有的处理数 据的第二个chunk。这再次反映了Reactor的线程节约——如果没有显式请求线程切换,它下一个请求便停留在相同的线程上。
注意:
我们改变了这个示例中的代码,将subscribe(null, 2)改为一个包含prefetch=2的publishon()方法。在这种情况下,subscribe()中隐式获取数据的尺寸将会被忽略。
提取器:黑暗面的订阅者
还有另一种方式订阅一个序列, 即Mono.block()或Mono.toFuture()或Flux.toStream()(这些都是“提取”的方法?-?这些方法获取到的 Reactive类型缺乏灵活性、阻碍抽象)。Flux也有转换器collectlist()和collectmap(),可以将Flux 转换为 Mono。实际上它们不订阅序列,但是它们可以对独立的数据项以任何方式来实现你通过订阅完成的功能。
警告
一个很好的经验法则是"永远不要调用extractor"。当然,也有一些例外(否则方法将不会存在)。一个值得注意的例外是测试中,因为它可以阻塞计算结果,这是非常有用的。
如果你需要适应旧的API, 例如Spring MVC,这些方法提供一个从Reactive到阻塞“逃生舱”。当你调用Mono.block()你就放弃Reactive Streams的所有好处。这就是Reactive Streams和java8 Streams?的本质区别——java原生的Stream只具有“全有或全无”的订阅模式,即相当于Mono.block()。当 然,subscribe()可以阻塞调用线程,所以它像转换器方法一样危险,但是你具有更好的控制——你可以通过使用subscribeOn()来代替, 你也可以通过一段时间的后台压力测试来决定是否继续使用。
结论
在这篇文章中,我们介绍了 Reactive Streams和Reactor API的基本知识。如果你需要知道更多细节,有很多地方可以查看,但是不能代替手动编码,所以,可以使用GitHub中的代码(本文中的测试项目叫做 “flux”),或前往 Lite RX Hands On专题讨论会。到目前为止,这真的只是略读,我们还没有学到太多,没有证明通过非Reactive工具不能以一种更明显的方式来完成的功能。本系列的下 一篇文章将更深入地了解Reactive模型中的阻塞,调度和异步处理,并演示在什么情况下使用可以能获得真正的好处。
http://www.jointforce.com/jfperiodical/article/2866