c# - 将包含大量 "locks"的代码重构为更多无锁代码

标签 c# locking task-parallel-library lock-free low-latency

更新 感谢 Matthew Watson 注意到并注意到我计划将我的代码移植到 c++-linux,所以我更喜欢“平台无关”代码


Task.Factory.StartNew(() =>
    while (true)
}, TaskCreationOptions.LongRunning);

private void Iterate()
    bool marketDataUpdated = false;

    lock (ordersToRegisterLock)
        if (ordersToRegister.Count > 0)
            marketDataUpdated = true;
            while (ordersToRegister.Count > 0)
                Order order = ordersToRegister.Dequeue();
                // Stage1, Process

    lock (aggrUpdatesLock)
        if (aggrUpdates.Count > 0)
            marketDataUpdated = true;
            while (!aggrUpdates.IsNullOrEmpty())
                var entry = aggrUpdates.Dequeue();
                // Stage1, Process

    lock (commonUpdatesLock)
        if (commonUpdates.Count > 0)
            marketDataUpdated = true;
            while (!commonUpdates.IsNullOrEmpty())
                var entry = commonUpdates.Dequeue();
                // Stage1, Process

    lock (infoUpdatesLock)
        if (infoUpdates.Count > 0)
            marketDataUpdated = true;
            while (!infoUpdates.IsNullOrEmpty())
                var entry = infoUpdates.Dequeue();
                // Stage1, Process

    lock (tradeUpdatesLock)
        if (tradeUpdates.Count > 0)
            marketDataUpdated = true;
            while (!tradeUpdates.IsNullOrEmpty())
                var entry = tradeUpdates.Dequeue();
                // Stage1, Process


    if (marketDataUpdated)
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.

private readonly Queue<Order> ordersToRegister = new Queue<Order>();
private readonly object ordersToRegisterLock = new object();

private readonly Queue<AggrEntry> aggrUpdates = new Queue<AggrEntry>();
private readonly object aggrUpdatesLock = new object();

private readonly Queue<CommonEntry> commonUpdates = new Queue<CommonEntry>();
private readonly object commonUpdatesLock = new object();

private readonly Queue<InfoEntry> infoUpdates = new Queue<InfoEntry>();
private readonly object infoUpdatesLock = new object();

private readonly Queue<TradeEntry> tradeUpdates = new Queue<TradeEntry>();
private readonly object tradeUpdatesLock = new object();

    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
        lock (ordersToRegisterLock)

    public void TradeUpdated(object sender, Gate.TradeArgs e)
        lock (tradeUpdatesLock)
            foreach (var entry in e.entries)

    public void InfoUpdated(object sender, Gate.InfoArgs e)
        lock (infoUpdatesLock)
            foreach (var entry in e.entries)

    public void CommonUpdated(object sender, Gate.CommonArgs e)
        lock (commonUpdatesLock)
            foreach (var entry in e.entries)

    public void AggrUpdated(object sender, Gate.AggrArgs e)
        lock (aggrUpdatesLock)
            foreach (var entry in e.entries)

在我的代码中有两个阶段。 Stage1 是更新阶段,Stage2 是工作阶段。我需要尽可能快地在这两个阶段之间切换,就像这样:

  • 有更新吗?没有
  • 有更新吗?没有
  • 有更新吗?是的,订单已更新!应用更新,执行 Stage2
  • 有更新吗?没有
  • 有更新吗?是的,订单需要注册!应用更新,执行 Stage2
  • 有更新吗?是的,交易发生,应用更新,执行Stage2

Stage2 中,我不应该更新,但应该继续“收集”更新,以便以后应用它们。

重要的是 - 这是对延迟非常关键的代码,所以我同意“花费”一个核心来实现最小延迟!因此,当发生任何更新时,我需要尽快处理它并执行Stage2


  • 很多锁!它可以用一些“无锁”代码代替吗?用 CAS 之类的自旋锁?
  • 占用 100% 的 CPU 核心,我可以节省一些 CPU 资源而不影响延迟吗?
  • 我可以/应该告诉 .NET 使用“专用”核心(设置任务关联性吗?)以避免额外的“切换”?
  • 我从一个线程添加到队列,并从另一个线程读取队列。这会是个问题吗?如果添加和读取队列是不稳定的?我的读取线程是否可能因为缓存更新问题而看不到来自队列的更新?


upd 部分解决了——据我所知,我最好将查询替换为无锁(可能是基于环形缓冲区?)查询。我想我稍后会使用 C++ 版本的 disruptor。我也用过这篇文章 http://www.umbraworks.net/bl0g/rebuildall/2010/03/08/Running_NET_threads_on_selected_processor_cores并将 Task 替换为运行在“固定”核心上的 Thread,但我仍在使用“busy-spin”,也许我应该使用更智能的东西?


使用下面的代码,您在“第 1 阶段”处理期间不再被锁定:

Task.Factory.StartNew(() =>
    while (true)
}, TaskCreationOptions.LongRunning);

private void Iterate()
    bool marketDataUpdated = false;

    foreach (Order order in ordersToRegister)
        marketDataUpdated = true;
        // Stage1, Process

    foreach (var entry in aggrUpdates)
        marketDataUpdated = true;
        // Stage1, Process

    foreach (var entry in commonUpdates)
        marketDataUpdated = true;
        // Stage1, Process

    foreach (var entry in infoUpdates)
        marketDataUpdated = true;
        // Stage1, Process

    foreach (var entry in tradeUpdates)
        marketDataUpdated = true;
        // Stage1, Process

    if (marketDataUpdated)
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.

private readonly ConcurrentQueue<Order> ordersToRegister = new ConcurrentQueue<Order>();

private readonly ConcurrentQueue<AggrEntry> aggrUpdates = new ConcurrentQueue<AggrEntry>();

private readonly ConcurrentQueue<CommonEntry> commonUpdates = new ConcurrentQueue<CommonEntry>();

private readonly ConcurrentQueue<InfoEntry> infoUpdates = new ConcurrentQueue<InfoEntry>();

private readonly ConcurrentQueue<TradeEntry> tradeUpdates = new ConcurrentQueue<TradeEntry>();

    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)

    public void TradeUpdated(object sender, Gate.TradeArgs e)
        foreach (var entry in e.entries)

    public void InfoUpdated(object sender, Gate.InfoArgs e)
        foreach (var entry in e.entries)

    public void CommonUpdated(object sender, Gate.CommonArgs e)
        foreach (var entry in e.entries)

    public void AggrUpdated(object sender, Gate.AggrArgs e)
        foreach (var entry in e.entries)

关于c# - 将包含大量 "locks"的代码重构为更多无锁代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14919751/


c# - ASP.NET C# 代码中的互斥释放问题

java - 临时文件、单实例锁和进程终止 (Java)

c# - 如何在 TPL 中检查变量是否已在其他线程中更改

c# - 未显式传递时通用类型的值?

c# - Resharper 重构以删除魔术字符串

c# - 重命名类 Visual Studio 2017

perl - 如何在 Perl 中测试对资源(缓存)的并发访问?

.net - 使用 TPL 时如何管理线程本地存储 (TLS)?

c# - 使用 Tasks (TPL) 库会使应用程序多线程吗?

c# - 如何在 c# 中的 switch case 中使用枚举组合?