c# - 将包含大量 "locks"的代码重构为更多无锁代码

标签 c# locking task-parallel-library lock-free low-latency

更新 感谢 Matthew Watson 注意到并注意到我计划将我的代码移植到 c++-linux,所以我更喜欢“平台无关”代码

我的交易应用程序几乎是无锁的。下面的代码是我唯一使用锁的地方。让我从代码开始,它很长,但不要担心有很多重复的部分,所以它很简单。我更喜欢添加所有“重复”部分以更好地展示我的工作原理:

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Iterate();
    }
}, TaskCreationOptions.LongRunning);

private void Iterate()
{
    bool marketDataUpdated = false;

    lock (ordersToRegisterLock)
    {
        if (ordersToRegister.Count > 0)
        {
            marketDataUpdated = true;
            while (ordersToRegister.Count > 0)
            {
                Order order = ordersToRegister.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (aggrUpdatesLock)
    {
        if (aggrUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!aggrUpdates.IsNullOrEmpty())
            {
                var entry = aggrUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (commonUpdatesLock)
    {
        if (commonUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!commonUpdates.IsNullOrEmpty())
            {
                var entry = commonUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (infoUpdatesLock)
    {
        if (infoUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!infoUpdates.IsNullOrEmpty())
            {
                var entry = infoUpdates.Dequeue();
                // Stage1, Process
            }
        }
    }

    lock (tradeUpdatesLock)
    {
        if (tradeUpdates.Count > 0)
        {
            marketDataUpdated = true;
            while (!tradeUpdates.IsNullOrEmpty())
            {
                var entry = tradeUpdates.Dequeue();
                // Stage1, Process
            }    

        }
    }

    if (marketDataUpdated)
    {
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.
    }
}

private readonly Queue<Order> ordersToRegister = new Queue<Order>();
private readonly object ordersToRegisterLock = new object();

private readonly Queue<AggrEntry> aggrUpdates = new Queue<AggrEntry>();
private readonly object aggrUpdatesLock = new object();

private readonly Queue<CommonEntry> commonUpdates = new Queue<CommonEntry>();
private readonly object commonUpdatesLock = new object();

private readonly Queue<InfoEntry> infoUpdates = new Queue<InfoEntry>();
private readonly object infoUpdatesLock = new object();

private readonly Queue<TradeEntry> tradeUpdates = new Queue<TradeEntry>();
private readonly object tradeUpdatesLock = new object();


    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
    {
        lock (ordersToRegisterLock)
        {
            ordersToRegister.Enqueue(e.order);
        }
    }

    public void TradeUpdated(object sender, Gate.TradeArgs e)
    {
        lock (tradeUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                tradeUpdates.Enqueue(entry);
            }
        }
    }

    public void InfoUpdated(object sender, Gate.InfoArgs e)
    {
        lock (infoUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                infoUpdates.Enqueue(entry);
            }
        }
    }

    public void CommonUpdated(object sender, Gate.CommonArgs e)
    {
        lock (commonUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                commonUpdates.Enqueue(entry);
            }
        }
    }

    public void AggrUpdated(object sender, Gate.AggrArgs e)
    {
        lock (aggrUpdatesLock)
        {
            foreach (var entry in e.entries)
            {
                aggrUpdates.Enqueue(entry);
            }
        }
    }

在我的代码中有两个阶段。 Stage1 是更新阶段,Stage2 是工作阶段。我需要尽可能快地在这两个阶段之间切换,就像这样:

  • 有更新吗?没有
  • 有更新吗?没有
  • 有更新吗?是的,订单已更新!应用更新,执行 Stage2
  • 有更新吗?没有
  • 有更新吗?是的,订单需要注册!应用更新,执行 Stage2
  • 有更新吗?是的,交易发生,应用更新,执行Stage2

Stage2 中,我不应该更新,但应该继续“收集”更新,以便以后应用它们。

重要的是 - 这是对延迟非常关键的代码,所以我同意“花费”一个核心来实现最小延迟!因此,当发生任何更新时,我需要尽快处理它并执行Stage2

所以我希望现在我需要实现的目标很清楚,而且我是如何实现的也很清楚。现在是时候讨论我的代码有多好了。我确实看到了几个潜在的问题:

  • 很多锁!它可以用一些“无锁”代码代替吗?用 CAS 之类的自旋锁?
  • 占用 100% 的 CPU 核心,我可以节省一些 CPU 资源而不影响延迟吗?
  • 我可以/应该告诉 .NET 使用“专用”核心(设置任务关联性吗?)以避免额外的“切换”?
  • 我从一个线程添加到队列,并从另一个线程读取队列。这会是个问题吗?如果添加和读取队列是不稳定的?我的读取线程是否可能因为缓存更新问题而看不到来自队列的更新?

欢迎提出任何改进我所写内容的建议,谢谢!

upd 部分解决了——据我所知,我最好将查询替换为无锁(可能是基于环形缓冲区?)查询。我想我稍后会使用 C++ 版本的 disruptor。我也用过这篇文章 http://www.umbraworks.net/bl0g/rebuildall/2010/03/08/Running_NET_threads_on_selected_processor_cores并将 Task 替换为运行在“固定”核心上的 Thread,但我仍在使用“busy-spin”,也许我应该使用更智能的东西?

最佳答案

使用下面的代码,您在“第 1 阶段”处理期间不再被锁定:

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Iterate();
    }
}, TaskCreationOptions.LongRunning);


