python-3.x - 如何使用 rxpy/rxjs 延迟事件发射?

标签 python-3.x functional-programming rxjs reactive-programming rx-py

我有两个事件流。一个来自电感回路,另一个是网络摄像机。汽车将驶过环路,然后撞上相机。如果事件彼此相差 N 毫秒以内(汽车总是首先进入循环),我想将它们组合起来,但我也希望每个流中的不匹配事件(硬件都可能失败)全部合并到一个流中。像这样:

           ---> (only unmatched a's, None)
         /                                  \
stream_a (loop)                              \
         \                                    \
            --> (a, b) ---------------------------> (Maybe a, Maybe b)
         /                                    /
stream_b  (camera)                           /
         \                                  /
            --> (None, only unmatched b's)

现在我当然可以通过使用好的 ole Subject 反模式来解决问题:

unmatched_a = Subject()

def noop():
    pass

pending_as = [[]]

def handle_unmatched(a):
    if a in pending_as[0]:
        pending_as[0].remove(a)
        print("unmatched a!")
        unmatched_a.on_next((a, None))

def handle_a(a):
    pending_as[0].append(a)
    t = threading.Timer(some_timeout, handle_unmatched)
    t.start()
    return a

def handle_b(b):
    if len(pending_as[0]):
        a = pending_as[0].pop(0)
        return (a, b)

    else:
        print("unmatched b!")
        return (None, b)

stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)

这不仅相当棘手,而且尽管我没有观察到它,但我很确定当我使用 threading.Timer 检查挂起队列时存在竞争条件。考虑到过多的 rx 运算符,我很确定它们的某种组合可以让您在不使用 Subject 的情况下执行此操作,但我无法弄清楚。如何做到这一点?

编辑

尽管出于组织和操作原因,我更愿意坚持使用 Python,但我会采用 JavaScript rxjs 答案并移植它,甚至可能在节点中重写整个脚本。

最佳答案

您应该能够使用 auditTimebuffer 解决问题。像这样:

function matchWithinTime(a$, b$, N) {
  const merged$ = Rx.Observable.merge(a$, b$);
  // Use auditTime to compose a closing notifier for the buffer.
  const audited$ = merged$.auditTime(N);
  // Buffer emissions within an audit and filter out empty buffers.
  return merged$
    .buffer(audited$)
    .filter(x => x.length > 0);
}

const a$ = new Rx.Subject();
const b$ = new Rx.Subject();
matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));

setTimeout(() => a$.next("a"), 0);
setTimeout(() => b$.next("b"), 0);
setTimeout(() => a$.next("a"), 100);
setTimeout(() => b$.next("b"), 125);
setTimeout(() => a$.next("a"), 200);
setTimeout(() => b$.next("b"), 275);
setTimeout(() => a$.next("a"), 400);
setTimeout(() => b$.next("b"), 425);
setTimeout(() => a$.next("a"), 500);
setTimeout(() => b$.next("b"), 575);
setTimeout(() => b$.next("b"), 700);
setTimeout(() => b$.next("a"), 800);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

如果 b 值可能紧跟在 a 值之后,而您不希望它们匹配,则可以使用更具体的审计,如下所示:

const audited$ = merged$.audit(x => x === "a" ?
  // If an `a` was received, audit upcoming values for `N` milliseconds.
  Rx.Observable.timer(N) :
  // If a `b` was received, don't audit the upcoming values.
  Rx.Observable.of(0, Rx.Scheduler.asap)
);

关于python-3.x - 如何使用 rxpy/rxjs 延迟事件发射?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50395441/

相关文章:

django - 如何在 Django 测试中添加多个测试数据库

python - 在 Python3 Mockserver 中添加 https 重定向

angular - RxJS5 随着时间的推移发出数组项并永远重复

Haskell 函数 iter 结果

scala - 函数式编程风格的过滤器列表

rxjs - 如何将Observable转换为ReplaySubject?

angular - 使用 ngrx 存储进行身份验证的路由防护

python - 为Dask分布式客户端准备数据的最佳方式

python - numpy.pad 添加的填充量是声明的两倍

Javascript 某些方法嵌套在 For 中