c# - Microsoft TPL 数据流 - 同步处理关联请求

标签 c# .net task-parallel-library tpl-dataflow

我提前为标题道歉,但这是我能想到的最好的描述 Action 的方法。

需求是处理消息总线的请求。 传入的请求可能与关联或分组这些请求的 id 有关。 我想要的行为是请求流同步处理相关的 id。 但是可以异步处理不同的 id。

我正在使用并发字典来跟踪正在处理的请求和链接中的谓词。

这是假设提供相关请求的同步处理。

然而,我得到的行为是第一个请求得到处理,第二个请求被丢弃。

我已附上来自控制台应用程序的示例代码来模拟该问题。

我们将不胜感激任何指导或反馈。

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApplication2
{
    class Program
    {
        static void Main(string[] args)
        {
            var requestTracker = new ConcurrentDictionary<string, string>();

            var bufferBlock = new BufferBlock<Request>();

            var actionBlock = new ActionBlock<Request>(x => 
            {
                Console.WriteLine("processing item {0}",x.Name);
                Thread.Sleep(5000);
                string itemOut = null;
                requestTracker.TryRemove(x.Id, out itemOut);
            });

            bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));


            var publisher = Task.Run(() =>
            {
                var request = new Request("item_1", "first item");
                bufferBlock.SendAsync(request);

                var request_1 = new Request("item_1", "second item");
                bufferBlock.SendAsync(request_1);

            });

            publisher.Wait();
            Console.ReadLine();
        }
    }

    public class Request
    {
        public Request(string id, string name)
        {
            this.Id = id;
            this.Name = name;
        }
        public string Id { get; set; }
        public string Name { get; set; }
    }
}

最佳答案

  1. 你说你想并行处理一些请求(至少我假设这就是你所说的“异步”的意思),但是 ActionBlock 默认情况下不是并行的。要更改它,请设置 MaxDegreeOfParallelism .

  2. 您正在尝试使用 TryAdd() 作为过滤器,但由于以下两个原因,它不起作用:

    1. 过滤器只被调用一次,它不会自动重试或类似的事情。这意味着如果一个项目没有通过,它就永远不会通过,即使在阻止它的项目完成之后也是如此。
    2. 如果一个项目卡在 block 的输出队列中,则没有其他项目会离开该 block 。这可能会显着降低并行度,即使您以某种方式解决了上一个问题。
  3. 我认为这里最简单的解决方案是为每个组设置一个 block ,这样,每个组中的项目将按顺序处理,但不同组中的项目将并行处理。在代码中,它可能类似于:

    var processingBlocks = new Dictionary<string, ActionBlock<Request>>();
    
    var splitterBlock = new ActionBlock<Request>(request =>
    {
        ActionBlock<Request> processingBlock;
    
        if (!processingBlocks.TryGetValue(request.Id, out processingBlock))
        {
            processingBlock = processingBlocks[request.Id] =
                new ActionBlock<Request>(r => /* process the request here */);
        }
    
        processingBlock.Post(request);
    });
    

    这种方法的问题是组的处理 block 永远不会消失。如果您负担不起(这是内存泄漏),因为您将拥有大量组,那么 hashing approach suggested by I3arnon是要走的路。

关于c# - Microsoft TPL 数据流 - 同步处理关联请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24618348/

相关文章:

php - 只有 1 个用户访问本地 Intranet 页面被拒绝

c# - 在 Web 服务中使用 Task.WhenAny()

c# - 优先级最低的线程被调用更多次

c# - Singleton Unity 中出现实例引用错误,无法访问成员 '<method>'

c# - NHibernate 4 升级的重大变化

c# - OpenCV Marker Z 轴变换不正确

c# - XInclude 的替代品

c# - Task.Factory.StartNew() 生成对象未初始化错误

c# - 如何更改整个 Canvas 的鼠标光标形状?

c# - Linq like for Java