angular - RxJs难题:共享流并在有新订阅时有条件地重试

标签 angular rxjs

内容:

我们的Web应用程序可以同时显示不同的帮助面板。
当面板要求给定ID(例如help-id-1)时,我们将到达一个API,并传递该ID,然后获得所需的帮助。

现在,由于某些原因,我们可能会在同一帮助面板上显示两次或更多次。但是,我们当然不希望为同一项目多次击中该API,如果没有错误或当前正在获取该API。

我们的“生产者”为我们提供了检索它的冷流:

const getHelpContentById = (id: string) => fromPromise(
  httpCallToGetHelpResultFromThirdLib(id)
).pipe(
  catchError(error => of({ status: 'ERROR', item: null, id })),
  // extracting the body of the response
  map(getHelpItemFromResponse),
  // wrapping the response into an object { status: 'SUCCES', item, id }
  map(item => setStatusOnHelpItem(item, id)),
  startWith({ status: 'LOADING', item: null, id }),
)


我们从包含状态的对象开始流,然后再接收另一个具有新状态的对象,该对象是SUCCESSERROR

预期的解决方案应:
-在首次调用给定ID时从API中提取
-如果在上一个订阅完成之前,另一个订阅发生了相同的ID(状态LOADING),则该订阅应获得与第一次调用相同的流,而无需再次获取API
-如果应用的一部分显示help-id-1失败,则不应关闭该流,而应关闭其中的next类型为{ status: 'ERROR', item: null, id }的值,这样,如果另一个组件尝试再次显示help-id-1,由于该ID的最后状态为ERROR,因此它应尝试再次访问API,并且两个订阅者都应收到实时更新{ status: 'LOADING', item: null, id },然后返回错误或成功

第一次尝试:
这是我想到的第一种非RxJs方法:
(从服务中提取的代码,这是一个类)

private helpItems: Map<
  string,
  { triggerNewFetchForItem: () => void; obs: Observable<HelpItemWithStatus> }
> = new Map();

private getFromCacheOrFetchHelpItem(id: string): Observable<HelpItemWithStatus> {
  let triggerNewFetchForItem$: BehaviorSubject<HelpItemWithStatus>;
  const idNotInCache = !this.helpItems.has(id);

  if (idNotInCache) {
    triggerNewFetchForItem$ = new BehaviorSubject<HelpItemWithStatus>(null);

    this.helpItems.set(id, {
      triggerNewFetchForItem: () => triggerNewFetchForItem$.next(null),
      obs: triggerNewFetchForItem$.pipe(
        switchMap(() => getHelpContentById(id)),
        shareReplay(1),
      ),
    });

    return this.helpItems.get(id).obs;
  } else {
    return this.helpItems.get(id).obs.pipe(
      tap(item => {
        if (item.status === ContentItemStatus.ERROR) {
          this.helpItems.get(id).triggerNewFetchForItem();
        }
      })
    );
  }
}

public getHelpItemById(id: string): Observable<HelpItemWithStatus> {
  return this.getFromCacheOrFetchHelpItem(id);
}


我同事的尝试:

private getFromCacheOrFetchHelpItem4(id: string): Observable<HelpItemWithStatus> {
  let item = this.items.get(id);

  if (item && item.status !== ContentItemStatus.ERROR) {
    return of(item);
  }

  return getNewWrappedHelpItem(this.contentfulClient, id).pipe(
    tap(item => this.items.set(id, bs), 
    shareReplay(1)),
  )
}


问题
-如果您订阅了一次,但最终报错
-您从新组件再次订阅
-按预期执行另一次获取,但更改了引用
-API调用成功,第二个组件已更新,第一个未更新

结论:
我敢肯定,有一种更好的Rx方式可以做到这一点,即使不依赖“外部缓存”(此处为Map)。最好的办法显然是为此配备一个新的运算符:)

最佳答案

我基本上在工作相同的场景。我已经考虑了一段时间,但最终决定花点时间。我的解决方案有些粗糙,但是我会在优化时尝试更新答案。我希望能有更好的方法来提供反馈。

问题/步骤

这是一个相当复杂的问题,所以我将逐步进行构建。为了简单起见,我们将从不带参数的api方法开始。

共享流,以便多个订阅者不会触发多个api请求

只需在最后添加share()运算符即可使其成为多播。

return api().pipe(share());


共享API的最新响应

只需将share()更改为shareReplay(1)。该参数指示要共享的先前响应的数量。我们只希望发出最后一个,所以我们放入1

或者,您可以使用tap运算符保留对最后一个发出的值的引用,并执行of(data)而不是如果最后一个成功,则不返回流。仅当您不再希望再次调用api时才适用(如所讨论的OP),但我将其保持通用性以使其灵活地适用于其他解决方案。

