rxjs - 如何使用 ReactiveX Observables/Subjects 处理进度更新?

标签 rxjs rx-java observable progress reactivex

我正在编写一个 Angular 应用程序,它使用 ReactiveX API 来处理异步操作。我之前在一个 Android 项目中使用过这个 API,我非常喜欢它如何简化并发任务处理。但是有一件事我不确定如何以正确的方式解决。

如何从正在进行的任务更新观察者?在这种情况下,任务需要时间来加载/创建一个复杂/大的对象,我能够返回中间进度,但不能返回对象本身。 observable 只能返回一种数据类型。因此我知道两种可能性。

  • 创建一个具有进度字段和数据字段的对象。这个对象可以简单地用 Observable.onNext(object) 返回。进度字段将在每个 onNext 更新,而数据字段在最后一个 onNext 之前为空,这会将其设置为加载的值。
  • 创建两个可观察对象,一个数据可观察对象和一个进度可观察对象。观察者必须订阅 progress observable 以进行进度更新,并订阅数据 observable 以在最终加载/创建数据时收到通知。也可以选择将这些压缩在一起以进行一次订阅。

  • 我使用了这两种技术,它们都有效,但我想知道是否有一个统一的标准,一个干净的方法,如何解决这个任务。当然,它也可以是一个全新的。我对每个解决方案持开放态度。

    最佳答案

    经过仔细考虑,我使用了一个
    解决方案类似于我的问题中的选项二。
    主要的 observable 与实际结果有关
    操作。
    本例中为 http 请求,但文件迭代示例类似。
    它由“工作”函数返回。

    可以通过函数参数添加第二个观察者/订阅者。这个订阅者只关心
    进度信息。这样所有操作都是空安全的,不需要类型检查。

    工作函数的第二个版本,没有进度观察者,
    如果不需要进度 UI 更新,则可以使用。

    export class FileUploadService {
    
     doWork(formData: FormData, url: string): Subject<Response> {
        return this.privateDoWork(formData, url, null);
     }
    
     doWorkWithProgress(formData: FormData, url: string, progressObserver: Observer<number>): Subject<Response> {
        return this.privateDoWork(formData, url, progressObserver);
     }
    
     private privateDoWork(formData: FormData, url: string, progressObserver: Observer<number> | null): Subject<Response> {
    
         return Observable.create(resultObserver => {
         let xhr: XMLHttpRequest = new XMLHttpRequest();
         xhr.open("POST", url);
    
         xhr.onload = (evt) => {
             if (progressObserver) {
                progressObserver.next(1);
                progressObserver.complete();
                }
             resultObserver.next((<any>evt.target).response);
             resultObserver.complete()
         };
         xhr.upload.onprogress = (evt) => {
             if (progressObserver) {
                progressObserver.next(evt.loaded / evt.total);
             }
    
         };
         xhr.onabort = (evt) => resultObserver.error("Upload aborted by user");
         xhr.onerror = (evt) => resultObserver.error("Error");
    
         xhr.send(formData);
         });
     }
    

    这是对包括进度订阅者在内的函数的调用。使用此解决方案,上传功能的调用者必须
    创建/处理/拆除进度订阅者。
     this.fileUploadService.doWorkWithProgress(this.chosenSerie.formData, url, new Subscriber((progress) => console.log(progress * 100)).subscribe(
        (result) => console.log(result),
        (error) => console.log(error),
        () => console.log("request Completed")
        );
    

    总的来说,我更喜欢这个解决方案,而不是具有单个订阅的“配对”对象。没有必要的空处理,并且
    我得到了一个明确的关注点。

    该示例是用 Typescript 编写的,但其他 ReactiveX 实现应该可以实现类似的解决方案。

    关于rxjs - 如何使用 ReactiveX Observables/Subjects 处理进度更新?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46151594/

    相关文章:

    json - 使用 Observable 在 Angular 2 中发出轮询请求

    android - Retrofit2.Retrofit 的 Retrofit 类文件未找到

    Java 无法将reactor.core.publisher.MonoDefer 转换为EntityClass

    java - 基于属性的可流动窗口

    javascript - RxJS Observable.fromEvent 链为每个订阅者触发

    javascript - 如何合并 mergeMap observables 调用并只为整个 observable 返回一个值

    javascript - 可从 Evenet mouseup 观察到的不准确

    angular - 处理 Resolve 服务中的错误

    javascript - Observable.of 转异步

    javascript - 使用 forkJoin 的条件可观察对象