android - 如何暂停/恢复 Observable?

标签 android rx-java

我正在尝试通过改造实现分页,但我正在努力寻找如何暂停可观察对象,以便它不会继续请求不需要的页面。

基本问题是:我可以告诉可观察源“暂停”和“恢复”吗?我不是在谈论缓冲或跳过,而是我希望 source observable 完全停止,即:不要发出任何网络请求等。

下面是我正在使用的一些模拟代码。 rangeObservable 是模拟的网络服务器“寻呼机”,而 timerObservable 就像接收滚动事件一样。

package example.wanna.be.pausable;

import java.io.IOException;
import java.lang.Throwable;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.Subscription;
import rx.Subscriber;

public class Main {

  private static ConnectableObservable rangeObservable;

  private static void setPaused(boolean paused) {
    // How do I pause/resume rangeObservable?
  }

  public static void main(String[] args) {

    rangeObservable = Observable.range(0, Integer.MAX_VALUE).publish();
    Observable timerObservable = Observable.timer(2, 2, TimeUnit.SECONDS);

    rangeObservable.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });

    timerObservable.subscribe(new Subscriber<Long>() {

      public void onStart() {
        System.out.println("Time started");

        // I dont know where to put this
        // rangeObservable.connect();
      }

      public void onNext(Long i) {
        System.out.println("Timer: " + i);
        setPaused(false);
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Timer done");
      }

    });

    // for some reason I have to do this or it just exits immediately
    try {
      System.in.read();
    } catch(IOException e) {
      e.printStackTrace();
    }

  }

}

感谢任何指导!

最佳答案

您需要存储您的订阅并对其调用取消订阅/订阅(尚未对此进行全面测试,但我认为它应该可以工作,我在 setPaused 中所做的大部分更改可能需要修复代码重复):

package example.wanna.be.pausable;

import java.io.IOException;
import java.lang.Throwable;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.Subscription;
import rx.Subscriber;

public class Main {

  private static ConnectableObservable rangeObservable;
  Subscription mSubscription;

  private static void setPaused(boolean pause) {
    if (pause) {
        mSubscription.unsubscribe()
    } else {
        mSubscription.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });
  }

  public static void main(String[] args) {

    rangeObservable = Observable.range(0, Integer.MAX_VALUE).publish();
    Observable timerObservable = Observable.timer(2, 2, TimeUnit.SECONDS);

    mSubscription = rangeObservable.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });

    timerObservable.subscribe(new Subscriber<Long>() {

      public void onStart() {
        System.out.println("Time started");

        // I dont know where to put this
        // rangeObservable.connect();
      }

      public void onNext(Long i) {
        System.out.println("Timer: " + i);
        setPaused(false);
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Timer done");
      }

    });

    // for some reason I have to do this or it just exits immediately
    try {
      System.in.read();
    } catch(IOException e) {
      e.printStackTrace();
    }

  }

}

最后还有:

// for some reason I have to do this or it just exits immediately
        try {
          System.in.read();
        } catch(IOException e) {
          e.printStackTrace();
        }

这样做的原因是在 main() 函数中,订阅是异步启动的,因此您的程序到达 main 的末尾然后退出,因为没有更多代码要运行(因为您的可观察对象在不同的​​线程上运行).

关于android - 如何暂停/恢复 Observable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27210579/

相关文章:

java - 使用 HttpUrlConnection 未获得正确的 header 响应代码

android - RecyclerView 可扩展的 cardView

android - 如何在android模拟器中写入 "@"?

android - 将 webView.AddJavascriptInterface 与 MonoDroid 一起使用

android - GCM(Google Cloud Messaging)突然返回401

java - Android-RxJava : Update UI from background thread using observable

java - 在 RxJava 中订阅 Observable 时出现问题

Java RX,为什么这一行被调用两次而这两行从未被调用

java - 在可观察对象发出一定数量的项目后触发一个 Action

java - 为事件循环创建一个可观察对象