我有ObserverBase类。在Start方法中,我订阅了一个可观察对象,应用了转换和过滤器。
我不明白为什么未编译SubscribeOn作为扩展方法的用法。
谁能解释这种行为?
public class ObserverBase<T1, T2> : IPeer<T1, T2>
{
private readonly ISubject<T2> subject;
private readonly IMapper<T1, T2> messageMapper;
protected ObserverBase(ISubject<T2> subject, IMapper<T1, T2> messageMapper)
{
this.subject = subject;
this.messageMapper = messageMapper;
}
public IObservable<T2> Start(IObservable<T1> observable, Func<T2, bool> predicate)
{
//works
Synchronization.ObserveOn(Synchronization.SubscribeOn(observable, TaskPoolScheduler.Default),
Scheduler.Immediate)
.Select(message => this.messageMapper.Map(message))
.Where(predicate)
.Subscribe(observation => this.subject.OnNext(observation));
// compile error on SubscribeOn: The type arguments cannot be inferred from the usage. Try specifying the type argument explicitly.
observable.SubscribeOn(TaskPoolScheduler.Default).ObserveOn(Scheduler.Immediate)
.Select(message => this.messageMapper.Map(message))
.Where(predicate)
.Subscribe(observation => this.subject.OnNext(observation));
// compile error on TaskPoolScheduler.Default: Cannot resolve method SubscribeOn(System.Reactive.Concurrency.TaskPoolScheduler)
observable.SubscribeOn<T1>(TaskPoolScheduler.Default).ObserveOn(Scheduler.Immediate)
.Select(message => this.messageMapper.Map(message))
.Where(predicate)
.Subscribe(observation => this.subject.OnNext(observation));
return this.subject;
}
}
最佳答案
这对我有用-您是否为using
命名空间(定义了System.Reactive.Linq
)包括了SubscribeOn
语句? Synchronization.ObserveOn
在另一个 namespace (System.Reactive.Concurrency
)中-我怀疑您已经有了using语句。
关于c# - 为什么SubscribeOn在这里不能作为扩展方法而是直接调用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25781457/