c# - ZeroMQ PUB/SUB 模式与多线程轮询取消

标签 c# c++ multithreading zeromq netmq

我有两个应用程序,一个 C++ 服务器和一个 C# WPF UI。 C++ 代码通过 ZeroMQ 消息传递 [PUB/SUB] 服务接受请求(来自任何地方/任何人)。我使用我的 C# 代码进行回溯测试并创建“回溯测试”并执行它们。这些回溯测试可以由许多“单元测试”组成,每个测试都从 C++ 服务器发送/接收数千条消息。

目前单个回溯测试运行良好,可以发送 N 个单元测试,每个测试包含数千个请求和捕获。我的问题是建筑;当我发送另一个回测(在第一个测试之后)时,由于轮询线程没有被取消和处置,我遇到了第二次完成事件订阅的问题。这会导致错误的输出。这似乎是一个微不足道的问题(也许对你们中的一些人来说),但是在我当前的配置下取消这个轮询任务是很麻烦的。一些代码...

我的消息代理类很简单,看起来像

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

回测“引擎”用于执行每个回测,首先构造一个 Dictionary,其中包含每个 Test(单元测试)和要分派(dispatch)给 C++ 应用程序的消息每次测试。

DispatchTests方法,这里是

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

最后的PING消息,它告诉C++我们已经完成了。然后我们强制等待,以便在从 C++ 代码接收到所有返回之前不会分派(dispatch)下一个 [unit] 测试 - 我们使用 ManualResetEvent 执行此操作。

当 C++ 收到 PING 消息时,它会直接返回消息。我们通过 OnMessageRecieved 处理收到的消息,PING 告诉我们设置 ManualResetEvent.Set() 以便我们可以继续单元测试; “请下一个”...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

我的问题是,上面 finally 中的 broker.Dispose() 永远不会被命中。我很欣赏在后台线程上执行的 finally block 不能保证被执行

上面划掉的文字是因为我弄乱了代码;在 child 完成之前,我正在停止父线程。不过还是有问题……

现在 broker.Dispose() 被正确调用,并且 broker.Dispose() 被调用,在这个方法中我尝试取消轮询线程并处理Task 正确避免任何多次订阅。

要取消线程,我使用 CancelPolling() 方法

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

但是在StartPolling()方法中

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() 永远不会被调用,线程永远不会被取消,因此永远不会被正确处理。轮询线程被 subSocket.Receive() 方法阻塞。

现在,我不清楚如何实现我想要的,我需要在其他线程上调用 broker.Dispose()/PollerCancel()比用于轮询消息和一些强制取消的方式。线程中止不是我想不惜一切代价进入的。

本质上,我想在执行下一个回测之前正确处理 broker,我该如何正确处理这个问题,拆分轮询并在单独的应用程序域中运行它?

我已经尝试过,在 OnMessageRecived 处理程序中进行处理,但这显然是在与轮询器相同的线程上执行的,并且不是这样做的方法,在不调用其他线程的情况下,它会阻塞。

实现我想要的最佳方式是什么以及我可以遵循这种情况的模式吗?

感谢您的宝贵时间。

最佳答案

这就是我最终解决这个问题的方法 [尽管我愿意接受更好的解决方案!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is recived. 
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

所以我们以相同的方式轮询,但使用 NetMQ 中的 Poller 类。在我们设置的任务延续中,我们确定 PollerTask 都被取消了。然后我们就可以安全地处置了……

关于c# - ZeroMQ PUB/SUB 模式与多线程轮询取消,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29977305/

相关文章:

c++ - 如果在纯 c++ 中是静态的?

c++ - 调整数组大小函数不会改变内存位置

python - (Python) 使用原始输入停止线程?

C++ 将 int 添加到 int 数组

c# - 在 Entity Framework 中排除更新属性

c# - CommandParameter 始终为空 wpf MVVM

c# - 无法从 Func<T> 分配给 Func<Interface>

python - 间歇性 Python 线程错误, "main thread is not in main loop"

c - 为什么C程序中调用线程数比执行线程数多?

c# - 在文本框中插入制表符