c# - C#中的多个并发定期刷新操作

标签 c# multithreading concurrency task-parallel-library

我需要定期执行一些操作。主要要求是:1)在上一个更新周期尚未完成时,不要开始下一个更新周期;2)如果上一次迭代获得的数据仍然有效(即自上次刷新以来的时间小于 TTL 值),则不要开始更新;3)需要多个(例如 10 个以上)独立的线程来执行此类更新。
SO 上有很多类似的问题,我在这里(here)找到了 @Jim 编写的 PeriodicTaskFactory 实现。
它能按预期工作,但是当同时启动多个这样的工厂时,刷新过程中开始出现一些开销,使整个流程失真(少数本应执行的迭代被取消了)。
这是代码:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace CA_TasksTests
{
    /// <summary>
    /// Demo driver: starts several periodic refresh operations concurrently
    /// and collects the results they produce.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Outcome of a single refresh; the message is "#" followed by the
        /// iteration number and the supplied text.
        /// </summary>
        public class Result
        {
            public string Message { get; set; }

            public Result(int iter, string info)
            {
                Message = "#" + iter + info;
            }

            public override string ToString()
            {
                return Message;
            }
        }

        /// <summary>
        /// Parameters and refresh state of one periodically refreshed operation.
        /// </summary>
        public class Operation
        {
            public string OperationName { get; set; }

            /// <summary>Time-to-live of the refreshed data; computed from <see cref="Program.Interval"/>.</summary>
            public TimeSpan TTL { get { return TimeSpan.FromMilliseconds(Interval); } }

            /// <summary>Timestamp written by <see cref="Program.ExecuteOperation"/> when a refresh starts.</summary>
            public DateTime LastUpdate { get; set; }

            public Operation(int id)
            {
                // Ids below 10 get a leading zero, e.g. "Operation05".
                OperationName = "Operation" + ((id < 10) ? "0" : "") + id;
            }
        }

        // Refresh interval in milliseconds; Operation.TTL is derived from it.
        public static int Interval = 2000;
        // Value passed as the "duration" argument of every periodic task (milliseconds).
        public static int Duration = 10000;
        // Number of operations created by Main.
        public static int OperationsCount = 10;

        static void Main()
        {
            // Creating the test operations
            var operations = Enumerable.Range(1, OperationsCount).Select(i => new Operation(i)).ToList();
            // Executing them and reporting how many refreshes actually ran
            var r = ExecuteActions(operations).OrderBy(i => i.Message).ToList();
            Console.WriteLine("Results (expected=" + (Duration/Interval*OperationsCount) + ") : " + r.Count);
            Console.ReadLine();
        }

        /// <summary>
        /// Performs one (simulated) refresh of <paramref name="operation"/>.
        /// </summary>
        /// <param name="iter">Iteration number recorded in the result message.</param>
        /// <param name="operation">Operation being refreshed; its <see cref="Operation.LastUpdate"/> is set to the current time.</param>
        /// <returns>A <see cref="Result"/> built from the iteration number and the operation name.</returns>
        public static Result ExecuteOperation(int iter, Operation operation)
        {
            // Assigning last update timestamp
            operation.LastUpdate = DateTime.Now;

            // Some operation (simulated). It runs inline on the calling thread:
            // starting another task and immediately blocking on its Result only
            // ties up a second thread-pool thread per refresh without adding
            // any concurrency.
            Thread.Sleep(1000);
            return new Result(iter, operation.OperationName);
        }

        /// <summary>
        /// Starts one periodic task per operation, waits until all of them have
        /// finished and returns every result that was produced.
        /// </summary>
        /// <param name="operations">Operations to refresh periodically.</param>
        /// <returns>The results collected from all operations.</returns>
        public static List<Result> ExecuteActions(List<Operation> operations)
        {
            var results = new List<Result>();
            // List<T> is not safe for concurrent writers and every periodic task
            // appends to the same list, so all appends go through this gate.
            var resultsGate = new object();
            var completions = new List<Task>();

            foreach (var currentOperation in operations)
            {
                var iter = 0;
                var locker = new object();
                // Per-iteration copy captured by the lambdas below.
                Operation operation = currentOperation;

                var periodicTask = PeriodicTaskFactory.Start(() =>
                                {
                                    // (*) Looking if we need updates semantically -
                                    // through comparing time since last refresh with operation TTL
                                    Console.WriteLine(DateTime.Now + " : " + (DateTime.Now - operation.LastUpdate) + " ?> " + operation.TTL);
                                    // Looking if we need updates logically -
                                    // if previous operation is still running
                                    if (!Monitor.TryEnter(locker))
                                    {
                                        Console.WriteLine(">>>" + DateTime.Now + " Cancelled");
                                        return;
                                    }
                                    try
                                    {
                                        // Semantic update
                                        if (DateTime.Now - operation.LastUpdate > operation.TTL)
                                        {
                                            iter++;
                                            Console.WriteLine(DateTime.Now + " Refresh #" + iter + " " + operation.OperationName);
                                            var result = ExecuteOperation(iter, operation);
                                            lock (resultsGate)
                                            {
                                                results.Add(result);
                                            }
                                        }
                                    }
                                    finally
                                    {
                                        Monitor.Exit(locker);
                                    }
                                }, intervalInMilliseconds: (int)operation.TTL.TotalMilliseconds, duration: Duration /*maxIterations:2*/);

                // Dispose the finished periodic task and report completion.
                var end = periodicTask.ContinueWith(finished =>
                    {
                        finished.Dispose();
                        Console.WriteLine(">>>" + DateTime.Now + " " + operation.OperationName + " finished");
                    });
                completions.Add(end);
            }

            Task.WaitAll(completions.ToArray());
            return results;
        }
    }

    /// <summary>
    /// Factory class to create a periodic Task to simulate a <see cref="System.Threading.Timer"/> using <see cref="Task">Tasks.</see>
    /// </summary>
    public static class PeriodicTaskFactory
    {
        /// <summary>
        /// Starts the periodic task.
        /// </summary>
        /// <param name="action">The action.</param>
        /// <param name="intervalInMilliseconds">The interval in milliseconds. <see cref="Timeout.Infinite"/> executes a single iteration.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds, i.e. how long it waits to kick off the timer.</param>
        /// <param name="duration">The duration.
        /// <example>If the duration is set to 10 seconds, the maximum time this task is allowed to run is 10 seconds.</example>
        /// When both the duration and the interval are positive, the number of iterations is additionally
        /// capped at <c>duration / intervalInMilliseconds</c>.</param>
        /// <param name="maxIterations">The max iterations. A negative value means no explicit limit; zero executes nothing.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create the task for executing the <see cref="Action"/>.</param>
        /// <returns>A <see cref="Task"/></returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <remarks>
        /// Exceptions that occur in the <paramref name="action"/> need to be handled in the action itself. These exceptions will not be 
        /// bubbled up to the periodic task.
        /// </remarks>
        public static Task Start(Action action,
                                 int intervalInMilliseconds = Timeout.Infinite,
                                 int delayInMilliseconds = 0,
                                 int duration = Timeout.Infinite,
                                 int maxIterations = -1,
                                 bool synchronous = false,
                                 CancellationToken cancelToken = new CancellationToken(),
                                 TaskCreationOptions periodicTaskCreationOptions = TaskCreationOptions.None)
        {
            if (action == null)
            {
                throw new ArgumentNullException("action");
            }

            Stopwatch stopWatch = new Stopwatch();
            Action wrapperAction = () =>
            {
                CheckIfCancelled(cancelToken);
                action();
            };

            Action mainAction = () =>
            {
                MainPeriodicTaskAction(intervalInMilliseconds, delayInMilliseconds, duration, maxIterations, cancelToken, stopWatch, synchronous, wrapperAction, periodicTaskCreationOptions);
            };

            return Task.Factory.StartNew(mainAction, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);
        }

        /// <summary>
        /// Mains the periodic task action.
        /// </summary>
        /// <param name="intervalInMilliseconds">The interval in milliseconds.</param>
        /// <param name="delayInMilliseconds">The delay in milliseconds.</param>
        /// <param name="duration">The duration.</param>
        /// <param name="maxIterations">The max iterations.</param>
        /// <param name="cancelToken">The cancel token.</param>
        /// <param name="stopWatch">The stop watch.</param>
        /// <param name="synchronous">if set to <c>true</c> executes each period in a blocking fashion and each periodic execution of the task
        /// is included in the total duration of the Task.</param>
        /// <param name="wrapperAction">The wrapper action.</param>
        /// <param name="periodicTaskCreationOptions"><see cref="TaskCreationOptions"/> used to create a sub task for executing the <see cref="Action"/>.</param>
        private static void MainPeriodicTaskAction(int intervalInMilliseconds,
                                                   int delayInMilliseconds,
                                                   int duration,
                                                   int maxIterations,
                                                   CancellationToken cancelToken,
                                                   Stopwatch stopWatch,
                                                   bool synchronous,
                                                   Action wrapperAction,
                                                   TaskCreationOptions periodicTaskCreationOptions)
        {
            // Derive an iteration cap from duration / interval. The division is
            // only attempted for positive operands, so an interval of 0 no longer
            // throws DivideByZeroException. A smaller limit explicitly requested
            // by the caller (including 0) is kept rather than overwritten.
            if (duration > 0 && intervalInMilliseconds > 0)
            {
                var iters = duration / intervalInMilliseconds;
                if (iters > 0 && (maxIterations < 0 || iters < maxIterations))
                {
                    maxIterations = iters;
                }
            }
            TaskCreationOptions subTaskCreationOptions = TaskCreationOptions.AttachedToParent | periodicTaskCreationOptions;

            CheckIfCancelled(cancelToken);

            if (delayInMilliseconds > 0)
            {
                Thread.Sleep(delayInMilliseconds);
            }

            if (maxIterations == 0) { return; }

            int iteration = 0;

            ////////////////////////////////////////////////////////////////////////////
            // using a ManualResetEventSlim as it is more efficient in small intervals.
            // In the case where longer intervals are used, it will automatically use 
            // a standard WaitHandle....
            // see http://msdn.microsoft.com/en-us/library/vstudio/5hbefs30(v=vs.100).aspx
            using (ManualResetEventSlim periodResetEvent = new ManualResetEventSlim(false))
            {
                ////////////////////////////////////////////////////////////
                // Main periodic logic. Basically loop through this block
                // executing the action
                while (true)
                {
                    CheckIfCancelled(cancelToken);

                    Task subTask = Task.Factory.StartNew(wrapperAction, cancelToken, subTaskCreationOptions, TaskScheduler.Current);

                    if (synchronous)
                    {
                        stopWatch.Start();
                        try
                        {
                            subTask.Wait(cancelToken);
                        }
                        catch { /* do not let an errant subtask to kill the periodic task...*/ }
                        stopWatch.Stop();
                    }

                    // use the same Timeout setting as the System.Threading.Timer, infinite timeout will execute only one iteration.
                    if (intervalInMilliseconds == Timeout.Infinite) { break; }

                    iteration++;

                    if (maxIterations > 0 && iteration >= maxIterations) { break; }

                    // The event is never set, so this is a cancellable sleep for one interval.
                    stopWatch.Start();
                    try
                    {
                        periodResetEvent.Wait(intervalInMilliseconds, cancelToken);
                    }
                    finally
                    {
                        // Stop the stopwatch even when the wait is cancelled.
                        stopWatch.Stop();
                        periodResetEvent.Reset();
                    }

                    CheckIfCancelled(cancelToken);

                    if (duration > 0 && stopWatch.ElapsedMilliseconds >= duration) { break; }
                }
            }
        }

        /// <summary>
        /// Checks if cancelled.
        /// </summary>
        /// <param name="cancellationToken">The cancel token.</param>
        /// <exception cref="OperationCanceledException">Cancellation has been requested on the token.</exception>
        private static void CheckIfCancelled(CancellationToken cancellationToken)
        {
            // CancellationToken is a struct and can never be null, so no null check is needed.
            cancellationToken.ThrowIfCancellationRequested();
        }
    }
}

