我正在尝试用 C# 实现一个消费者。有许多可以同时执行的发布者。我创建了三个示例,一个使用 Rx 和主题,一个使用 BlockingCollection,第三个使用 BlockingCollection 中的 ToObservable。他们在这个简单的例子中都做同样的事情,我希望他们与多个制作人合作。
每种方法的不同之处是什么?
我已经在使用 Rx,所以我更喜欢这种方法。但我担心 OnNext 没有线程安全保证,而且我不知道 Subject 和默认调度程序的排队语义是什么。
是否有线程安全主题?
是否要处理所有消息?
还有其他情况下这不起作用吗?是并发处理吗?
void SubjectOnDefaultScheduler()
{
var observable = new Subject<long>();
observable.
ObserveOn(Scheduler.Default).
Subscribe(i => { DoWork(i); });
observable.OnNext(1);
observable.OnNext(2);
observable.OnNext(3);
}
不是 Rx,但很容易适应使用/订阅它。它需要一个项目然后处理它。这应该连续发生。
void BlockingCollectionAndConsumingTask()
{
var blockingCollection = new BlockingCollection<long>();
var taskFactory = new TaskFactory();
taskFactory.StartNew(() =>
{
foreach (var i in blockingCollection.GetConsumingEnumerable())
{
DoWork(i);
}
});
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
使用有点像主题的阻塞集合似乎是一个很好的折衷方案。我猜想隐含地安排到任务上,这样我就可以使用异步/等待,对吗?
void BlockingCollectionToObservable()
{
var blockingCollection = new BlockingCollection<long>();
blockingCollection.
GetConsumingEnumerable().
ToObservable(Scheduler.Default).
Subscribe(i => { DoWork(i); });
blockingCollection.Add(1);
blockingCollection.Add(2);
blockingCollection.Add(3);
}
最佳答案
主题不是线程安全的。并发发出的OnNext会直接并发调用一个Observer。考虑到 Rx 的其他领域强制执行正确语义的程度,我个人觉得这非常令人惊讶。我只能假设这样做是出于性能考虑。
不过,Subject 是一种折衷方案,因为它确实会使用 OnError 或 OnComplete 强制终止 - 在引发其中任何一个之后,OnNext 是一个 NOP。而且这种行为是线程安全的。
但是在 Subject 上使用 Observable.Synchronize() ,它会强制传出调用遵守正确的 Rx 语义。特别是,如果同时进行,OnNext 调用将被阻塞。
底层机制是标准的 .NET 锁。当锁被多个线程争用时,它们会以先到先得的方式授予锁,大部分时间。在某些情况下会违反公平性。但是,您肯定会获得您正在寻找的序列化访问权限。
ObserveOn 具有特定于平台的行为 - 如果可用,您可以提供 SynchronizationContext
并将 OnNext 调用发布到它。使用调度程序,它最终会调用 ConcurrentQueue<T>
。并通过调度程序串行分派(dispatch)它们 - 因此执行线程将取决于调度程序。无论哪种方式,排队行为也将强制执行正确的语义。
在这两种情况下(Synchronize 和 ObserveOn),您肯定不会丢失消息。使用 ObserveOn,您可以通过选择调度程序/上下文隐式选择您将在其上处理消息的线程,使用 Synchronize,您将在调用线程上处理消息。哪个更好取决于您的场景。
还有更多需要考虑的因素 - 例如,如果生产者的速度超过了消费者,您想要做什么。
您可能还想看看 Rxx Consume:http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703
显示同步行为的示例代码(Nuget Rx-Testing、Nunit)- Thread.Sleep 代码有点矫揉造作,但它很糟糕而且我很懒::
public class SubjectTests
{
[Test]
public void SubjectDoesNotRespectGrammar()
{
var subject = new Subject<int>();
var spy = new ObserverSpy(Scheduler.Default);
var sut = subject.Subscribe(spy);
// Swap the following with the preceding to make this test pass
//var sut = subject.Synchronize().Subscribe(spy);
Task.Factory.StartNew(() => subject.OnNext(1));
Task.Factory.StartNew(() => subject.OnNext(2));
Thread.Sleep(2000);
Assert.IsFalse(spy.ConcurrencyViolation);
}
private class ObserverSpy : IObserver<int>
{
private int _inOnNext;
public ObserverSpy(IScheduler scheduler)
{
_scheduler = scheduler;
}
public bool ConcurrencyViolation = false;
private readonly IScheduler _scheduler;
public void OnNext(int value)
{
var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);
if (isInOnNext == 1)
{
ConcurrencyViolation = true;
return;
}
var wait = new ManualResetEvent(false);
_scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
wait.WaitOne();
_inOnNext = 0;
}
public void OnError(Exception error)
{
}
public void OnCompleted()
{
}
}
}
关于c# - BlockingCollection 与 Subject 用作消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16501679/