我目前正在使用以下代码将 Akka 源(例如使用 Akka 的 FileIO 读取文件接收到的)转换为 RxJava2 Flowable:
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
final Publisher<ByteString> uncompressedData =
data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer);
return Flowable.fromPublisher(uncompressedData)
.map(bytes -> Buffer.buffer(bytes.toArray()));
}
我对这个(工作)解决方案的问题是,至少就我目前的理解而言,.runWith()
方法调用已经运行了代码,即从给定的 Source,对其进行缓冲,然后将其放入 Publisher 中。此时有什么办法可以避免必须运行它吗?我想在此时定义转换而不使用物化器,并且仅在稍后订阅 Flowable 后才运行所有内容。
谢谢!
最佳答案
使用 defer (旁注:我必须多次这样做,因为 Akka Sources 是一次性的):
private Flowable<Buffer> akkaConversion(Source<ByteString, NotUsed> data,
Flow<ByteString, ByteString, NotUsed> compType) {
return Flowable.defer(() -> data.via(compType)
.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), this.materializer)
).map(bytes -> Buffer.buffer(bytes.toArray()));
}
关于java - 将 Akka 源代码转换为 RxJava2 Flowable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45855239/