java - RxJava 和 rx.exceptions.MissingBackPressureException 异常

标签 java rx-java rx-android

我正在尝试让一个非常基本的基于 RxJava 的应用程序正常工作。我定义了以下 Observable 类,它从文件中读取并返回行:

public Observable<String> getObservable() throws IOException
    {
        return Observable.create(subscribe -> {
            InputStream in = getClass().getResourceAsStream("/trial.txt");
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = null;
            try {
                while((line = reader.readLine()) != null)
                {
                    subscribe.onNext(line);
                }
            } catch (IOException e) {
                subscribe.onError(e);
            }
            finally {
                subscribe.onCompleted();
            }
        });
    }

接下来我定义了 subscrober 代码:

public static void main(String[] args) throws IOException, InterruptedException {
        Thread thread = new Thread(() ->
        {
            RxObserver observer = new RxObserver();
            try {
                observer.getObservable()
                        .observeOn(Schedulers.io())
                        .subscribe( x ->System.out.println(x),
                                    t -> System.out.println(t),
                                    () -> System.out.println("Completed"));

            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        thread.start();
        thread.join();
    }

该文件有近 50000 条记录。运行应用程序时,我收到“rx.exceptions.MissingBackPressureException”。我已经阅读了一些文档,并按照建议,我尝试在调用链中添加“.onBackPressureBuffer()”方法。但后来我没有收到异常,但完成的调用也没有被解雇。

处理快速生成 Observable 的场景的正确方法是什么?

最佳答案

第一个问题是你的 readLine 逻辑忽略了背压。您可以在开始使用 observeOn 之前应用 onBackPressureBuffer(),但最近添加了一个 SyncOnSubscribe,可让您一一生成值并获取背压处理:

SyncOnSubscribe.createSingleState(() => {
    try {
        InputStream in = getClass().getResourceAsStream("/trial.txt");
        return new BufferedReader(new InputStreamReader(in));
    } catch (IOException ex) {
        throw new RuntimeException(ex);
    }
},
(s, o) -> {
    try {
        String line = s.readLine();
        if (line == null) {
            o.onCompleted();
        } else {
            o.onNext(line);
        }
    } catch (IOException ex) {
        s.onError(ex);
    }
},
s -> {
    try {
       s.close();
    } catch (IOException ex) {
    }
});

第二个问题是您的线程将在 io 线程上的所有元素被传递之前完成,从而主程序退出。删除 observeOn、添加 .toBlocking 或使用 CountDownLatch

RxObserver observer = new RxObserver();
try {

    CountDownLatch cdl = new CountDownLatch(1);

    observer.getObservable()
           .observeOn(Schedulers.io())
           .subscribe( x ->System.out.println(x),
                       t -> { System.out.println(t); cdl.countDown(); },
                       () -> { System.out.println("Completed"); cdl.countDown(); });

    cdl.await();
 } catch (IOException | InterruptedException e) {
     e.printStackTrace();
 }

关于java - RxJava 和 rx.exceptions.MissingBackPressureException 异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34570615/

相关文章:

java - RxJava。运营商在哪里?

asynchronous - rxjava 在创建 observable 后添加项目

android - 在 Android 上调试时 RxJava 缓存线程中的 InterruptedException

java - Servlet 应该有映射,不能解析 Servlet

java - 是否可以从带有 JAXB 注释的类生成 XSD?

java - Firebase 实时查询 startAt 和 endAt 在联合属性上未按预期工作

android - Retrofit & RxJava 多个请求完成

java - 如果 RxJava 中结果无效,如何发送 onError

android - 致命 - 接口(interface)未实现 - OkHttp3 - Android

java - 如何在 JList 上工作