reactive-programming - 使用 RxJava 实现类似十字转门的运算符

标签 reactive-programming rx-java

我需要帮助在 RxJava (RxScala) 中实现类似十字转门的运算符。我花了很多时间思考它,但我似乎被卡住了。

函数的类型应该如下:

def turnstile[T](queue: Observable[T], turnstile: Observable[Boolean]): Observable[T]

这个想法是运算符(operator)的行为应该与真正的十字转门非常相似。有人来了 (queue),并且有一个 turnstile 准备好接受新的单例 人 (a true 元素,你可以把它想象成一个 token 插入到闸机中),或者关闭(闸机中的 false,取消之前的 token )。对于旋转门中的每个 true 元素,只有一个人可以通过。

此外,在没有人通过的情况下连续插入多个 token (旋转门中的多个 true 项目)与仅插入一个 token 相同,旋转门不计算 token 。

换句话说,旋转门最初是关闭的。当其中出现 true 元素时,它会为一个人打开。如果有人出现,它会通过(到输出端),然后旋转门再次关闭。如果旋转门中出现 false 元素,旋转门也会关闭。

queue       ----A---B-------------C--D--
turnstile   --T--------T--T-T-T-T------T
            ============================
output      ----A------B----------C----D

一个弹珠图,显示打开的旋转门等待 A 人,然后 B 等待旋转门打开,然后几个 token 表现为一个人 C 通过,但 D 必须再次等待新 token

----A----B--
--T---T-F-T-
============
----A-----B-

一张大理石图,显示旋转门中的 false 元素如何再次关闭旋转门。

感谢任何帮助。我认为在不编写自定义运算符的情况下实现这一点的唯一方法是以某种方式使用 zip 运算符,因为它可能是唯一使一个序列中的元素等待另一个序列中的元素(或者是还有其他我不知道的吗?)。但我需要压缩一些旋转栅门元素,具体取决于它们是否与人配对......

我认为这是一个有趣的问题,而且我对一些好的解决方案很好奇。

最佳答案

所以我认为我有一个更干净、完全 Rx 的解决方案。这实际上是一个非常有趣的问题。如果它能满足您的需求,我认为它最终会非常优雅,尽管它花了很长时间才实现。

遗憾的是我不懂 Scala,所以您将不得不处理我的 Java8 lambda。 :D

整个实现:

public static Observable<String> getTurnstile(final Observable<String> queue, final Observable<Boolean> tokens) {
    return queue.publish(sharedQueue ->
            tokens.switchMap(token -> token ? sharedQueue.limit(1) : Observable.empty()));
}

所以,这里发生的事情是我们使用 publish 创建一个我们可以订阅多次的人员队列的共享可观察对象。在其中,我们在 token 流上使用 switchMap,这意味着每当从 switchMap 发出一个新的 Observable 时,它​​都会丢弃最后一个并订阅新的。只要标记为真,它就会对人员队列进行新订阅(并且连续多个真都可以,因为它会取消旧订阅)。当它为 false 时,它​​只是转储一个空的 Observable,以免浪费时间。

还有一些(通过的)测试用例:

@RunWith(JUnit4.class)
public class TurnstileTest {
    private final TestScheduler scheduler = new TestScheduler();
    private final TestSubscriber<String> output = new TestSubscriber<>();

    private final TestSubject<Boolean> tokens = TestSubject.create(scheduler);
    private final TestSubject<String> queue = TestSubject.create(scheduler);

    @Before
    public void setup() {
        Turnstile.getTurnstile(queue, tokens).subscribe(output);
    }

    @Test
    public void allowsOneWithTokenBefore() {
        tokens.onNext(true, 0);
        queue.onNext("Bill", 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void tokenBeforeIsCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(false, 1);
        queue.onNext("Bill", 2);

        assertNonePassed();
    }

    @Test
    public void tokensBeforeAreCancelable() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        tokens.onNext(false, 3);
        queue.onNext("Bill", 4);

        assertNonePassed();
    }

    @Test
    public void eventualPassThroughAfterFalseTokens() {
        tokens.onNext(false, 0);
        queue.onNext("Bill", 1);
        tokens.onNext(false, 2);
        tokens.onNext(false, 3);
        queue.onNext("Jane", 4);
        queue.onNext("Bob", 5);
        tokens.onNext(true, 6);
        tokens.onNext(true, 7);
        tokens.onNext(false, 8);
        tokens.onNext(false, 9);
        queue.onNext("Phil", 10);
        tokens.onNext(false, 11);
        tokens.onNext(false, 12);
        tokens.onNext(true, 13);

        assertPassedThrough("Bill", "Jane", "Bob");
    }

    @Test
    public void allowsOneWithTokenAfter() {
        queue.onNext("Bill", 0);
        tokens.onNext(true, 1);
        queue.onNext("Bob", 2);

        assertPassedThrough("Bill");
    }

    @Test
    public void multipleTokenEntriesBeforeOnlyAllowsOneAtATime() {
        tokens.onNext(true, 0);
        tokens.onNext(true, 1);
        tokens.onNext(true, 2);
        queue.onNext("Bill", 3);
        tokens.onNext(true, 4);
        tokens.onNext(true, 5);
        queue.onNext("Jane", 6);
        queue.onNext("John", 7);

        assertPassedThrough("Bill", "Jane");
    }

    @Test
    public void noneShallPassWithoutToken() {
        queue.onNext("Jane", 0);
        queue.onNext("John", 1);

        assertNonePassed();
    }

    private void closeSubjects() {
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
        scheduler.triggerActions();
        tokens.onCompleted();
        queue.onCompleted();
        scheduler.triggerActions();
    }

    private void assertNonePassed() {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList());
    }

    private void assertPassedThrough(final String... names) {
        closeSubjects();
        output.assertReceivedOnNext(Lists.newArrayList(names));
    }
}

如果您发现任何不适用于此的边缘情况,请告诉我,特别是如果它实时出现问题,因为测试显然是在受控环境中进行的。

关于reactive-programming - 使用 RxJava 实现类似十字转门的运算符,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27797609/

相关文章:

java - RxJava - 在 Observable<List<Service>> 中等待所有服务的 Observable<Boolean>

ios - rx.tap 订阅已完成的按钮

android - RxJava : Error occurred when trying to propagate error to Observer. onError

rx-java - 在 RxJava 中公开 "expensive"Observables 的最佳实践

generics - Rx Kotlin : trying to add custom errors catching

java - 如何用其他 Observables 过滤 Observables

java - RxJava/安卓 : Combine result of two dependent Observables

scala - Akka Streams : No-arg GraphDSL. create() 与 GraphDSL.create(sink)

reactive-programming - 订阅时评估startwith的参数

java - 嵌套 RxJava Observables 的正确方法?