return api().pipe(shareReplay(1));


如果最后一个失败,则允许新订阅触发新的api请求

这太笨了。很容易获得最后的值,甚至很容易为新订户重新运行流。但这并不能使以前的订户受益。您可能会获得成功的结果,但是不会通知任何先前的订阅者。本质上,您要的是让您的主题在有新订阅时发出新值。据我所知,这是不可能的。

我的工作是设置自己的主题,每当有人请求该信息流时,我就可以触发该主题。这不是一回事,但我认为这确实是我们想要的。我们真正想要的是某种重试方法。如果不是通过使用retryWhen运算符来实现自动化,则我们需要某种手动方式,例如加载新组件。加载新组件时,它们会请求流,因此可以正常工作。

因此,我们创建一个主题,并在超时后调用下一步。我宁愿使用ReplaySubjectBehaviorSubject来避免超时,但这样做时(ExpressionChangedAfterItHasBeenCheckedError)遇到了角度变化检测的问题。我需要更深入地研究它。

请注意,share在外部流上。我们想分享它而不是内在的。另请注意,由于我们每次都需要一个新的内部流,因此我使用的是switchMap而不是switchMapTo

const trigger = new Subject<void>();
setTimeout(() => trigger.next());
return trigger.pipe(
  switchMap(() => api()),
  shareReplay(1)
);


发生错误后让流保持活动状态

catchError运算符可让您返回可观察值。由于我们希望将其作为消息,因此我们只执行catchError(e => of(e))。问题在于这将结束流。解决方法是将捕获的内容放入switchMap的内部,以便内部流可以死掉而外部流可以继续前进。

return trigger.pipe(
  switchMap(() => api().pipe(
    catchError(err => of(err))
  ),
  shareReplay(1)
);


知道api的状态

为此,我们将创建一个具有type属性的通用响应包装。可能的值为“ FETCHING”,“ SUCCESS”和“ FAILURE”。 api调用开始后,我们将使用startWith运算符发送获取通知(因此为什么会结束)。

return trigger.pipe(
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    startWith({ state: 'FETCHING' })
  ),
  shareReplay(1)
);


一次仅允许一个机上请求

基本上,如果请求正在进行中,我们不希望不调用触发器。我们可以使用标志来执行此操作,也可以将distinct运算符与触发器配合使用以在api调用解析后将其重置。第二种方法很棘手,因为在构造流时需要引用该流。因此,我们将只使用一个变量,并且可以将trigger.next()包装在if中,也可以在流上放置一个过滤器。我将做一个过滤器。

private state: string;
...
return trigger.pipe(
  filter(() => this.state !== 'FETCHING'),
  switchMap(() => api().pipe(
    map((data) => ({ state: 'SUCCESS', data })),
    catchError(err => of({ state: 'FAILURE', err })),
    startWith({ state: 'FETCHING' }),
    tap(x => { this.state = x.state; })
  ),
  shareReplay(1)
);


仅重试错误,不成功

为此,您要做的就是不调用触发器。因此,只需将触发器上的条件更改为未初始化状态或“失败”即可。

...
filter(() => this.state == null || this.state === 'FAILURE'),
...


共享相同参数的流

基本上,您只需要散列参数并将其用作映射的键​​即可。请参阅下面的完整示例。



这就是全部。我创建了一个辅助函数,该函数将生成一个api方法。开发人员必须为参数提供api方法和哈希方法,因为推断起来会过于复杂。

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, switchMap, map, startWith, catchError, shareReplay, filter } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// posible states of the api request
export enum ApiStateType {
  Fetching,
  Success,
  Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
  state: ApiStateType;
  params: any[],
  data: T
}

// information related to a stream for a unique set of parameters
interface StreamConfig<T> {
  state: ApiStateType;
  trigger: Subject<void>;
  stream: Observable<ApiStatus<T>>;
}

export function generateCachedApi<T>(
  api: (...params) => Observable<T>,
  generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
  const cache = new Map<string, StreamConfig<T>>();

  return (...params): Observable<ApiStatus<T>> => {
    const key = generateKey(...params);
    let config = cache.get(key);

    if (!config) {
      console.log(`created new stream (${key})`);
      config = <StreamConfig<T>> { trigger: new Subject<void>() };
      config.stream = config.trigger.pipe(
        filter(() => config.state == null || config.state === ApiStateType.Failure),
        switchMap(() => {
          return api(...params).pipe(
            map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, params, data })),
            catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, params, data })),
            startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, params }),
            tap(x => { config.state = x.state; })
          );
        }),
        tap(x => { console.log('PUBLISH', x)}),
        shareReplay(1),
      );
      cache.set(key, config);
    } else {
      console.log(`returned existing stream (${key})`);
    }
    setTimeout(() => { config.trigger.next() });
    return config.stream;
  }
}


