python - 在Python中创建响应式(Reactive)迭代器的策略是什么?

标签 python reactive-programming

我一直在阅读Javascript世界中functional reactive programming的许多令人兴奋的开发。我也被Python的iterator protocol迷住了。我知道迭代器可以用来构建联合例程,现在我想知道,什么方法可以构建一个反应迭代器,我们称它为“流”,这样在流上迭代就会阻塞,直到一个新的值被传递到流中?
下面是我希望能够做到的一个例子:

my_stream = Stream()

for x in my_stream: # <-- this "blocks" the co-routine if my_stream is empty
    do_something_to(x)

# ... meanwhile, elsewhere, in another co-routine or whatever...
my_stream.send('foo') # <-- this advances any on-going iterations on my_stream

传统上,当迭代器完成时,它将raise StopIteration并且for循环将结束。相反,我希望for循环(即对stream.next()的下一个调用)能够“阻塞”并将控制权让给另一个执行流,无论是greenlet还是coroutine或其他什么。
我想我要做的是避免使用signal/callback模式,因为回调在Python中非常笨拙,除非它们可以放入lambda中。这就是我所说的“反应迭代器”的意思——流控制是反向的,for循环的主体(或在流上迭代的任何东西)变为反应的,而不是主动的,本质上是一个内联块回调,它在项目进入流时触发。
那么,以前做过吗?如果不是的话,什么样的模式/库/什么能让它工作?格文特?龙卷风的伊洛普?小绿?

最佳答案

这看起来很像用迭代器协议包装线程和排队。

import threading
import random
import Queue
import time

class Supplier(threading.Thread):
    def __init__(self, q):
        self.queue = q
        threading.Thread.__init__(self)

    #This is the 'coroutine', it puts stuff in the queue at random
    #intervals up to three seconds apart
    def run(self):
        for i in range(10):
            self.queue.put(i)
            time.sleep(random.random()*3)
        self.queue.put(StopIteration())

class Consumer(object):
    def __init__(self, q):
        self.queue = q

    def __iter__(self):
        return self

    def next(self):
        #The call to Queue.get below blocks indefinitely unless we specify a timeout,
        item = self.queue.get()
        self.queue.task_done()
        if isinstance(item, StopIteration):
            raise StopIteration
        else:
            return item

Q = Queue.Queue()
S = Supplier(Q)
C = Consumer(Q)

S.start()

for item in C:
    print item

编辑:发表@David Eyk的评论
您可以使用greenlets、stackless或任何其他轻量级并发编程库/系统重新实现我的示例,并且示例的基本原理将保持不变。流(用FRP术语来说)是一个队列,包含所有的调度、锁定和同步,不管它是如何实现的。公平地说,队列具有缓冲流插入的额外能力,这可能是不需要的,在这种情况下,将队列的最大长度设置为1将导致流插入(放入队列)阻塞。协同程序是一个并发执行的代码块,不管它是一个线程,还是一个单独的执行堆栈。唯一的区别是切换发生时,是确定的,还是处理器控制的。不过,我要提醒的是,在并发代码块之间进行确定性交换和流控制的想法是乐观的。流在FRP术语中本质上是异步的,主要是因为它们依赖于中断驱动的IO作为输入源,这意味着它们不像您想象的那样具有确定性。对于从文件中读取的流来说,这甚至有点正确,例如,由于查找、总线拥塞等导致的IO速度的变化。显式(即确定地)将控制流切换到另一个协程的想法在功能上与在线程中的某个点同步相同。执行堆栈被切换,程序指针移动。当然,有轻量级和重量级的方法可以做到这一点。如前所述,Consumer类可以简单地重写为生成器,生成器是一个对象,它实现自己的显式堆栈,并提供用于产生控制的显式方法,即确定性微线程或协程。实际上,上面示例中的线程是一个辅助概念。使用send还可以消除显式队列的要求。然后,如果供应商是一个处理中断并将其转换为事件对象并将其放入事件队列(即流)的事件处理器,我们可以从示例中删除线程(至少显式地),但它会变得复杂得多。关键是,不管你是否看到,在玻璃钢的某个地方,无论你是否看到,穿线都会发生。
编辑2:需要对队列进行更实际的解释
尝试使用队列将消费者重新定义为生成器,这确实很简单
def Consumer():
    while True:
        item = self.queue.get()
        self.queue.task_done()
        if isinstance(item, StopIteration):
            raise StopIteration
        else:
            yield item

但是,一旦涉及到迭代器,或者更具体地说是在迭代器上循环,为了使用send和yield表达式而删除队列并不是那么简单。send方法与生成器内部的屈服表达式一起工作,例如。
def Consumer(supplied_item = None):
    ignore = yield ignore #Postion A: ignores_initiating None
    while True:
        supplied_item = yield #Position B
        if supplied_item is not False:
            yield supplied_item #Position C
        else:
            raise StopIteration()

问题是,在生成器上调用next与在for循环中调用send本质上是相同的,没有参数。由于供应商和consuming for回路之间没有同步,因此Consumer generator可以接收以下输入序列
无(需要启动;在位置A接收)
1(来自供应商;在B位置收到)
无(从下一个的for循环调用;在位置C处接收)
无(从下一个for循环调用,因为供应商仍在睡眠;在位置B收到)
无(从下一个for循环调用,因为供应商仍在睡眠;在位置C收到)
这意味着生成器向for循环[1,None,None,None,…]屈服。取决于供应商何时再次启动,发送到位置B或C,for循环可能根本看不到2、3等。因此,事实证明,如果要使用协程作为迭代器,显然必须使用队列(或其他同步方法)来避免此问题。如果有一种方法来指定您想要屈服于哪里,例如,只有在供应商调用时才屈服于这里,而不是for循环,否则就阻塞。

关于python - 在Python中创建响应式(Reactive)迭代器的策略是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27081178/

相关文章:

python - 无法在 Mac 上创建带有子进程的新控制台

python - 查找 pandas 数据框中两个相关列之间的不匹配

java - RxJavas 单。它的连接方法在哪里?

android - Single.defer() 未执行

python - 将 Pandas DataFrame 中的列组合到 DataFrame 中的列表列

未打印python子字符串

python 立方体 "AttributeError: setImage"

r - 非发光上下文中的响应式(Reactive)对象绑定(bind)

android - RxJava/RxKotlin 提示访问 View

用于值之间双向关系的 JavaScript 框架/库