reactive-programming - Rx swift 在使用 flatMap 后保持原始流的原始顺序

标签 reactive-programming rx-swift

在中间应用 flatMap 后,我试图保持流的原始顺序。

下面是详细说明我的意思的图表:

----2-4-1------------------(原始流)

-----------1--2---------4--(网络事件 - 由带有延迟的 flatMap 表示)

---------------2---------4-1(想要的结果)

下面是详细说明情况的代码:

    persistMessageEventBus.flatMap({ num -> Observable<Int> in

        print("aaab Doing \(num)")

        let t2g = Observable.just(num).delay(Double(num), scheduler: MainScheduler.instance).do(onNext:{ num in print("aaab Done async \(num)")})

        return t2g

    }).concatMap({ num -> Observable<Int> in

        print("aaab Done map \(num)")

        return Observable.just(num)

    }).subscribe(onNext: { num in

        print("aaab done \(num)")

    }).addDisposableTo(disposeBag)

    persistMessageEventBus.onNext(2)
    persistMessageEventBus.onNext(4)
    persistMessageEventBus.onNext(1)

输出为:

aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done map 1
aaab done 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4

想要的输出是:

aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4
aaab Done map 1
aaab done 1

RxSwift 中有类似的东西吗?

最佳答案

使用 .concatMap() 代替,它保证原始顺序。

更新#1

那么显然它需要索引和一些缓冲。

       typealias Indexed = (num: Int, index: Int)

        class Buffer {
            let ordered = PublishSubject<Int>()
            private var current = 0
            private var buffer: [Int: Int] = [:]
            func onNext(_ indexed: Indexed) {
                self.buffer[indexed.index] = indexed.num
                for index in self.buffer.keys.sorted() {
                    if index == current {
                        ordered.onNext(self.buffer[index]!)
                        self.buffer.remove(at: self.buffer.index(forKey: index)!)
                        current += 1
                    }
                }
            }
        }

        let buffer = Buffer()

        buffer
            .ordered
            .subscribe(onNext: { num in

                print("aaab done \(num)")

            })
            .disposed(by: disposeBag)

        persistMessageEventBus
            .mapWithIndex { (num, index) -> Indexed in
                return (num: num, index: index)
            }
            .flatMap({ indexed -> Observable<Indexed> in

                print("aaab Doing \(indexed.num)")

                let t2g = Observable.just(indexed).delay(Double(indexed.num), scheduler: MainScheduler.instance).do(onNext: { indexed in print("aaab Done async \(indexed.num)") })

                return t2g

            })
            .subscribe(onNext: { indexed in
                buffer.onNext(indexed)
            })
            .disposed(by: disposeBag)

        persistMessageEventBus.onNext(2)
        persistMessageEventBus.onNext(4)
        persistMessageEventBus.onNext(1)
aaab Done async 1
aaab done 2
aaab Done async 2
aaab done 4
aaab Done async 4
aaab done 1

关于reactive-programming - Rx swift 在使用 flatMap 后保持原始流的原始顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46775518/

相关文章:

android - RxJava 过滤器 'else'

ios - 如何只更新 RxDataSources 中已更改的单元格而不是全部单元格?

c# - 编写我自己的 IScheduler 来管理线程,最好的方法是什么?

java - 在回调 rxjava 中返回 Observable

ios - 如何获取 TableView 是否被选为 Observable<Bool>?

ios - 无法在 RxSwift 中调用非函数类型错误的值?

ios - RxSwift flatMapLatest 无需处理以前的可观察值

ios - RxSwift - Playground 执行失败 : Couldn't lookup symbols

java - Single.retry(3) 仅调用一次

rxjs - 使用flatMap将数组转换为项目序列