c# - GetConsumingEnumerable() 生产者-消费者的多线程 BlockingCollection 替代方案

标签 c# multithreading wait producer-consumer blockingcollection

我遇到的情况是,我有多个生产者和多个消费者。生产者将作业放入队列中。我选择了 BlockingCollection,它效果很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnumerable() 功能,集合中项目的顺序会发生变化...这不是我需要的。

它甚至在 MSDN 中说 http://msdn.microsoft.com/en-us/library/dd287186.aspx 它不保留项目的顺序。

有人知道这种情况的替代方案吗?

我看到 Take 方法可用,但它是否也为使用者线程提供“等待”条件?

上面写着http://msdn.microsoft.com/en-us/library/dd287085.aspx

“对 Take 的调用可能会阻塞,直到可以删除项目为止。”使用 TryTake 更好吗? 我确实需要线程等待并继续检查作业。

最佳答案

Take 会阻塞线程,直到有可用的东西为止。

TryTake 顾名思义尝试这样做,但如果失败或成功则返回 bool 值。 允许更多的灵活性使用它:

while(goingOn){
   if( q.TryTake(out var){
      Process(var)
   }
   else{
      DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
   }
}

而不是

while(goingOn){
   if( var x = q.Take(){
      //w'll wait till this ever will happen and then we:
      Process(var)
   }
}

我的票投给了 TryTake :-)

示例:

    public class ProducerConsumer<T> {

        public struct Message {
            public T Data;
        }

        private readonly ThreadRunner _producer;
        private readonly ThreadRunner _consumer;

        public ProducerConsumer(Func<T> produce, Action<T> consume) {
            var q = new BlockingCollection<Message>();
            _producer = new Producer(produce,q);
            _consumer = new Consumer(consume,q);
        }

        public void Start() {
            _producer.Run();
            _consumer.Run();
        }

        public void Stop() {
            _producer.Stop();
            _consumer.Stop();
        }

        private class Producer : ThreadRunner {

            public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
                _produce = produce;
            }

            private readonly Func<T> _produce;

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        var item = _produce();
                        MessageQ.TryAdd(new Message{Data = item});
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }

        public abstract class ThreadRunner {

            protected readonly BlockingCollection<Message> MessageQ;

            protected ThreadRunner(BlockingCollection<Message> q) {
                MessageQ = q;
            }

            protected Thread Runner;
            protected bool KeepRunning = true;

            public bool WasInterrupted;

            public abstract void Worker();

            public void Run() {
                Runner = new Thread(Worker);
                Runner.Start();
            }

            public void Stop() {
                KeepRunning = false;
                Runner.Interrupt();
                Runner.Join();
            }

        }

        class Consumer : ThreadRunner {

            private readonly Action<T> _consume;

            public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
                _consume = consume;
            }

            public override void Worker() {
                try {
                    while (KeepRunning) {
                        Message message;
                        if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
                            _consume(message.Data);
                        }
                        else {
                            //There's nothing in the Q so I have some spare time...
                            //Excellent moment to update my statisics or update some history to logfiles
                            //for now we sleep:
                            Thread.Sleep(TimeSpan.FromMilliseconds(100));
                        }
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }
    }

}

用法:

[Fact]
public void ConsumerShouldConsume() {

    var produced = 0;
    var consumed = 0;

    Func<int> produce = () => {
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        produced++;
        return new Random(2).Next(1000);
    };

    Action<int> consume = c => { consumed++; };

    var t = new ProducerConsumer<int>(produce, consume);
    t.Start();
    Thread.Sleep(TimeSpan.FromSeconds(5));
    t.Stop();

    Assert.InRange(produced,40,60);
    Assert.InRange(consumed, 40, 60);

}

关于c# - GetConsumingEnumerable() 生产者-消费者的多线程 BlockingCollection 替代方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14554652/

相关文章:

Jquery 在用户定义的函数之间等待

C# 线程 : Exercises for beginners

c# - WCF RIA服务,EntitySet总是空的?

c# - 我如何查看 GenericTypeDefinition 是否实现了 IEnumerable<>

c# - 带有 ASMX Web 服务的 Web 服务器上的后台进程

python - 并发访问同一个.pyc

java - 如何获取线程的开始时间和结束时间?

powershell - 如果使用 -Wait 参数运行,如何更改 powershell 中进程的优先级?

Android - 在调用方法之前不断增加延迟,直到用户停止交互

c# - 在 InnoSetup 中访问 C# COM 对象时出现异常