我是 Rx 的新手。我想知道是否可以向不同的订阅者发送一条消息,以便它们在不同的线程上运行? IObserable 如何控制它?普通的 Subject 实现,据我所知,它在单个线程上一个接一个地调用订阅者。
public class Subsciber : IObserver<int>
{
public void OnNext(int a)
{
// Do something
}
public void OnError(Exception e)
{
// Do something
}
public void OnCompeleted()
{
}
}
public static class Program
{
public void static Main()
{
var observable = new <....SomeClass....>();
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
// some waiting function
}
}
如果我将 Subject 用作“SomeClass”,则在 sub1 的 OnNext() 完成之前不会调用 sub2 的 OnNext()。如果 sub1 需要很多时间,我不希望它延迟 sub2 的接收。谁能告诉我 Rx 如何允许 SomeClass 的这种实现。
最佳答案
您编写的代码几乎可以并行运行可观察对象。如果你这样写你的观察者:
public class Subscriber : IObserver<int>
{
public void OnNext(int a)
{
Console.WriteLine("{0} on {1} at {2}",
a,
Thread.CurrentThread.ManagedThreadId,
DateTime.Now.ToString());
}
public void OnError(Exception e)
{ }
public void OnCompleted()
{ }
}
然后运行这段代码:
var observable =
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Select(x => (int)x)
.Take(5)
.ObserveOn(Scheduler.ThreadPool);
var sub1 = new Subscriber();
var sub2 = new Subscriber();
observable.Subscribe(sub1);
observable.Subscribe(sub2);
Thread.Sleep(10000);
将产生以下内容:
0 on 28 at 2011/10/20 00:13:49
0 on 16 at 2011/10/20 00:13:49
1 on 29 at 2011/10/20 00:13:50
1 on 22 at 2011/10/20 00:13:50
2 on 27 at 2011/10/20 00:13:51
2 on 29 at 2011/10/20 00:13:51
3 on 27 at 2011/10/20 00:13:52
3 on 19 at 2011/10/20 00:13:52
4 on 27 at 2011/10/20 00:13:53
4 on 27 at 2011/10/20 00:13:53
它已经在不同线程上并行运行订阅。
我使用的重要的东西是 .ObserveOn
扩展方法 - 这就是使这项工作成功的原因。
您应该记住,观察者通常不会共享同一个可观察对象实例。订阅一个可观察对象有效地连接了一个独特的可观察操作符“链”,从可观察对象的来源到观察者。这与调用 GetEnumerator
非常相似两次枚举,你不会共享同一个枚举器实例,你会得到两个唯一的实例。
现在,我想描述一下链的含义。我将提供从 Observable.Generate
中提取的 Reflector.NET 代码& Observable.Where
来说明这一点。
以这段代码为例:
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
var ys = xs.Where(x => x % 2 == 0);
ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });
引擎盖下 Generate
& Where
每个创建内部 Rx 类的新实例 AnonymousObservable<T>
. AnonymousObservable<T>
的构造函数需要 Func<IObserver<T>, IDisposable>
每当它收到对 Subscribe
的调用时使用的委托(delegate).
Observable.Generate<T>(...)
的稍微清理过的代码来自 Reflector.NET 的是:
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
IScheduler scheduler)
{
return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
{
TState state = initialState;
bool first = true;
return scheduler.Schedule((Action self) =>
{
bool flag = false;
TResult local = default(TResult);
try
{
if (first)
{
first = false;
}
else
{
state = iterate(state);
}
flag = condition(state);
if (flag)
{
local = resultSelector(state);
}
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(local);
self();
}
else
{
observer.OnCompleted();
}
});
});
}
Action self
参数是迭代输出值的递归调用。您会注意到此代码中没有任何地方执行 observer
。被存储或值被粘贴到多个观察者。此代码为每个新观察者运行一次。
Observable.Where<T>(...)
的稍微清理过的代码来自 Reflector.NET 的是:
public static IObservable<TSource> Where<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return new AnonymousObservable<TSource>(observer =>
source.Subscribe(x =>
{
bool flag;
try
{
flag = predicate(x);
}
catch (Exception exception)
{
observer.OnError(exception);
return;
}
if (flag)
{
observer.OnNext(x);
}
}, ex => observer.OnError(ex), () => observer.OnCompleted));
}
同样,此代码不会跟踪多个观察者。它调用Subscribe
有效地将自己的代码作为观察者传递给底层 source
可观察。
您应该看到,在我上面的示例代码中,订阅了 Where
创建对 Generate
的订阅因此这是一个可观察链。事实上,它在一系列 AnonymousObservable
上链接订阅调用。对象。
如果您有两个订阅,则您有两个链。如果您有 1,000 个订阅,则您有 1,000 个链。
现在,作为旁注 - 即使有 IObservable<T>
和 IObserver<T>
接口(interface)——你应该非常非常少地在你自己的类中实际实现这些。内置类和运算符处理 99.99% 的所有情况。有点像IEnumerable<T>
- 您需要多久自己实现一次此接口(interface)?
让我知道这是否有帮助,如果您需要任何进一步的解释。
关于c# - 是否可以在 Rx 的不同线程上调用订阅者的 OnNext?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7821404/