c++ - RxCpp:如果使用 observe_on(rxcpp::observe_on_new_thread()),观察者的生命周期

标签 c++ reactive-programming rxcpp

如果观察者正在使用 observe_on(rxcpp::observe_on_new_thread()),等待所有观察者 on_completed 被调用的正确方法是什么:

例如:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

我的问题是如何等到 s1 的 on_completed 完成而不必设置和轮询一些变量。

使用 observe_on() 的动机是因为值通常有多个观察者,我希望每个观察者同时运行。也许有不同的方法可以实现相同的目标,我愿意接受您的所有建议。

最佳答案

合并两者将允许单个阻塞订阅等待两者完成。

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        s.on_next(2);
        // ...
        s.on_completed();
    };

    auto values = rxcpp::observable<>::create<int>(generator).publish();

    auto work = values.
        observe_on(rxcpp::observe_on_new_thread()).
        tap([&](int c) {
            slow_function(foo);
        }).
        finally([](){printf("s1 completed\n");}).
        as_dynamic();

    auto start = values.
        ref_count().
        finally([](){printf("s2 completed\n");}).
        as_dynamic();

    // wait for all to finish
    rxcpp::observable<>::from(work, start).
        merge(rxcpp::observe_on_new_thread()).
        as_blocking().subscribe();
}

几点。

流必须返回相同的类型才能使合并工作。如果组合不同类型的流,请改用 combine_latest。

observable<>::from() 中的 observables 的顺序很重要,开始流有 ref_count,因此必须最后调用它,以便后续合并在启动生成器之前订阅工作。

合并有两个线程调用它。这需要使用线程安全协调。 rxcpp 是按使用付费的。默认情况下,运算符假定所有调用都来自同一个线程。任何从多个线程获取调用的运算符都需要提供线程安全的协调,运算符使用它来实现线程安全的状态管理和输出调用。

如果需要,可以为两者使用相同的协调器实例。

关于c++ - RxCpp:如果使用 observe_on(rxcpp::observe_on_new_thread()),观察者的生命周期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32365771/

相关文章:

android - 如何在 RxJava2 中使用发布序列实现缓冲逻辑?

ios - 以多种方式通过 RACSignal 设置按钮的启用属性?

c++ - C++ 中的 RxCpp 响应式扩展

c++ - 在链接时合并全局数组/从多个编译单元填充全局数组

c++ - popen telnet 中的子命令

c++ - 修改另一个类的 vector 成员时出错

javascript - 什么是 "callback hell"以及 RX 如何以及为何解决它?

c++ - 在嵌套的 lambda 中应用 function_traits 时编译失败

c++11 - RXCPP:阻塞功能超时

c++ - rxcpp - 当一个可观察对象发出一个值时,为什么不调用所有观察者的 on_next 函数