TTL比较检查(*)的输出显示:
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.9910000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:01.0020000 >? 00:00:02
9/23/2013 2:19:17 PM : 00:00:00.9940000 >? 00:00:02

因此,由于这种开销,有少数几次更新被取消了。这可能是什么原因造成的,又该如何解决?我的第一个猜测是线程切换的开销,这样的话可以在该比较中引入一个 Epsilon(容差)来规避。感谢帮助。

最佳答案

这种做法相当复杂。我建议换一种思路,使用 System.Threading.Timer。这样不需要锁,各个操作可以并发运行,而且每个更新可以有各自不同的周期。

为了防止更新重入(即在前一次 FooUpdate 仍在运行时再次触发 FooUpdate),可以创建一个一次性计时器,并在每次更新完成后重新设置它。因此您的计时器如下所示:

// One-shot timer: the first callback is due after 2 seconds; the infinite
// period disables periodic signaling, so the callback has to re-arm the timer.
System.Threading.Timer FooUpdateTimer = new System.Threading.Timer(
    FooUpdate, null, TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);

您的FooUpdate看起来像这样:
// Time of the last completed update (UTC); MinValue means "never updated".
DateTime LastFooUpdate = DateTime.MinValue;

// Timer callback: performs the update when the data is older than
// SomeMinimumTime, then schedules the next one-shot tick.
void FooUpdate(object state)
{
    // check data freshness
    if ((DateTime.UtcNow - LastFooUpdate) > SomeMinimumTime)
    {
        // do update
        // and reset last update time
        LastFooUpdate = DateTime.UtcNow;
    }
    // then, reset the timer: due again in 2 seconds, no periodic signaling
    FooUpdateTimer.Change(TimeSpan.FromSeconds(2), System.Threading.Timeout.InfiniteTimeSpan);
}

