c# - 接收并发异步请求并一次处理一个请求

标签 c# .net multithreading asynchronous concurrency

背景

我们有一个服务操作,可以接收并发异步请求,并且必须一次处理一个请求。

在以下示例中,UploadAndImport(...) 方法在多个线程上接收并发请求,但其对 ImportFile(...) 方法的调用必须一次发生一个。

外行描述

想象一个有许多 worker (多线程)的仓库。人们(客户)可以同时(并发)向仓库发送许多包裹(请求)。当包裹进来时, worker 从头到尾负责,而投递包裹的人可以离开(即发即忘)。 worker 的工作是将每个包裹放入一个小滑槽中,一次只能有一个 worker 将一个包裹放入一个滑槽中,否则会发生困惑。如果投递包裹的人稍后 checkin (轮询端点),仓库应该能够报告包裹是否落入斜槽。

问题

接下来的问题是如何编写一个服务操作......

  1. 可以接收并发客户端请求,
  2. 在多个线程上接收并处理这些请求,
  3. 在接收请求的同一线程上处理请求,
  4. 一次处理一个请求,
  5. 是一种单向即发即忘操作,并且
  6. 有一个单独的轮询端点,用于报告请求完成情况。

我们尝试了以下方法,但想知道两件事:

  1. 是否有任何我们没有考虑到的竞争条件?
  2. 是否有更规范的方法可以使用面向服务的架构(我们碰巧使用的是 WCF)在 C#.NET 中对此场景进行编码?

示例:我们尝试过什么?

这是我们尝试过的服务代码。虽然感觉有点像黑客或拼凑,但它确实有效。

static ImportFileInfo _inProgressRequest = null;

static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests = 
    new ConcurrentDictionary<Guid, ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request)
{
    // Receive the incoming request
    WaitingRequests.TryAdd(request.OperationId, request);

    while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
    {
        // Wait for any previous processing to complete
        Thread.Sleep(500);
    }

    // Process the incoming request
    ImportFile(request);

    Interlocked.Exchange(ref _inProgressRequest, null);
    WaitingRequests.TryRemove(request.OperationId, out _);
}

public bool UploadAndImportIsComplete(Guid operationId) => 
    !WaitingRequests.ContainsKey(operationId);

这是示例客户端代码。

private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
{
    using (var proxy = new Proxy())
    using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
    {
        importFileInfo.FileByteStream = stream;
        proxy.UploadAndImport(importFileInfo);
    }

    await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
    {
        using (var proxy = new Proxy())
        {
            return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
        }
    }));
}

很难在 Fiddle 中编写一个最小可行的示例,但是 here is a start给人一种感觉并且可以编译。

和以前一样,上面的内容看起来像是一个 hack/kludge,我们正在询问其方法中的潜在陷阱以及更合适/规范的替代模式。

最佳答案

使用生产者-消费者模式在线程计数限制的情况下通过管道传输请求的简单解决方案。

您仍然需要实现一个简单的进度报告器或事件。我建议用 Microsoft SignalR 提供的异步通信来代替昂贵的轮询方法。图书馆。它使用 WebSocket 来启用异步行为。客户端和服务器可以在集线器上注册它们的回调。使用 RPC,客户端现在可以调用服务器端方法,反之亦然。您可以使用中心(客户端)将进度发布给客户端。根据我的经验,SignalR 使用起来非常简单,并且有很好的文档记录。它有一个适用于所有著名服务器端语言(例如 Java)的库。

在我看来,轮询与“即发即弃”完全相反。你不能忘记,因为你必须根据时间间隔检查某些内容。基于事件的通信,例如 SignalR,是一劳永逸的,因为您触发并会收到提醒(因为您忘记了)。 “事件方”将调用您的回调,而不是您等待自己执行!

要求 5 被忽略,因为我没有得到任何理由。等待线程完成将消除火灾和忘记字符。

private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
private bool isServiceEnabled;
private readonly int maxNumberOfThreads = 8;
private Semaphore semaphore = new Semaphore(numberOfThreads);
private readonly object syncLock = new object();

public void UploadAndImport(ImportFileInfo request) 
{            
  // Start the request handler background loop
  if (!this.isServiceEnabled)
  {
    this.requestQueue?.Dispose();
    this.requestQueue = new BlockingCollection<ImportFileInfo>();

    // Fire and forget (requirement 4)
    Task.Run(() => HandleRequests());
    this.isServiceEnabled = true;
  }

  // Cache multiple incoming client requests (requirement 1) (and enable throttling)
  this.requestQueue.Add(request);
}

private void HandleRequests()
{
  while (!this.requestQueue.IsCompleted)
  {
    // Wait while thread limit is exceeded (some throttling)
    this.semaphore.WaitOne();

    // Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
    Task.Run(() => ProcessRequest());
  }

  // Reset the request handler after BlockingCollection was marked completed
  this.isServiceEnabled = false;
  this.requestQueue.Dispose();
}

private void ProcessRequest()
{
  ImportFileInfo request = this.requestQueue.Take();
  UploadFile(request);

  // You updated your question saying the method "ImportFile()" requires synchronization.
  // This a bottleneck and will significantly drop performance, when this method is long running. 
  lock (this.syncLock)
  {
    ImportFile(request);
   }

  this.semaphore.Release();
}

备注:

  • BlockingCollection是一个 IDisposable
  • TODO:您必须通过将 BlockingCollection 标记为已完成来“关闭”它: “BlockingCollection.CompleteAdding()”否则它将不确定地循环等待进一步的请求。也许您会为客户端引入额外的请求方法来取消和/或更新流程并将添加到 BlockingCollection 的操作标记为已完成。或者是一个在将其标记为已完成之前等待空闲时间的计时器。或者让您的请求处理程序线程阻塞或旋转。
  • 如果您需要取消支持,请将 Take() 和 Add(...) 替换为 TryTake(...) 和 TryAdd(...)
  • 代码未经测试
  • 您的“ImportFile()”方法是多线程环境中的瓶颈。我建议使其线程安全。如果 I/O 需要同步,我会将数据缓存在 BlockingCollection 中,然后将它们一一写入 I/O。

关于c# - 接收并发异步请求并一次处理一个请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50184497/

相关文章:

java - ExoPlayer 测试 - 无法在未调用 Looper.prepare() 的线程内创建处理程序

c# - TcpClient ConnectAsync 获取状态

c# - 使用 iText 7 将文本环绕在单元格中的图像周围

c# - 在 dotnet highchart 中设置图表大小?

c# - 表格中的光标点

multithreading - 使用OpenMP使用已知种子生成随机数的安全方法是什么?

c# - EF 4 : The specified type member '*' is not supported in LINQ to Entities

c# - MySql.Data.MySqlClient.MySqlException :“Access denied for user ' hobo' @'49.70.207.34'(使用密码 : YES)”

c# - 如何将 <select multiple> 发布到 List<T> ASP.NET Core

c# - 如何在 web api get 中返回 IEnumerable DbSet?