c# - 非阻塞(无锁)一次性初始化

标签 c# multithreading asynchronous lock-free

原始问题:

我只需要在多线程应用程序中初始化一次(当第一个线程进入 block 时)。后续线程应跳过初始化而不等待它完成。

我找到了这个博客条目 Lock-free Thread Safe Initialisation in C#但它并不完全符合我的要求,因为它让其他线程等待初始化完成(如果我理解正确的话)。

这是一个展示问题的示例,尽管由于缺乏同步而无法正常工作:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace LockFreeInitialization
{
    public class Program
    {
        private readonly ConcurrentQueue<int> _jobsQueue = new ConcurrentQueue<int>();
        private volatile bool _initialized;

        private async Task EnqueueAndProcessJobsAsync(int taskId, int jobId)
        {
            Enqueue(taskId, jobId);

            /* "Critical section"? Only the first thread to arrive should
             * execute OneTimeInitAsync. Subsequent threads should always
             * skip this part. This is where things go wrong as all the
             * tasks execute this section due to lack of synchronization. */
            if (!_initialized)
            {
                await OneTimeInitAsync(taskId);
            }

            /* Before and during initialization, all threads should skip
             * the ProcessQueueAsync. After initialization is completed,
             * it does not matter which thread will execute it (since the
             * _jobsQueue is thread-safe). */
            if (_initialized)
            {
                await ProcessQueueAsync(taskId);
            }
            Console.WriteLine($"Task {taskId} completed.");
        }

        private void Enqueue(int taskId, int jobId)
        {
            Console.WriteLine($"Task {taskId} enqueues job {jobId}.");
            _jobsQueue.Enqueue(jobId);
        }

        private async Task OneTimeInitAsync(int taskId)
        {
            Console.WriteLine($"Task {taskId} is performing initialization");

            /* Do some lengthy initialization */
            await Task.Delay(TimeSpan.FromSeconds(3));
            _initialized = true;

            Console.WriteLine($"Task {taskId} completed initialization");
        }

        private async Task ProcessQueueAsync(int taskId)
        {
            while (_jobsQueue.TryDequeue(out int jobId))
            {
                /* Do something lengthy with the jobId */
                await Task.Delay(TimeSpan.FromSeconds(1));

                Console.WriteLine($"Task {taskId} completed job {jobId}.");
            }
        }

        private static void Main(string[] args)
        {
            var p = new Program();
            var rand = new Random();

            /* Start 4 tasks in parallel */
            for (var threadId = 1; threadId < 5; threadId++)
            {
                p.EnqueueAndProcessJobsAsync(threadId, rand.Next(10));
            }

            /* Give tasks chance to finish */
            Console.ReadLine();
        }
    }
}

OneTimeInitAsyncProcessQueueAsync 都是冗长的操作,在现实生活中会与某些远程服务通信。使用 lock 会阻塞其他线程,而我希望他们只是将他们的工作堆积到 _jobsQueue 上并继续他们的工作。我尝试使用 ManualResetEvent 无济于事。

有谁知道我将如何完成这项工作?提前致谢。


更新(解决方案)

根据下面的讨论,我了解到所提供的场景不足以描述我的问题。但是,由于这些答案和评论,我考虑稍微重新设计一下解决方案,以便按我希望的方式工作。

想象一下客户端的两个远程服务 ServiceA(job processor)和 ServiceB(job repository)应用程序必须与之通信。我们需要建立与 ServiceA 的连接,同时我们从 ServiceB 获取多个作业的数据。当作业数据可用时,我们使用 ServiceA 处理作业(批量)(现实生活中的示例涉及到 ServiceA 的 Signal-R 连接以及需要发送到 ServiceA 的来自 ServiceB 的一些作业 ID)。这是代码示例:

public class StackOverflowSolution
{
    private readonly ConcurrentQueue<int> _jobsQueue = new ConcurrentQueue<int>();

    /* Just to randomize waiting times */
    private readonly Random _random = new Random();

    /* Instance-scoped one-time initialization of a remote ServiceA connection */
    private async Task<string> InitializeConnectionAsync()
    {
        Console.WriteLine($"{nameof(InitializeConnectionAsync)} started");

        await Task.Delay(TimeSpan.FromSeconds(_random.Next(5) + 1));

        Console.WriteLine($"{nameof(InitializeConnectionAsync)} completed");

        return "Connection";
    }