如果您还想要一个每 10 秒运行一次的 BarUpdate,只需复制上面的代码,并把更新间隔改为 10 秒。即:
// One-shot timer: the first callback is due after 10 seconds; the infinite
// period disables periodic signaling.
System.Threading.Timer BarUpdateTimer = new System.Threading.Timer(
    BarUpdate, null, TimeSpan.FromSeconds(10), System.Threading.Timeout.InfiniteTimeSpan);

// Time of the last completed Bar update (UTC); MinValue means "never updated".
DateTime LastBarUpdate = DateTime.MinValue;
void BarUpdate(object state)
{
    // ... same structure as FooUpdate, using LastBarUpdate and BarUpdateTimer
}

如果只有一两个这样的更新,这样写就可以了。如果需要很多个,则可以把这一功能封装到一个类中。例如:
/// <summary>
/// Invokes an update action from a one-shot timer that is re-armed after every
/// tick, so a tick never overlaps the previous one. The action is skipped while
/// the time since the last update is shorter than the freshness span.
/// </summary>
class PeriodicUpdater
{
    private readonly System.Threading.Timer _timer;
    private readonly TimeSpan _interval;
    // UTC time of the last completed update; MinValue means "never updated".
    private DateTime _lastUpdateTime = DateTime.MinValue;
    private readonly Action _updateAction;
    private readonly TimeSpan _freshness;

