以下代码按预期工作(参见输出):
import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };
let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();
obs$ = sub$.pipe(
scan((acc: ColumnFilter[], curr) => {
const index = acc.findIndex(f => f.field === curr.field);
if (index === -1) {
acc.push({ field: curr.field, value: curr.value });
return acc;
}
curr.value === '' ?
acc.splice(index, 1) :
acc[index] = { field: curr.field, value: curr.value };
return acc;
}, [])
);
obs$.subscribe(
value => console.log(value)
);
sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })
// output:
// [ { field: 'size', value: 'xl' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
// [ { field: 'size', value: 'xl' } ]
现在,如果我向 obs$
可观察对象添加第二个订阅,输出会发生变化:(检查流的最后一个值。它现在包含 { field: 'color', value : '' }
)
import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };
let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();
obs$ = sub$.pipe(
scan((acc: ColumnFilter[], curr) => {
const index = acc.findIndex(f => f.field === curr.field);
if (index === -1) {
acc.push({ field: curr.field, value: curr.value });
return acc;
}
curr.value === '' ?
acc.splice(index, 1) :
acc[index] = { field: curr.field, value: curr.value };
return acc;
}, [])
);
// vvv this was added
obs$.subscribe()
// ^^^ this was added
obs$.subscribe(
value => console.log(value)
);
sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })
// output:
// [ { field: 'size', value: 'xl' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: '' } ]
我不明白为什么第二个订阅会改变流的结果。我只能猜测这与改变累加器有关?因为如果我使用 acc.filter
而不是 acc.splice
,它会按预期工作。
编辑
添加了堆栈 Blitz :https://stackblitz.com/edit/rxjs-6-opeartors-uzojgw?file=index.ts
最佳答案
以下是第二个示例代码中发生的情况:当 sub$.next()
触发事件时,管道会针对每个订阅的可观察量执行一次。由于 scan
运算符的 acc: ColumnFilter[]
在这些执行之间共享,因此会发生所描述的副作用。
我的建议是使用 share
或 shareReplay
运算符,以便每个事件仅执行一次管道,并且为订阅者多播管道发出的值(= 所有订阅者都获得相同的对象引用)。
obs$ = sub$.pipe(
scan((acc: ColumnFilter[], curr) => {
const index = acc.findIndex((f) => f.field === curr.field);
if (index === -1) {
acc.push({ field: curr.field, value: curr.value });
return acc;
}
curr.value === ''
? acc.splice(index, 1)
: (acc[index] = { field: curr.field, value: curr.value });
return acc;
}, []),
share() // you might use shareReplay(1) instead, e.g. if you have late subscribers
);
关于angular - 为什么第二次订阅会改变我的直播结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73754343/