c# - 异步生产者/消费者

标签 c# multithreading concurrency parallel-processing producer-consumer

我有一个从多个线程访问的类的实例。此类接受此调用并将元组添加到数据库中。我需要以串行方式完成此操作,因为由于某些数据库约束,并行线程可能会导致数据库不一致。

由于我不熟悉 C# 中的并行性和并发性,所以我这样做了:

private BlockingCollection<Task> _tasks = new BlockingCollection<Task>();

public void AddDData(string info)
{
    Task t = new Task(() => { InsertDataIntoBase(info); });
    _tasks.Add(t);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Task t;
            if (_tasks.TryTake(out t))
            {
                t.Start();
                t.Wait();
            }
        }
    });
}

AddDData 是被多线程调用的,InsertDataIntoBase 是一个非常简单的插入,只需几毫秒。

问题是,出于某种原因,我缺乏知识,无法弄清楚,有时一个任务会被调用两次!它总是这样:

T1 T2 T3 T1 <- PK 错误。 T4 ...

我对 .Take() 的理解是否完全错误,是我遗漏了什么,还是我的生产者/消费者实现真的很糟糕?

最好的问候, 拉斐尔

更新:

按照建议,我使用此架构进行了快速沙盒测试实现,正如我所怀疑的那样,它不能保证在前一个任务完成之前不会触发任务。

enter image description here

所以问题仍然存在:如何正确地排队任务并按顺序触发它们?

更新 2:

我简化了代码:

private BlockingCollection<Data> _tasks = new BlockingCollection<Data>();

public void AddDData(Data info)
{
    _tasks.Add(info);
}

private void InsertWorker()
{
    Task.Factory.StartNew(() =>
    {
        while (!_tasks.IsCompleted)
        {
            Data info;
            if (_tasks.TryTake(out info))
            {
                InsertIntoDB(info);
            }
        }
    });
}

请注意,我摆脱了 Tasks,因为我依赖于同步的 InsertIntoDB 调用(因为它在一个循环中),但仍然没有运气......这一代很好,我绝对确定只有独特的实例是去排队。但无论我怎么尝试,有时同一个对象会被使用两次。

最佳答案

我认为这应该可行:

    private static BlockingCollection<string> _itemsToProcess = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        InsertWorker();
        GenerateItems(10, 1000);
        _itemsToProcess.CompleteAdding();
    }

    private static void InsertWorker()
    {
        Task.Factory.StartNew(() =>
        {
            while (!_itemsToProcess.IsCompleted)
            {
                string t;
                if (_itemsToProcess.TryTake(out t))
                {
                    // Do whatever needs doing here
                    // Order should be guaranteed since BlockingCollection 
                    // uses a ConcurrentQueue as a backing store by default.
                    // http://msdn.microsoft.com/en-us/library/dd287184.aspx#remarksToggle
                    Console.WriteLine(t);
                }
            }
        });
    }

    private static void GenerateItems(int count, int maxDelayInMs)
    {
        Random r = new Random();
        string[] items = new string[count];

        for (int i = 0; i < count; i++)
        {
            items[i] = i.ToString();
        }

        // Simulate many threads adding items to the collection
        items
            .AsParallel()
            .WithDegreeOfParallelism(4)
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Select((x) =>
            {
                Thread.Sleep(r.Next(maxDelayInMs));
                _itemsToProcess.Add(x);
                return x;
            }).ToList();
    }

这确实意味着消费者是单线程的,但允许多个生产者线程。

关于c# - 异步生产者/消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18749185/

相关文章:

java - 没有 volatile 关键字的原子语句被忽略

multithreading - 纯函数式编程中的竞争条件

c# - 跨语言共享环境变量

C#读取XFA表单数据

multithreading - 有什么方法可以在生成的线程内调用闭包吗?

multithreading - 多线程设计模式

java - BlockingQueue - 仅获取特定对象。

c# - 具有公共(public)字段的不同类之间的 IEnumerable.Except()

c# - Nopcommerce Plugin Creation中Sequence contains more than one matching elements如何解决?

c - 当客户端套接字在C中终止时如何防止服务器套接字关闭?