    /// <summary>Creates the updater and schedules the first tick.</summary>
    /// <param name="updateAction">Action that performs the update.</param>
    /// <param name="interval">Delay before the first tick and between the end of one tick and the next.</param>
    /// <param name="freshness">Minimum time since the last update before the action is run again.</param>
    public PeriodicUpdater(Action updateAction, TimeSpan interval, TimeSpan freshness)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        // Create the timer disarmed and arm it afterwards, so that a tick can
        // never run before _timer has been assigned (possible with a zero interval).
        _timer = new System.Threading.Timer(
            TimerTick, null,
            System.Threading.Timeout.InfiniteTimeSpan,
            System.Threading.Timeout.InfiniteTimeSpan);
        _timer.Change(_interval, System.Threading.Timeout.InfiniteTimeSpan);
    }

    // Timer callback: runs the update if the data is stale, then re-arms the one-shot timer.
    private void TimerTick(object state)
    {
        if ((DateTime.UtcNow - _lastUpdateTime) >= _freshness)
        {
            _updateAction();
            _lastUpdateTime = DateTime.UtcNow;
        }

        try
        {
            _timer.Change(_interval, System.Threading.Timeout.InfiniteTimeSpan);
        }
        catch (ObjectDisposedException)
        {
            // The timer was disposed while this tick was running; nothing left to schedule.
        }
    }
}

