stream - RxJava - 将 Observable 转换为迭代器、流或序列

标签 stream sequence rx-java kotlin rx-kotlin

我知道这违反了很多 Rx 规则,但我真的很喜欢 RxJava-JDBC我的队友也是。关系数据库是我们工作的核心,Rx 也是如此。

然而,在某些情况下,我们不想作为 Observable<ResultSet> 发出。但宁愿只拥有一个基于拉取的 Java 8 Stream<ResultSet>或 Kotlin Sequence<ResultSet> .但是我们非常习惯 RxJava-JDBC 库,它只返回一个 Observable<ResultSet> .

因此,我想知道是否有办法可以转Observable<ResultSet>Sequence<ResultSet>使用扩展功能,不做任何中介收集或toBlocking()调用。以下是我目前所拥有的一切,但我的头现在正在旋转,试图连接基于推和拉的系统,而且我无法作为 ResultSet 进行缓冲。每个 onNext() 都是有状态的称呼。这是不可能完成的任务吗?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }

}.asSequence()

最佳答案

我不确定这是实现您想要的最简单方法,但您可以尝试此代码。它转换 ObservableIterator通过创建阻塞队列并发布来自 Observable 的所有事件到这个队列。 Iterable从队列中提取事件,如果没有则阻塞。然后它根据接收到的当前事件修改自己的状态。

class ObservableIterator<T>(
    observable: Observable<T>,
    scheduler: Scheduler
) : Iterator<T>, Closeable {

  private val queue = LinkedBlockingQueue<Notification<T>>()
  private var cached: Notification<T>? = null
  private var completed: Boolean = false

  private val subscription =
      observable
          .materialize()
          .subscribeOn(scheduler)
          .subscribe({ queue.put(it) })

  override fun hasNext(): Boolean {
    cacheNext()
    return !completed
  }

  override fun next(): T {
    cacheNext()
    val notification = cached ?: throw NoSuchElementException()
    check(notification.isOnNext)
    cached = null
    return notification.value
  }

  private fun cacheNext() {
    if (completed) {
      return
    }

    if (cached == null) {
      queue.take().let { notification ->
        if (notification.isOnError) {
          completed = true
          throw RuntimeException(notification.throwable)
        } else if (notification.isOnCompleted) {
          completed = true
        } else {
          cached = notification
        }
      }
    }
  }

  override fun close() {
    subscription.unsubscribe()
    completed = true
    cached = null
  }
}

关于stream - RxJava - 将 Observable 转换为迭代器、流或序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37421742/

相关文章:

python - 如何将我列出的号码加在一起

带有 IntentService 的 Android RxJava 间隔

android - 如何使用 MVP 和 rxjava 构建我的应用程序并进行改造以从 Observables 中获取数据?

java - 流中的数据不完整

scala - 选项列表 : equivalent of sequence in Scala?

java - 关闭 I/O 流

python - 根据 pandas 中连续行的值替换列值

java-8 - 仅返回从 Observable 发出的项目

c# - iTextSharp - 如何将文档转换为字节 []

java - Android (Java) 中 MediaRecorder 的 LocalSocket(Unix 域)客户端-服务器数据流问题