scala infinite rx observable creation - 如何正确地做到这一点?

标签 scala reactive-programming rx-java

我最近开始玩 rxjava-scala ,我想创建一个(可能)无限流可观察。查看代码和 open issue在 github 上,我发现一个“开箱即用”的解决方案还没有实现(问题中的 usecase06 说它甚至没有为 java 实现)

所以,我试着想出我自己的实现。考虑以下几点:

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(100)
    s"next fibonacci: ${bi}"
  }
}

和辅助方法:

def startOnThread(body: => Unit): Thread = {
  val t = new Thread {
    override def run = body
  }
  t.start
  t
}

和示例核心:

val observable: Observable[String] = Observable(
  observer => {
    var cancelled = false
    val fs = getIterator
    val t = startOnThread{
      while (!cancelled) {observer.onNext(fs.next)}
      observer.onCompleted()
    }
    Subscription(new rx.Subscription {
      override def unsubscribe() = {
        cancelled = true
        t.join
      }
    })
  }
)

val observer = Observer(new rx.Observer[String]{
  def onNext(args: String) = println(args)
  def onError(e: Throwable) = logger.error(e.getMessage)
  def onCompleted() = println("DONE!")
})

val subscription = observable.subscribe(observer)
Thread.sleep(5000)
subscription.unsubscribe()

这似乎工作正常,但我对此并不满意。首先,我正在创建一个新的 Thread,这可能很糟糕。但即使我使用某种线程池,它仍然会感觉不对。所以我想我应该使用调度程序,这听起来像是一个合适的解决方案,只是我不知道如何在这种情况下使用它。我尝试在 observeOn 方法中提供 rx.lang.scala.concurrency.Schedulers.threadPoolForIO,但似乎我做错了。 observable 的代码不会用它编译。任何帮助将不胜感激。谢谢!

最佳答案

首先,已经有将 Iterable 转换为 Observable: "from"函数的适配器。

其次,迭代器不会返回控制权,因此您的 Sleep 和 unsubscribe 不会被调用。您需要在专用线程“subscribeOn(NewThreadScheduler())”中执行订阅操作

def getIterator: Iterator[String] = {
  def fib(a: BigInt, b: BigInt): Stream[BigInt] = a #:: fib(b, a + b)
  fib(1, 1).iterator.map{bi =>
    Thread.sleep(1000)
    s"next fibonacci: ${bi}"
  }
}

val sub = Observable.from(getIterator.toIterable)
  .subscribeOn(NewThreadScheduler())
  .subscribe(println(_))
readLine()
sub.unsubscribe()
println("fib complete")
readLine()

关于scala infinite rx observable creation - 如何正确地做到这一点?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20781731/

相关文章:

scala - 引用透明度

java - 不鼓励使用 Mono<Optional<T>> 吗?

java - Rx-Java:创建可配置的 Observable

rx-java - 超时后,ReactiveX 发出空值或哨兵值

linux - 对于本地运行的 scala spray 服务器,curl localhost 有效,但 curl <local ip> 获取 "Connection refused"

xml - 从 XML 中删除节点

scala - 自动映射和 "implicit conversion must be more specific then Any Ref"错误

javascript - 如何从 rxjs Observable 获取刻度数

JavaFX 和 RxJava-TableView 无限调用 setCellValueFactory()

java - 最后 onNext() 执行完成的 RxJava 回调