并创建一个:
// Foo: ticks every 2 seconds, freshness span of 8 seconds.
var FooUpdater = new PeriodicUpdater(
    updateAction: FooUpdateAction,
    interval: TimeSpan.FromSeconds(2.0),
    freshness: TimeSpan.FromSeconds(8.0));

// Bar: ticks every 10 seconds, freshness span of 15.5 seconds.
var BarUpdater = new PeriodicUpdater(
    updateAction: BarUpdateAction,
    interval: TimeSpan.FromSeconds(10.0),
    freshness: TimeSpan.FromSeconds(15.5));

// Update callback handed to FooUpdater.
private void FooUpdateAction()
{
    // do the Foo update
}

// Update callback handed to BarUpdater.
private void BarUpdateAction()
{
    // do the Bar update
}

那应该给你基本的想法。

更新:取消

如果要添加对取消的支持,可以将 CancellationToken 传递给构造函数,并注册一个回调。因此,构造函数变为:
    /// <summary>
    /// Creates the updater, schedules the first one-shot tick and registers
    /// <c>Cancel</c> to run when <paramref name="ct"/> is cancelled.
    /// </summary>
    public PeriodicUpdater(Action updateAction, TimeSpan interval, 
        TimeSpan freshness, CancellationToken ct)
    {
        _interval = interval;
        _updateAction = updateAction;
        _freshness = freshness;
        // Infinite period: the timer fires once and is re-armed by TimerTick.
        _timer = new Timer(TimerTick, null, _interval, Timeout.InfiniteTimeSpan);
        ct.Register(Cancel);
    }

然后添加 Cancel 方法:
    // Cancellation callback: stops the timer and releases it.
    private void Cancel()
    {
        // Timeout.Infinite as the due time prevents any further tick. A due
        // time of 0 would instead invoke the callback immediately.
        _timer.Change(Timeout.Infinite, Timeout.Infinite);
        _timer.Dispose();
    }

关于c# - C#中的多个并发定期刷新操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18958155/

相关文章:

c# - 从非托管 C++ 调用 C# 传递或返回 "Complex"类型

java - 为什么会死锁

java - 再次设置 AtomicBoolean

java - 如何同步对不同类成员的访问?

c# - 如何在工作线程中从 UI 线程获取变量?

scala - 当使用 Futures 执行上下文的其他实现时,程序永远不会结束

c# - 有人可以向我展示一个代码示例,展示如何将对象序列化为 JSON 吗?

c# - 通过加载项在 Visual Studio 中添加 ToolBoxTab 和 ToolBoxItems

c# - 如何在 FirstChanceException 的日志记录中包含实际的表单类名?

Python tkinter 线程和窗口刷新