c# - BlockingCollection 与 Subject 用作​​消费者

标签 c# task-parallel-library system.reactive blockingcollection subject

我正在尝试用 C# 实现一个消费者。有许多可以同时执行的发布者。我创建了三个示例,一个使用 Rx 和主题,一个使用 BlockingCollection,第三个使用 BlockingCollection 中的 ToObservable。他们在这个简单的例子中都做同样的事情,我希望他们与多个制作人合作。

每种方法的不同之处是什么?

我已经在使用 Rx,所以我更喜欢这种方法。但我担心 OnNext 没有线程安全保证,而且我不知道 Subject 和默认调度程序的排队语义是什么。

是否有线程安全主题?

是否要处理所有消息?

还有其他情况下这不起作用吗?是并发处理吗?

void SubjectOnDefaultScheduler()
{
    var observable = new Subject<long>();
    observable.
        ObserveOn(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    observable.OnNext(1);
    observable.OnNext(2);
    observable.OnNext(3);
}

不是 Rx,但很容易适应使用/订阅它。它需要一个项目然后处理它。这应该连续发生。

void BlockingCollectionAndConsumingTask()
{
    var blockingCollection = new BlockingCollection<long>();
    var taskFactory = new TaskFactory();
    taskFactory.StartNew(() =>
    {
        foreach (var i in blockingCollection.GetConsumingEnumerable())
        {
            DoWork(i);
        }
    });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

使用有点像主题的阻塞集合似乎是一个很好的折衷方案。我猜想隐含地安排到任务上,这样我就可以使用异步/等待,对吗?

void BlockingCollectionToObservable()
{
    var blockingCollection = new BlockingCollection<long>();
    blockingCollection.
        GetConsumingEnumerable().
        ToObservable(Scheduler.Default).
        Subscribe(i => { DoWork(i); });
    blockingCollection.Add(1);
    blockingCollection.Add(2);
    blockingCollection.Add(3);
}

最佳答案

主题不是线程安全的。并发发出的OnNext会直接并发调用一个Observer。考虑到 Rx 的其他领域强制执行正确语义的程度,我个人觉得这非常令人惊讶。我只能假设这样做是出于性能考虑。

不过,Subject 是一种折衷方案,因为它确实会使用 OnError 或 OnComplete 强制终止 - 在引发其中任何一个之后,OnNext 是一个 NOP。而且这种行为线程安全的。

但是在 Subject 上使用 Observable.Synchronize() ,它会强制传出调用遵守正确的 Rx 语义。特别是,如果同时进行,OnNext 调用将被阻塞。

底层机制是标准的 .NET 锁。当锁被多个线程争用时,它们会以先到先得的方式授予锁,大部分时间。在某些情况下会违反公平性。但是,您肯定会获得您正在寻找的序列化访问权限。

ObserveOn 具有特定于平台的行为 - 如果可用,您可以提供 SynchronizationContext并将 OnNext 调用发布到它。使用调度程序,它最终会调用 ConcurrentQueue<T>。并通过调度程序串行分派(dispatch)它们 - 因此执行线程将取决于调度程序。无论哪种方式,排队行为也将强制执行正确的语义。

在这两种情况下(Synchronize 和 ObserveOn),您肯定不会丢失消息。使用 ObserveOn,您可以通过选择调度程序/上下文隐式选择您将在其上处理消息的线程,使用 Synchronize,您将在调用线程上处理消息。哪个更好取决于您的场景。

还有更多需要考虑的因素 - 例如,如果生产者的速度超过了消费者,您想要做什么。

您可能还想看看 Rxx Consume:http://rxx.codeplex.com/SourceControl/changeset/view/63470#1100703

显示同步行为的示例代码(Nuget Rx-Testing、Nunit)- Thread.Sleep 代码有点矫揉造作,但它很糟糕而且我很懒::

public class SubjectTests
{
    [Test]
    public void SubjectDoesNotRespectGrammar()
    {
        var subject = new Subject<int>();
        var spy = new ObserverSpy(Scheduler.Default);
        var sut = subject.Subscribe(spy);
        // Swap the following with the preceding to make this test pass
        //var sut = subject.Synchronize().Subscribe(spy);

        Task.Factory.StartNew(() => subject.OnNext(1));
        Task.Factory.StartNew(() => subject.OnNext(2));

        Thread.Sleep(2000);

        Assert.IsFalse(spy.ConcurrencyViolation);
    }

    private class ObserverSpy : IObserver<int>
    {
        private int _inOnNext;

        public ObserverSpy(IScheduler scheduler)
        {
            _scheduler = scheduler;
        }

        public bool ConcurrencyViolation = false;
        private readonly IScheduler _scheduler;

        public void OnNext(int value)
        {
            var isInOnNext = Interlocked.CompareExchange(ref _inOnNext, 1, 0);

            if (isInOnNext == 1)
            {
                ConcurrencyViolation = true;
                return;
            }

            var wait = new ManualResetEvent(false);

            _scheduler.Schedule(TimeSpan.FromSeconds(1), () => wait.Set());
            wait.WaitOne();

            _inOnNext = 0;
        }

        public void OnError(Exception error)
        {

        }

        public void OnCompleted()
        {

        }
    }
}

关于c# - BlockingCollection 与 Subject 用作​​消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16501679/

相关文章:

c# - 使用 C# 学习多线程的资源

c# - 响应式扩展从 IObservable 转换为 IEnumerable

system.reactive - 当方法返回有延迟的值时如何重复该方法

c# - 如何将两个任务结果合并为第三个任务?

c# - ASP.NET 异步任务执行顺序

c# - 使用Reactive Extensions模拟UIElement的点击

c# - ASP.NET AJAX ColorPickerExtender 客户端工作正常,代码隐藏颜色为空

c# - 覆盖锚定容器中的锚定样式

c# - 当接口(interface)和实现都存在时,Var 关键字类型推断歧义

c# - 异步/等待异常处理