我遇到的情况是,我有多个生产者和多个消费者。生产者将作业放入队列中。我选择了 BlockingCollection,它效果很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnumerable() 功能,集合中项目的顺序会发生变化...这不是我需要的。
它甚至在 MSDN 中说 http://msdn.microsoft.com/en-us/library/dd287186.aspx 它不保留项目的顺序。
有人知道这种情况的替代方案吗?
我看到 Take 方法可用,但它是否也为使用者线程提供“等待”条件?
上面写着http://msdn.microsoft.com/en-us/library/dd287085.aspx
“对 Take 的调用可能会阻塞,直到可以删除项目为止。”使用 TryTake 更好吗? 我确实需要线程等待并继续检查作业。
最佳答案
Take 会阻塞线程,直到有可用的东西为止。
TryTake 顾名思义尝试这样做,但如果失败或成功则返回 bool 值。 允许更多的灵活性使用它:
while(goingOn){
if( q.TryTake(out var){
Process(var)
}
else{
DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
}
}
而不是
while(goingOn){
if( var x = q.Take(){
//w'll wait till this ever will happen and then we:
Process(var)
}
}
我的票投给了 TryTake :-)
示例:
public class ProducerConsumer<T> {
public struct Message {
public T Data;
}
private readonly ThreadRunner _producer;
private readonly ThreadRunner _consumer;
public ProducerConsumer(Func<T> produce, Action<T> consume) {
var q = new BlockingCollection<Message>();
_producer = new Producer(produce,q);
_consumer = new Consumer(consume,q);
}
public void Start() {
_producer.Run();
_consumer.Run();
}
public void Stop() {
_producer.Stop();
_consumer.Stop();
}
private class Producer : ThreadRunner {
public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
_produce = produce;
}
private readonly Func<T> _produce;
public override void Worker() {
try {
while (KeepRunning) {
var item = _produce();
MessageQ.TryAdd(new Message{Data = item});
}
}
catch (ThreadInterruptedException) {
WasInterrupted = true;
}
}
}
public abstract class ThreadRunner {
protected readonly BlockingCollection<Message> MessageQ;
protected ThreadRunner(BlockingCollection<Message> q) {
MessageQ = q;
}
protected Thread Runner;
protected bool KeepRunning = true;
public bool WasInterrupted;
public abstract void Worker();
public void Run() {
Runner = new Thread(Worker);
Runner.Start();
}
public void Stop() {
KeepRunning = false;
Runner.Interrupt();
Runner.Join();
}
}
class Consumer : ThreadRunner {
private readonly Action<T> _consume;
public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
_consume = consume;
}
public override void Worker() {
try {
while (KeepRunning) {
Message message;
if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
_consume(message.Data);
}
else {
//There's nothing in the Q so I have some spare time...
//Excellent moment to update my statisics or update some history to logfiles
//for now we sleep:
Thread.Sleep(TimeSpan.FromMilliseconds(100));
}
}
}
catch (ThreadInterruptedException) {
WasInterrupted = true;
}
}
}
}
}
用法:
[Fact]
public void ConsumerShouldConsume() {
var produced = 0;
var consumed = 0;
Func<int> produce = () => {
Thread.Sleep(TimeSpan.FromMilliseconds(100));
produced++;
return new Random(2).Next(1000);
};
Action<int> consume = c => { consumed++; };
var t = new ProducerConsumer<int>(produce, consume);
t.Start();
Thread.Sleep(TimeSpan.FromSeconds(5));
t.Stop();
Assert.InRange(produced,40,60);
Assert.InRange(consumed, 40, 60);
}
关于c# - GetConsumingEnumerable() 生产者-消费者的多线程 BlockingCollection 替代方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14554652/