    /* Preparation of a job (assume it requires communication with remote ServiceB) */
    private async Task InitializeJobAsync(int id)
    {
        Console.WriteLine($"{nameof(InitializeJobAsync)}({id}) started");

        await Task.Delay(TimeSpan.FromSeconds(_random.Next(10) + 1));
        _jobsQueue.Enqueue(id);

        Console.WriteLine($"{nameof(InitializeJobAsync)}({id}) completed");
    }

    /* Does something to the ready jobs in the _jobsQueue using connection to
     * ServiceA */
    private async Task ProcessQueueAsync(string connection)
    {
        var sb = new StringBuilder("Processed ");
        bool any = false;
        while (_jobsQueue.TryDequeue(out int idResult))
        {
            any = true;
            sb.Append($"{idResult}, ");
        }
        if (any)
        {
            await Task.Delay(TimeSpan.FromMilliseconds(_random.Next(500)));
            Console.WriteLine(sb.ToString());
        }
    }

    /* Orchestrates the processing */
    public async Task RunAsync()
    {
        /* Start initializing the conection to ServiceA */
        Task<string> connectionTask = InitializeConnectionAsync();
        /* Start initializing jobs */
        var jobTasks = new List<Task>();
        foreach (int id in new[] {1, 2, 3, 4})
        {
            jobTasks.Add(InitializeJobAsync(id));
        }
        /* Wait for initialization to complete */
        string connection = await connectionTask;

        /* Trigger processing of jobs as they become ready */
        var queueProcessingTasks = new List<Task>();
        while (jobTasks.Any())
        {
            jobTasks.Remove(await Task.WhenAny(jobTasks));
            queueProcessingTasks.Add(ProcessQueueAsync(connection));
        }

        await Task.WhenAll(queueProcessingTasks);
    }

    public static void Main()
    {
        new StackOverflowSolution().RunAsync().Wait();
    }
}

输出示例:

InitializeConnectionAsync started
InitializeJobAsync(1) started
InitializeJobAsync(2) started
InitializeJobAsync(3) started
InitializeJobAsync(4) started
InitializeJobAsync(5) started
InitializeJobAsync(3) completed
InitializeJobAsync(2) completed
InitializeConnectionAsync completed
Processed 3, 2,
InitializeJobAsync(1) completed
Processed 1,
InitializeJobAsync(5) completed
Processed 5,
InitializeJobAsync(4) completed
Processed 4,

感谢所有反馈!

最佳答案

老实说 EnqueueAndProcessJobsAsync 的语义因为你的代码根本不是一个好主意,因为你描述了你实际在做什么和你实际需要什么。

当前 TaskEnqueueAndProcessJobsAsync 返回等待初始化如果初始化不是由其他人启动的,那么只要队列为空,或者只要这个逻辑调用上下文碰巧处理了一个出错的项目,它就会完成。那...只是没有意义。

您显然想要的是 Task每当作业完成时完成(当然需要初始化才能完成),或者如果该作业出错则出错,并且不受任何其他作业错误的影响。幸运的是,除了更有用之外,它也更容易做到。

就实际初始化而言,您可以只使用 Lazy<Task>确保异步初始化的正确同步,并公开 Task任何 future 的调用都可以在初始化完成时告诉他们。

public class MyAsyncQueueRequireingInitialization
{
    private readonly Lazy<Task> whenInitialized;
    public MyAsyncQueueRequireingInitialization()
    {
        whenInitialized = new Lazy<Task>(OneTimeInitAsync);
    }
    //as noted in comments, the taskID isn't actually needed for initialization
    private async Task OneTimeInitAsync() 
    {
        Console.WriteLine($"Performing initialization");

        /* Do some lengthy initialization */
        await Task.Delay(TimeSpan.FromSeconds(3));

        Console.WriteLine($"Completed initialization");
    }

    public async Task ProcessJobAsync(int taskID, int jobId)
    {
        await whenInitialized.Value;

        /* Do something lengthy with the jobId */
        await Task.Delay(TimeSpan.FromSeconds(1));

        Console.WriteLine($"Completed job {jobId}.");
    }
}

关于c# - 非阻塞(无锁)一次性初始化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45439928/

相关文章:

C# 任务调度器查询

c# - .NET 1.0 线程池问题

Java 的多个同步块(synchronized block)

java - 从 BlockingQueue 获取时缺少项目

c++ - Boost:两个工作线程,让主线程休眠直到它们都完成

javascript - 在 Node.JS/NPM 的 MySQL 库中使用带有查询的 Promise

node.js - 在函数nodejs中从mariadb返回值

c# - 如何验证 WPF 客户端对 ASP .NET WebAPI 2 的请求

c# - db4o,如果添加了字段,如何更新对象?

c# - 为什么使用 SqlCommand 插入语句很慢?