algorithm - Asynchronous, early-exit, concatenated Observables

Tags: algorithm rx-java

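Suppose we have 3 observables: A, B, and C. I need to run all 3 concurrently (asynchronously, in layman's terms), but:

  1. If I get anything from A, emit it... and do not emit anything else.
  2. If A completes without emitting anything, apply rule 1 to B.
  3. If B completes without emitting anything, emit the items from C.
  4. If C completes without emitting anything, emit a default item.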
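
The four rules can be sketched in plain Java, ignoring timing entirely. This is only an illustration under the assumption that each source has already completed and its items were collected into a list; `CascadeRule` and `select` are hypothetical names and not part of RxJava.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: applies the priority rules to sources that have already completed.
final class CascadeRule {
  // Returns the items of the first non-empty source in priority order,
  // or a single-element list holding defaultItem when every source is empty.
  static <T> List<T> select(List<List<T>> completedSources, T defaultItem) {
    for (List<T> items : completedSources) {
      if (!items.isEmpty()) {
        return items; // rules 1-3: the first source that emitted wins
      }
    }
    return Collections.singletonList(defaultItem); // rule 4: nothing was emitted
  }

  public static void main(String[] args) {
    List<String> empty = Collections.emptyList();
    List<String> b = Arrays.asList("B1", "B2");
    List<String> c = Arrays.asList("C1");
    System.out.println(select(Arrays.asList(empty, b, c), "default"));         // prints [B1, B2]
    System.out.println(select(Arrays.asList(empty, empty, empty), "default")); // prints [default]
  }
}
```

The hard part of the question is doing this while the sources are still running, which this sketch deliberately leaves out.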

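I spent a few hours yesterday trying to solve this, but no combination of operators in RxJava seems to let me do it.

You can think of the values as cascading from left to right: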

A --> B --> C

Also, the cascade is blocking, while each observable runs asynchronously and caches its values.

A (nothing) --> B (nothing) --> C (nothing) --> default item

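To be clear, A must complete before any other observable emits anything. The same logic applies to B, then to C, and the default value is emitted only if A, B, and C all fail to emit anything.

Caching is obviously involved, and I definitely do not want to re-run the observables. I will need to replay the cached values, which are held at each gate.

The behavior is very similar to concat(), except that the next part of the chain does not emit if an earlier part of the chain has already emitted.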
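
To make the "start everything, then gate in order" idea concrete, here is a minimal sketch that swaps RxJava for plain `java.util.concurrent` (Java 8+). It assumes each source can be modelled as a `Callable` returning all of its items at once, so unlike an Observable it cannot pass items along while the winning source is still running. `EagerCascade` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: runs all sources concurrently, then checks them in priority order.
final class EagerCascade {
  static <T> List<T> run(List<Callable<List<T>>> sources, T defaultItem) {
    // One thread per source so that every source starts immediately.
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
    try {
      List<Future<List<T>>> running = new ArrayList<>();
      for (Callable<List<T>> source : sources) {
        running.add(pool.submit(source)); // the Future caches the result once it is ready
      }
      for (Future<List<T>> future : running) {
        List<T> items = future.get(); // gate: wait for this source to complete
        if (!items.isEmpty()) {
          return items; // short-circuit: later sources are ignored
        }
      }
      return Collections.singletonList(defaultItem); // every source was empty
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a source", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A source failed", e.getCause());
    } finally {
      pool.shutdownNow(); // interrupt sources that are still running
    }
  }

  public static void main(String[] args) {
    Callable<List<String>> a = () -> { Thread.sleep(100); return Collections.<String>emptyList(); };
    Callable<List<String>> b = () -> Arrays.asList("B");
    Callable<List<String>> c = () -> Arrays.asList("C");
    System.out.println(run(Arrays.asList(a, b, c), "default")); // prints [B]
  }
}
```

Even though B finishes long before A here, its result is held back until A has completed empty, which is the gating behavior described above.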

Best answer

Here is what I came up with:

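import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.subjects.ReplaySubject;

/**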
 * Works like {@link rx.Observable#concat} but concatenated Observables
 * are all run immediately on their given {@link rx.Scheduler}.
 *
 * This Observable is blocking in the sense that items are emitted in order
 * like {@link rx.Observable#concat} but since each Observable is run on
 * a (possibly) asynchronous scheduler, items emitted further down the chain
 * of Observables are held until items further up the chain are (possibly) emitted.
 *
 * This Observable also short-circuits and does not emit items further down
 * the chain of Observables when an Observable higher up the chain emits items.
 *
 * For example:
 *
 * Given Observable A, B, and C
 *
 * If A emits item(s) emit them... do not emit anything else.
 * If A completes without emitting anything, apply previous rule to B.
 * If B completes without emitting anything, emit items from C (if any)
 *
 * @param <T> the type of items emitted by the concatenated Observables
 */
public class ConcatObservable<T> {
  private final List<Observable<? extends T>> observables;

  private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
  }

  public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
  }

  public Observable<T> asObservable() {
    return Observable.create(new Observable.OnSubscribe<T>() {
      @Override public void call(final Subscriber<? super T> subscriber) {
        // Track inner subscriptions per subscriber so separate subscribers do not share state
        final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();

        List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
        for (Observable<? extends T> observable : observables) {

          // tell it to cache values
          final ReplaySubject<T> subject = ReplaySubject.create();
          cachedObservables.add(subject);

          // run it with nobody listening
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              subject.onCompleted();
            }

            @Override public void onError(Throwable e) {
              subject.onError(e);
            }

            @Override public void onNext(T item) {
              subject.onNext(item);
            }
          });
          subscriptions.add(subscription);
        }

        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        // for the cached ones, already running
        for (Observable<? extends T> observable : cachedObservables) {

          final AtomicBoolean shouldExit = new AtomicBoolean(false);
          final CountDownLatch latch = new CountDownLatch(1);
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              latch.countDown();
            }

            @Override public void onError(Throwable e) {
              error.set(e);
              shouldExit.set(true);
              latch.countDown();
            }

            @Override public void onNext(T item) {
              subscriber.onNext(item);
              shouldExit.set(true);
            }
          });

          // Track each subscription
          subscriptions.add(subscription);

          try {
            // Wait for this one to stop emitting, or error
            latch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete", e);
          }

          // This one had an item(s), so we don't bother with the rest
          if (shouldExit.get()) {
            break;
          }
        }

        // Release inner subscriptions
        for (Subscription subscription : subscriptions) {
          subscription.unsubscribe();
        }

        // Obey the Observable contract...
        Throwable throwable = error.get();
        if (throwable != null) {
          subscriber.onError(throwable);
        } else {
          subscriber.onCompleted();
        }
      }
    });
  }
}

Here are the corresponding tests:

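// Assumption: AssertJ supplies assertThat; the original does not name its assertion library
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Arrays;

import org.junit.Test; // assumption: JUnit 4
import rx.Observable;
import rx.Subscriber;
import rx.observers.TestSubscriber;

public class ConcatObservableTest {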

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromFirstObservable() {
    Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromSecondObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("B", "B", "B");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromLastObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.empty();
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("C", "C", "C");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_shouldStartAllObservables() {
    TestObservable<String> letters = TestObservable.createTestObservable("A", "B", "C");
    TestObservable<String> numbers = TestObservable.createDelayedTestObservable(100, "1", "2", "3");
    TestObservable<String> animals = TestObservable.createDelayedTestObservable(200, "zebra", "donkey", "unicorn");

    Observable<String> observable = ConcatObservable.from(letters, numbers, animals).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(letters.isCalled()).isTrue();
    assertThat(numbers.isCalled()).isTrue();
    assertThat(animals.isCalled()).isTrue();
  }

  static class TestObservable<T> extends Observable<T> {
    private final TestOnSubscribe<T> onSubscribeFunc;

    private TestObservable(TestOnSubscribe<T> f) {
      super(f);
      onSubscribeFunc = f;
    }

    public boolean isCalled() {
      return onSubscribeFunc.isCalled();
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createTestObservable(final T... items) {
      return createDelayedTestObservable(0, items);
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createDelayedTestObservable(final long delay, final T... items) {
      return new TestObservable<T>(new TestOnSubscribe<T>(delay, items));
    }

    private static class TestOnSubscribe<T> implements OnSubscribe<T> {
      private final long delay;
      private final T[] items;
      private boolean isCalled;

      private TestOnSubscribe(long delay, T... items) {
        this.delay = delay;
        this.items = items;
      }

      @Override public void call(Subscriber<? super T> subscriber) {
        isCalled = true;

        for (T item : items) {
          if (delay > 0) {
            sleep(delay);
          }
          subscriber.onNext(item);
        }
        subscriber.onCompleted();
      }

      public boolean isCalled() {
        return isCalled;
      }

      private void sleep(long time) {
        try {
          Thread.sleep(time);
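        } catch (InterruptedException e) {
          // Restore the interrupt flag instead of silently swallowing the interruption
          Thread.currentThread().interrupt();
        }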
      }
    }
  }
}

Regarding algorithm - Asynchronous, early-exit, concatenated Observables, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/24087779/
