angular - rxjs5/Angular - 清除 ReplaySubject 缓冲区

标签 angular angular2-services rxjs5

我有一个 Angular 服务,它异步地向多个组件共享一个数据流(http 调用)。我需要偶尔根据用户操作召回 http 服务。

我正在使用 ReplaySubject 来保存加载的值并发送给在 http 调用后订阅的订阅者。

我想知道是否有办法在进行后续 http 调用之前清除 ReplaySubject 的缓冲区? 在此期间,我怀疑我还需要取消订阅才能避免泄露?

服务:

@Injectable()
export class GreatDataService {

    public data$: ReplaySubject<any>;
    private subs: Subscription;

    constructor(private http: Http) {
        this.data$ = new ReplaySubject(1);
    }

    public refresh() {
        if (this.subs) {
            this.subs.unsubscribe();
            this.subs = null;
        }
        this.subs = this.http.get('/api').subscribe(this.data$)
    }
}

顶级部分组件:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
         this.greatDataService.refresh();
    }
...

组件 1:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
    this.greatDataService.data$.subscribe(
        x => console.log('subscriber 1: ' + x),
        err => console.log('subscriber 1: ' + err),
        () => console.log('subscriber 1: Completed')
    );
...

组件 2:

...
    constructor(private greatDataService: GreatDataService) {}
    ngOnInit() {
    this.greatDataService.data$.subscribe(
        x => console.log('subscriber 2: ' + x),
        err => console.log('subscriber 2: ' + err),
        () => console.log('subscriber 2: Completed')
    );
...

最佳答案

您可以使用 Rx.Subject 来发出新的“get-fresh-data”事件,这样您就可以在使用 .switchMap()< 调用 refresh() 时检索新数据。请参阅此示例如何执行此操作:

function getData() {
  return Rx.Observable.of('retrieving new data')
    .timestamp()
    .delay(500);
}

// in this example i use an eventStream of clicks
// you can use Rx.Subject() and manually .next() a new value
// when somebody invokes .refresh()
const refreshDataClickStream = Rx.Observable.fromEvent(document.getElementById('refresh_stream'), 'click');

const dataStream = refreshDataClickStream
  .startWith('PAGE_LOAD') /* let the stream always first time fetching data */
  .switchMap(() => getData()) /* getData() is not cached so we switchMap to a new instance, abandoning the previous result*/
  .publishReplay().refCount(); /* refCounter so everybody gets the same results */

dataStream.subscribe(data => console.log('sub1 data: ' + JSON.stringify(data)));

setTimeout(() => {
  console.log('late arriving subscription (gets same stream)');
  dataStream.subscribe(data => console.log('sub2 data: ' + JSON.stringify(data)));
}, 2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
<input type='button' id='refresh_stream' value="refresh_stream" />

关于angular - rxjs5/Angular - 清除 ReplaySubject 缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41859323/

相关文章:

http - 如何在 Angular 2 中获取自定义响应 header ?

javascript - 在 RxJS 中是否有与 `race` 运算符相反的运算符?

angular - AWS 无服务器 Lambda + Angular - TypeError : express is not a function

laravel - Angular 2 - 请求的资源上不存在 'Access-Control-Allow-Origin' header

Angular 2/4 : Alerts and notifications from the server

javascript - angular2 http post 请求获取 res.json()

typescript - Angular 2 : how to redirect if API returns an error?

RxJS (5.0rc4) : Pause and resume an interval timer

recursion - 避免使用 RxJS5 observables 进行递归

css - 伪元素之前的动态样式 - Angular 5