java - 将 ByteBuffer 流转换为 Rx 中的行的有效方法

标签 java nio reactive-programming rx-java

我要改造Observable<ByteBuffer>通过按行结束字符拆分成行 ( Observable<String> )。如果我们有类似 toString 的函数, concatsplitByLine ,我们必须能够做到以下几点:

Observable<ByteBuffer> o = ...;
o.map(toString).reduce(concat).flatMap(splitByLine);

然而,该算法需要先扫描整个字节并将它们存储在内存中,然后才能实际发出反序列化字符串的第一行。如何在每次行尾递增地出现在字节中时发出新行?

最佳答案

我终于找到了rxjava-string .它为我们提供了Rx 运算符 来处理分块 字节数组和字符串流。 API文档找到here .

题目可以通过以下方式实现:

Observable<byte[]> o = ...;
Charset charset = Charset.forName("UTF-8");
// StringObservable have no operators for ByteBuffer yet
StringObservable.byLine(StringObservable.decode(o, charset));

关于java - 将 ByteBuffer 流转换为 Rx 中的行的有效方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29804243/

相关文章:

Java iText - 获取从 pdf 文件中选择的文本的字体大小和系列

java - gridgain缓存访问性能: sql vs cache. getKey?

java - RxJava : Return List Containing Past And Current Result

java - SocketChannel write( ) 返回没有错误,但实际上没有发送数据

Java 异步套接字 IO

java - 使用 Java 7/Eclipse 3.7.1 的 headless pde 构建失败

特定文件系统的 Java 路径

Java react 器-链Mono <Void>与另一个产生Mono <Object>的异步任务

java - "More produced than requested"使用 `replay` 和 `autoConnect` 时出现异常

javascript - 从 setTimeout (Rxjs) 返回一个可观察值