rx-java - RxJava:取消订阅可从另一个异步生产中观察到的异步

标签 rx-java kotlin

取消订阅另一个异步Observable的最佳(干净)方法是什么?
说,我有一个演示文稿,显示了用户的聊天列表。
每个“聊天”项目应显示其自身的最新消息。
这是原始代码说明:

fun AsyncInsideAsync() {
    /**
     * A chat that 'transmits' latest message
     */
    class Chat(val name: Int) {
        val lastMessage: PublishSubject<String> = PublishSubject.create()

        fun postMessage(message: Int) {
            lastMessage.onNext("Chat: $name, Message: $message")
        }
    }

    // List of chats for user.
    val chats: PublishSubject<Chat> = PublishSubject.create()

    ///////////////////////////////////
    //        ORIGINAL SEQUENCE      //
    ///////////////////////////////////
    val sequence = chats.flatMap { it.lastMessage }

    val subscriber = TestSubscriber<String>()
    sequence.subscribe(subscriber)

    // User has single chat in a chat-list
    val chat1 = Chat(1)
    chats.onNext(chat1)

    // Someone posts a message in Chat 1
    chat1.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1"
    )

    // Someone posts another message
    chat1.postMessage(2)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2"
    )

    // Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created
    val chat2 = Chat(2)
    chats.onNext(chat2)

    // Someone posts a message to Chat 2
    chat2.postMessage(1)
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1"
    )

    // Someone out there posts a message to Chat 1 that is not visible to user anymore
    chat1.postMessage(3)

    // The answer looks like this 
    //      "Chat: 1, Message: 1",
    //      "Chat: 1, Message: 2",
    //      "Chat: 2, Message: 1",
    //      "Chat: 1, Message: 3"
    // Chat 1 is still subscribed and test fails
    subscriber.assertValues(
            "Chat: 1, Message: 1",
            "Chat: 1, Message: 2",
            "Chat: 2, Message: 1",
    )
}

我想到的是使用一个主题(或可观察的共享对象)来制动一系列内部订阅。但是看起来很奇怪:
    ///////////////////////////////////
    //        MODIFIED SEQUENCE      //
    ///////////////////////////////////
    val unsubscribe: PublishSubject<Boolean> = PublishSubject.create()
    val sequence = chats
            .doOnNext({ unsubscribe.onNext(true) })
            .doAfterTerminate({ unsubscribe.onNext(true) })
            .flatMap {
                it.lastMessage.takeUntil(unsubscribe)
            }

这种方法有效,但看起来令人恐惧。
非常感谢!

最佳答案

假设评论

// Chat 1 disappears FROM USER CHAT LIST, Chat 2 is created

表示您希望当前的Chat(在这种情况下为chat1)在Chat上发送新的chats时停止发射,您可以使用switchMap运算符完成此操作。
val sequence = chats.switchMap { it.lastMessage }

从ReactiveX文档中:

RxJava also implements the switchMap operator. It behaves much like flatMap, except that whenever a new item is emitted by the source Observable, it will unsubscribe to and stop mirroring the Observable that was generated from the previously-emitted item, and begin only mirroring the current one.

关于rx-java - RxJava:取消订阅可从另一个异步生产中观察到的异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36179494/

相关文章:

rx-java - RxJava可观察到的: Invoke onError and then retry

kotlin - 是否可以在 Kotlin 的列表中传播列表?

android - 仅在 recyclerview 新项目出现时启动声音

java - 如何分组并删除重复的对象

java - RxJava 2 运算符组合,仅提供一组值中的第一个值

java - StringObservable.from(InputStream).share() 立即导致 MissingBackPressure

android - 将图像加载异步任务转换为 rxjava,为什么它滞后于 ui?

android - 在 Kotlin 协程中使用流替换 RxJava 的点击监听器

android - 在对话框 fragment 上显示 ProgressBar

java - 如何将 SSL 证书添加到 okHttp 双向 TLS 连接?