java - 将 Akka 源代码转换为 RxJava2 Flowable?

标签 java akka rx-java rx-java2

我目前正在使用以下代码将 Akka 源(例如使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
        Flow<ByteString, ByteString, NotUsed> compType) {
    final Publisher<ByteString> uncompressedData =
         data.via(compType)
             .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
    return Flowable.fromPublisher(uncompressedData)
       .map(bytes -> Buffer.buffer(bytes.toArray()));
}

我对这个(工作)解决方案的问题是,至少就我目前的理解而言,.runWith()方法调用已经运行了代码,即从给定的 Source,对其进行缓冲,然后将其放入 Publisher 中。此时有什么办法可以避免必须运行它吗?我想在此时定义转换而不使用物化器,并且仅在稍后订阅 Flowable 后才运行所有内容。

谢谢!

最佳答案

使用 defer (旁注:我必须多次这样做,因为 Akka Sources 是一次性的):

private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
        Flow<ByteString, ByteString, NotUsed> compType) {

    return Flowable.defer(() -> data.via(compType)
         .runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer) 
    ).map(bytes -> Buffer.buffer(bytes.toArray()));
}

关于java - 将 Akka 源代码转换为 RxJava2 Flowable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45855239/

相关文章:

java - 通过引用传递对象到线程

java - Spring JTA TransactionManager 配置 : Supporting both Tomcat and JBoss

java - 如何从java swing中的多个文本框中获取值

java - System.exit 在 "sbt run"中工作,但在 .jar 中不起作用

scala - Akka 2.1 异常处理(Scala)

java - RxJava : How to . 压缩两个 Observable,然后 .merge 它们并最终 .reduce 以聚合所有结果

java - 如何在java中为带括号的字符串编写正则表达式?

Scala Akka Actor - 获取 Actor 的状态

java - 如何在没有主题的 RX 中创建反馈循环?

android - 如果对 RxJava 主题进行了 onComplete 调用,我是否必须再次手动取消订阅?