我有一个 Angular 服务,它异步地向多个组件共享一个数据流(http 调用)。我需要偶尔根据用户操作召回 http 服务。
我正在使用 ReplaySubject 来保存加载的值并发送给在 http 调用后订阅的订阅者。
我想知道是否有办法在进行后续 http 调用之前清除 ReplaySubject 的缓冲区? 在此期间,我怀疑我还需要取消订阅才能避免泄露?
服务:
@Injectable()
export class GreatDataService {
public data$: ReplaySubject<any>;
private subs: Subscription;
constructor(private http: Http) {
this.data$ = new ReplaySubject(1);
}
public refresh() {
if (this.subs) {
this.subs.unsubscribe();
this.subs = null;
}
this.subs = this.http.get('/api').subscribe(this.data$)
}
}
顶级部分组件:
...
constructor(private greatDataService: GreatDataService) {}
ngOnInit() {
this.greatDataService.refresh();
}
...
组件 1:
...
constructor(private greatDataService: GreatDataService) {}
ngOnInit() {
this.greatDataService.data$.subscribe(
x => console.log('subscriber 1: ' + x),
err => console.log('subscriber 1: ' + err),
() => console.log('subscriber 1: Completed')
);
...
组件 2:
...
constructor(private greatDataService: GreatDataService) {}
ngOnInit() {
this.greatDataService.data$.subscribe(
x => console.log('subscriber 2: ' + x),
err => console.log('subscriber 2: ' + err),
() => console.log('subscriber 2: Completed')
);
...
最佳答案
您可以使用 Rx.Subject 来发出新的“get-fresh-data”事件,这样您就可以在使用 .switchMap()< 调用
。请参阅此示例如何执行此操作:refresh()
时检索新数据
function getData() {
return Rx.Observable.of('retrieving new data')
.timestamp()
.delay(500);
}
// in this example i use an eventStream of clicks
// you can use Rx.Subject() and manually .next() a new value
// when somebody invokes .refresh()
const refreshDataClickStream = Rx.Observable.fromEvent(document.getElementById('refresh_stream'), 'click');
const dataStream = refreshDataClickStream
.startWith('PAGE_LOAD') /* let the stream always first time fetching data */
.switchMap(() => getData()) /* getData() is not cached so we switchMap to a new instance, abandoning the previous result*/
.publishReplay().refCount(); /* refCounter so everybody gets the same results */
dataStream.subscribe(data => console.log('sub1 data: ' + JSON.stringify(data)));
setTimeout(() => {
console.log('late arriving subscription (gets same stream)');
dataStream.subscribe(data => console.log('sub2 data: ' + JSON.stringify(data)));
}, 2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
<input type='button' id='refresh_stream' value="refresh_stream" />
关于angular - rxjs5/Angular - 清除 ReplaySubject 缓冲区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41859323/