java - 将 API 回调绑定(bind)到 RxJava Observable

标签 java system.reactive rx-java observable

我正在尝试创建一个响应式(Reactive)应用程序,该应用程序在单独的线程上监听网络套接字以获取价格,但对如何准确构造 Observable 感到有点困惑。我拥有的许多接口(interface)都受到我正在使用的 API 的限制,因此无法更改。我总结了我想要做的测试,但我看不到如何填写 getPriceReactive() 方法的主体,以便订阅者将价格打印在控制台上(参见代码中的注释)。

public class PriceObservableTest {

   // This interface is defined externally and used by the API
   private interface ITickHandler {
       void priceReceived(double price);
   }

   // Stores the price (currently just one double for illustration)
   private class Tick {
       double price = Double.NaN;
   }

   // Implementation of handler called by API when it receives a price
   private class TickHandler implements ITickHandler {
       private final Tick tick;

       TickHandler() { this.tick = new Tick(); }

       @Override public void priceReceived(double x) { tick.price = x; }
   }

   // This class emulates the API delivering prices from the socket
   private class PriceSource {
      private final Thread thread;

      PriceSource(final ITickHandler handler) {
          thread = new Thread(new Runnable() {
              final Random r = new Random();
              @Override public void run() {
                  while (!Thread.currentThread().isInterrupted()) {
                      try {
                          Thread.sleep(100);
                          handler.priceReceived(r.nextDouble() * 100);
                      } catch (InterruptedException e) {
                          break;
                      }
                  }
                  System.out.println("Price thread closed");
              }
         });
      }

      void subscribe() { thread.start(); }

      void unsubscribe() { thread.interrupt(); }
  }

  @Test
  public void simpleTest() throws Exception {

      final ITickHandler handler = new TickHandler();

      // Simulate some prices received periodically from a socket
      PriceSource prices = new PriceSource(handler);

      Observable<Tick> reactive = getPriceReactive(handler);

      reactive.subscribe(new Subscriber<Tick>() {
          @Override public void onCompleted() { }
          @Override public void onError(Throwable e) { }
          @Override public void onNext(Tick tick) {
              System.out.println("Received price: " + tick.price);
          }});

      // Observe prices for 1 second. The subscriber should print them to console
      prices.subscribe();
      Thread.sleep(1000); 
      prices.unsubscribe();
   }

   // Returns an observable that reacts to price changes
   private Observable<Tick> getPriceReactive(ITickHandler handler) {
       return Observable.create(new Observable.OnSubscribe<Tick>() {
           @Override public void call(Subscriber<? super Tick> subscriber) {

              // How to call subscriber.onNext() whenever
              // priceReceived() is called with a new price?

           }
       });
   }
}

不知何故,只要 API 调用 priceReceived() 就需要调用 subscriber.onNext(),但我不太明白如何实现这一点。当然,我可以在 TickHandler 中存储对订阅者的引用,但这违背了 Observable 的目的,不是吗?

最佳答案

ITickHandler实现中转换为Observable。您控制的不是订阅者,而是发布者

private class TickHandler implements ITickHandler {
   private final Tick tick;
   private final PublishSubject<Tick> priceSubject;

  TickHandler() { 
       this.tick = new Tick(); 
       this.priceSubject = PublishSubject.create();
   }

   @Override public void priceReceived(double x)
   { 
        tick.price = x; 
        priceSubject.onNext(tick);
   }

   public Observable<Tick> priceReceivedObservable()
   {
       return priceSubject.asObservable();   
   }
}

您可以在测试中使用它,例如:

final ITickHandler handler = new TickHandler();
PriceSource prices = new PriceSource(handler);

handler.priceReceivedObservable()
       .subscribe(new Subscriber<Tick>() {
          @Override public void onCompleted() { }
          @Override public void onError(Throwable e) { }
          @Override public void onNext(Tick tick) {
              System.out.println("Received price: " + tick.price);
          }});

我警告你,它没有经过测试,因为我没有做很多 Java :)

关于java - 将 API 回调绑定(bind)到 RxJava Observable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36316253/

相关文章:

c# - 并发流图

spring-mvc - Spring Webflux 和 Observable 响应不起作用

java - 无法在 mac os high sierra 中启动 uiautomatorviewer

system.reactive - 验证用户是否从具有响应式扩展的响应式列表中键入了单词

java - 如何在 Hibernate 中限制 Java String 的创建?

system.reactive - 在模拟中同步 rx 中的事件

android - RxJava2 : . andThen() 被调用,即使第一个 Completable 发出错误

java - 如何在 RxJava 中不重复相同的操作?

java - 在页面对象模式中返回新的页面对象有什么好处?

java - Lotus Notes Java 代理的 GSON 库错误 - java.lang.NoClassDefFoundError : com. google.gson.JsonObject