java - RxJava : how to process Flowable interactively from console

标签 java console rx-java backpressure

我创建了一个解析文件的 Flowable (RxJava v3)。我希望它支持背压。这很重要,因为文件可能非常大,我不希望它们立即加载到内存中。这是我的第一次尝试:

public Flowable<Byte> asFlowable(InputStream is) {
    return Flowable.create(new FlowableOnSubscribe<Byte>() {

         @Override
         public void subscribe(FlowableEmitter<Byte> emitter) throws Exception {
             try (DataInputStream inputStream = new DataInputStream(is)){
                 if (inputStream.readInt() != SOME_CONSTANT) {
                     throw new IllegalArgumentException("illegal file format");
                 }

                 if (inputStream.readInt() != SOME_OTHER_CONSTANT) {
                     throw new IllegalArgumentException("illegal file format");
                 }

                 final int numItems = inputStream.readInt();

                 for(int i = 0; i < numItems; i++) {
                     if(emitter.isCancelled()) {
                         return;
                     }

                     emitter.onNext(inputStream.readByte());
                 }

                 emitter.onComplete();       
             } catch (Exception e) {
                 emitter.onError(e);
             } 
         }
     }, BackpressureStrategy.BUFFER);
}

我使用 Flowable.create 而不是 Flowable.generate 的原因是因为我需要验证文件,如果开头有一些魔数(Magic Number)则抛出错误文件错误或找不到。这不太适合 Flowable.generate lambda(但如果您知道更好的方法,请发布)。

好吧,让我们假设冷的 Flowable 支持背压。现在我想在类似控制台的应用程序中处理它。

问题: 我想从 Flowable 请求一个新的字节,并在每次用户按下空格时将其打印到控制台(类似于 moreless 在 Linux 中所做的)。最好的方法是什么?我打算直接在 public static void main 方法中观察 flowable,因为我需要使用控制台进行读写。

我一直在阅读 Backpressure section in RxJAva's Wiki并找到了这个片段:

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});

但这让我更加困惑,因为 RxJava 3 中似乎不存在 request 方法。

最佳答案

使用generateblockingSubscribe 并从控制台读取一行:

class State {
     DataInputStream inputStream;
     int count;
     int i;
}

BufferedReader bin = new BufferedReader(new InputStreamReader(System.in));

Flowable.generate(() -> {
    State s = new State();
    s.inputStream = new DataInputStream(is);
    try {
        if (s.inputStream.readInt() != SOME_CONSTANT) {
            throw new IllegalArgumentException("illegal file format");
        }

        if (s.inputStream.readInt() != SOME_OTHER_CONSTANT) {
            throw new IllegalArgumentException("illegal file format");
        }
        s.count = s.inputStream.readInt();
    } catch (IOException ex) {
        s.inputStream.close();
        throw ex;
    }
    return s;
}, (state, emitter) -> {
    if (state.i < s.count) {
        emitter.onNext(state.inputStream.readByte());
        s.i++;
    }
    if (state.i >= s.count) {
        emitter.onComplete();
    }
}, state -> {
    state.inputStream.close();
})
.subscribeOn(Schedulers.io())
.blockingSubscribe(b -> {
    System.out.println(b);
    bin.readLine();
}, Flowable.bufferSize());

关于java - RxJava : how to process Flowable interactively from console,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58962113/

相关文章:

rx-java - 如果第二个可观察值立即失败,ReactiveX concat 不会从第一个可观察值生成 onNext

java - 将间隔与运算符一起使用 (RxJava)

java - 无法追加

java - 可观察列表转换/转换

linux - 命令了解文件夹中安装的内容

10'000 行后控制台中的 c++ 错误背景颜色

java - RxAndroid : UI changes on Schedulers. io() 线程

java - 比较和创建数组,而无需单独向数组添加元素

java - 场景元素在javafx中显示两次?

c# - 在控制台中使用颜色,如何以简化的符号存储