private void Iterate()
{
    bool marketDataUpdated = false;

    foreach (Order order in ordersToRegister)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in aggrUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in commonUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in infoUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    foreach (var entry in tradeUpdates)
    {
        marketDataUpdated = true;
        // Stage1, Process
    }

    if (marketDataUpdated)
    {
        // Stage2 !
        // make a lot of work. expensive operation. recalculate strategies, place orders etc.
    }
}

private readonly ConcurrentQueue<Order> ordersToRegister = new ConcurrentQueue<Order>();

private readonly ConcurrentQueue<AggrEntry> aggrUpdates = new ConcurrentQueue<AggrEntry>();

private readonly ConcurrentQueue<CommonEntry> commonUpdates = new ConcurrentQueue<CommonEntry>();

private readonly ConcurrentQueue<InfoEntry> infoUpdates = new ConcurrentQueue<InfoEntry>();

private readonly ConcurrentQueue<TradeEntry> tradeUpdates = new ConcurrentQueue<TradeEntry>();

    public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
    {
        ordersToRegister.Enqueue(e.order);
    }

    public void TradeUpdated(object sender, Gate.TradeArgs e)
    {
        foreach (var entry in e.entries)
        {
            tradeUpdates.Enqueue(entry);
        }
    }

    public void InfoUpdated(object sender, Gate.InfoArgs e)
    {
        foreach (var entry in e.entries)
        {
            infoUpdates.Enqueue(entry);
        }
    }

    public void CommonUpdated(object sender, Gate.CommonArgs e)
    {
        foreach (var entry in e.entries)
        {
            commonUpdates.Enqueue(entry);
        }
    }

    public void AggrUpdated(object sender, Gate.AggrArgs e)
    {
        foreach (var entry in e.entries)
        {
            aggrUpdates.Enqueue(entry);
        }
    }

关于c# - 将包含大量 "locks"的代码重构为更多无锁代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14919751/

相关文章:

c# - ASP.NET C# 代码中的互斥释放问题

java - 临时文件、单实例锁和进程终止 (Java)

c# - 如何在 TPL 中检查变量是否已在其他线程中更改

c# - 未显式传递时通用类型的值?

c# - Resharper 重构以删除魔术字符串

c# - 重命名类 Visual Studio 2017

perl - 如何在 Perl 中测试对资源(缓存)的并发访问?

.net - 使用 TPL 时如何管理线程本地存储 (TLS)?

c# - 使用 Tasks (TPL) 库会使应用程序多线程吗?

c# - 如何在 c# 中的 switch case 中使用枚举组合?