c# - 超时缓冲任务数据

标签 c# .net multithreading rabbitmq task-parallel-library

我有一个相当棘手的问题要解决。我有多个(多达一百个或更多)任务,每个任务都会产生一条数据,比如字符串。这些任务每时每刻都可能产生,并且一次可能有大量任务,而另一次则没有。每个任务都必须接收 bool,指示是否正确完成(这很重要)。

我想实现某种缓冲区,以聚合来自任务的数据并将其刷新到外部服务,返回操作状态(正常或失败)。另外,我的缓冲区必须被超时刷新(以防止等待新任务生成数据的时间过长)。

到目前为止,我尝试制作一些共享的项目列表。任务可以将项目添加到列表中,还有另一个任务,检查计时器或列表中的项目计数并刷新它们。但是在这种方法中,我无法将刷新操作的状态告知任务,这对我来说非常糟糕。

如果有任何方法可以解决我的问题,我将不胜感激。

最佳答案

据我了解,您需要将每个任务的结果保存到数据库/服务,但您不想立即执行。

您的问题可能有不止一种解决方案,但很难找到最好的解决方案,所以我将描述我如何做到这一点……很快。

您需要保存/发送的数据的容器。

public class TaskResultEventArgs : EventArgs
{
    public bool Result { get; set; }
}

也为您运行任务的通知程序。我假设您可以延迟执行任务。

public class NotifyingTaskRunner
{
    public event EventHandler<TaskResultEventArgs> TaskCompleted;

    public void RunAndNotify(Task<bool> task)
    {
        task.ContinueWith(t =>
        {
            OnTaskCompleted(this, new TaskResultEventArgs { Result = t.Result });
        }, TaskContinuationOptions.OnlyOnRanToCompletion);
        task.Start();
    }

    protected virtual void OnTaskCompleted(object sender, TaskResultEventArgs e)
    {
        var h = TaskCompleted;
        if (h != null)
        {
            h.Invoke(sender, e);
        }
    }
}

可以缓冲和/或刷新结果的监听器(或者您可能希望将其委托(delegate)给另一个类)。

public class Listener
{
    private ConcurrentQueue<bool> _queue = new ConcurrentQueue<bool>();

    public Listener(NotifyingTaskRunner runner)
    {
        runner.TaskCompleted += Flush;
    }

    public async void Flush(object sender, TaskResultEventArgs e)
    {
        // Enqueue status to flush everything later (or flush it immediately)
        _queue.Enqueue(e.Result);
    }
}

这就是您可以一起使用所有内容的方式。

var runner = new NotifyingTaskRunner();
var listener = new Listener(runner);

var t1 = new Task<bool>(() => { return true; });
var t2 = new Task<bool>(() => { return false; });

runner.RunAndNotify(t1);
runner.RunAndNotify(t2);

关于c# - 超时缓冲任务数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37620534/

相关文章:

c# - 通过调度程序/数据绑定(bind)更新 UI 中缺少什么

c# - IQueryable 上的 .ToList 在后台执行时会导致 NullReferenceException

.net - 如何在 .Net Core 上使用 Apache Tika?

c# - 如何从 SOAP 响应 XML 创建 WCF 消息对象

c++ - boost::asio 中的 post 和 dispatch 有什么区别?

c# - 在 Mono/Linux 中使用 FFmpeg.Autogen 4.1.0.2 时找不到 Kernel32

c# - 从 Jquery 中的 JSON 调用返回 List<string> 的长度未定义

.net - 获取实体导航属性的子集

java - tomcat启动时如何启动一个新线程

java - Android 线程错误