python - 通过 rxpy 从 observable 返回一个值

标签 python reactivex rx-py

将 rx.Observable 对象转换为函数中的“普通”对象的优雅方法是什么?

例如:

def foo():
    return rx.Observable.just('value').subscribe(<some magic here>)

>>> print(foo())

# expected:
# value
# however get:
# <rx.disposables.anonymousdisposable.AnonymousDisposable at SOME ADDRESS>

我知道 subscribe 的返回是一个一次性对象,实现这一点的“丑陋”方式是:

class Foo:
    def __init__(self):
        self.buffer = None

    def call_kernel(self):
        rx.Observable.just('value').subscribe(lambda v: self.buffer = v)

    def __call__(self):
        self.call_kernel()
        return self.buffer
>>> Foo()

# get:
# value

有没有更好的方法来做到这一点?

谢谢。

最佳答案

看一下 Observable::to_blocking():它创建一个 BlockingObservable,它可强制转换为包含所有发出项目的列表。举个例子:

def foo():
  return list(rx.Observable.just('value').to_blocking())[0]

我还想指出您的第二个解决方案是危险的,因为无法保证何时发出值,并且您的 __call__ 依赖于 'value' 立即发出。

关于python - 通过 rxpy 从 observable 返回一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46051331/

相关文章:

python - 如何将列表传递给@app.route(/<list>)

android - Rx Kotlin : map function can't infer return type

python - RxPy 有 `flatten` 运算符吗?

python - 如何从标准输入等流中创建 rx.py Observable?

python - 从 pandas 数据框中删除重复的行,其中只有一部分列是相同的

python - 如何在tensorflow中给word2vec模型一个特定的词

python - 使用networkX的karger min cut算法运行时间非常慢

javascript - 合并来自不断变化的 Observables 列表的事件

rx-java - 惰性随机生成器可观察

python - 如何使用 rxpython 使长时间运行的程序超时?