这是我一起破解的一个运行示例:https://stackblitz.com/edit/api-cache


我敢肯定,有一种更好的Rx方式可以做到这一点,甚至无需依靠“外部缓存”(此处的Map)。最好的办法显然是为此配备一个新的运算符:)


我创建了一个cacheMap运算符来尝试做到这一点。我有一个发出api参数的源,cacheMap运算符将为唯一的一组参数查找或创建流,并返回其mergeMap样式。问题在于,现在每个订阅者都将订阅该内部可观察对象。因此,您必须添加一个过滤器(请参见下面的替代解决方案)。

替代解决方案

这是我想到的另一种解决方案。代替维护多个流,您可以拥有一个主要流,并使用过滤器将其提供给订阅者。

单个流的问题是重播将应用于所有参数。因此,您要么必须使用没有缓冲区的重放,要么自己管理重放。

如果使用不带缓冲区的重播,则它将重播FETCHINGSUCCESS的所有内容,这可能会导致额外的处理,尽管用户可能不会注意到。理想情况下,我们将有一个replayByKey运算符,但我还没有写出来。因此,现在我仅使用地图。使用地图的问题在于,我们仍然向已经收到地图的订户发送相同的值。因此,我们将distinctUntilChanged运算符添加到实例流。或者,您可以创建实例流,然后在其上放一个takeUntil,触发器是为成功进行过滤的实例流,并放一个delay(0)以允许最后一个值在关闭之前通过管道。这样就可以完成数据流,因为一旦成功就永远不会获得新的值,就可以了。我采用了与众不同的方法,因为如果您想更改其要求,它可以使您获得新的价值。

我们使用mergeMap而不是switchMap,因为我们可以同时进行针对不同参数的运行中请求,并且我们不想取消针对不同参数的请求。

解决方法如下:

import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { tap, mergeMap, map, startWith, catchError, share, filter, distinctUntilChanged } from 'rxjs/operators';
import { of } from 'rxjs/observable/of';

// posible states of the api request
export enum ApiStateType {
  Fetching,
  Success,
  Failure
}

// wrapper for the api status messages
export interface ApiStatus<T> {
  state: ApiStateType;
  key: string;
  params: any[];
  data: T;
}

export function generateCachedApi<T>(
  api: (...params) => Observable<T>,
  generateKey: (...params) => string
): (...params) => Observable<ApiStatus<T>> {
  const trigger = new Subject<any[]>();
  const stateCache = new Map<string, ApiStatus<T>>();
  const stream = trigger.pipe(
    map<any[], [any[], string]>((params) => [ params, generateKey(...params) ]),
    tap(([_, key]) => {
      if (!stateCache.has(key)) {
        stateCache.set(key, <ApiStatus<T>> {})
      }
    }),
    mergeMap(([params, key]) => {
      const apiStatus = stateCache.get(key);
      if (apiStatus.state === ApiStateType.Fetching || apiStatus.state === ApiStateType.Success) {
        return of(apiStatus);
      }
      return api(...params).pipe(
        map((data) => (<ApiStatus<T>>{ state: ApiStateType.Success, key, params, data })),
        catchError((data, source) => of(<ApiStatus<T>>{ state: ApiStateType.Failure, key, params, data })),
        startWith(<ApiStatus<T>>{ state: ApiStateType.Fetching, key, params }),
        tap(state => { stateCache.set(key, state); })
      )
    }),
    tap(x => { console.log('PUBLISH', x)}),
    share()
  );

  return (...params): Observable<ApiStatus<T>> => {
    const key = generateKey(...params);
    const instanceStream = stream.pipe(
      filter((response) => response.key === key),
      distinctUntilChanged()
    );
    setTimeout(() => { trigger.next(params) });
    return instanceStream;
  }
}

关于angular - RxJs难题:共享流并在有新订阅时有条件地重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49069193/

相关文章:

Angular 变化检测 ExpressionChangedAfterItHasBeenCheckedError

javascript - Rxjs 并行订阅者

angular - 完成所有多个服务调用后执行代码

javascript - 组件之间的数据传输

angular - 如何将可观察值传递给 Angular Bootstrap 组件?

javascript - 使用 ng-bootstrap 模板解析错误

angular - 如何过滤垫树组件Angular Material 6.0.1

javascript - 如何使用客户端作为放置请求的一部分发送的 express 访问服务器端的数据

javascript - 展平嵌套的 Observable

javascript - Angular 2 : import service file path not found