我有两个应用程序,一个 C++ 服务器和一个 C# WPF UI。 C++ 代码通过 ZeroMQ 消息传递 [PUB/SUB] 服务接受请求(来自任何地方/任何人)。我使用我的 C# 代码进行回溯测试并创建“回溯测试”并执行它们。这些回溯测试可以由许多“单元测试”组成,每个测试都从 C++ 服务器发送/接收数千条消息。
目前单个回溯测试运行良好,可以发送 N 个单元测试,每个测试包含数千个请求和捕获。我的问题是建筑;当我发送另一个回测(在第一个测试之后)时,由于轮询线程没有被取消和处置,我遇到了第二次完成事件订阅的问题。这会导致错误的输出。这似乎是一个微不足道的问题(也许对你们中的一些人来说),但是在我当前的配置下取消这个轮询任务是很麻烦的。一些代码...
我的消息代理类很简单,看起来像
public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
private Task pollingTask;
private NetMQContext context;
private PublisherSocket pubSocket;
private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;
public MessageBroker()
{
this.source = new CancellationTokenSource();
this.token = source.Token;
StartPolling();
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
}
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}
private void StartPolling()
{
pollerCancelled = new ManualResetEvent(false);
pollingTask = Task.Run(() =>
{
try
{
using (var context = NetMQContext.Create())
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
}
}
catch (OperationCanceledException)
{
pollerCancelled.Set();
}
}, this.token);
}
private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }
private bool disposed = false;
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (this.pollingTask != null)
{
CancelPolling();
if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
this.pollingTask.Status == TaskStatus.Faulted ||
this.pollingTask.Status == TaskStatus.Canceled)
{
this.pollingTask.Dispose();
this.pollingTask = null;
}
}
if (this.context != null)
{
this.context.Dispose();
this.context = null;
}
if (this.pubSocket != null)
{
this.pubSocket.Dispose();
this.pubSocket = null;
}
if (this.source != null)
{
this.source.Dispose();
this.source = null;
}
}
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~MessageBroker()
{
Dispose(false);
}
}
回测“引擎”用于执行每个回测,首先构造一个 Dictionary
,其中包含每个 Test
(单元测试)和要分派(dispatch)给 C++ 应用程序的消息每次测试。
DispatchTests
方法,这里是
private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
broker = new MessageBroker();
broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
testCompleted = new ManualResetEvent(false);
try
{
// Loop through the tests.
foreach (var kvp in feedMuxCollection)
{
testCompleted.Reset();
Test t = kvp.Key;
t.Bets = new List<Taurus.Bet>();
foreach (Taurus.FeedMux mux in kvp.Value)
{
token.ThrowIfCancellationRequested();
broker.Dispatch(mux);
}
broker.Dispatch(new Taurus.FeedMux()
{
type = Taurus.FeedMux.Type.PING,
ping = new Taurus.Ping() { event_id = t.EventID }
});
testCompleted.WaitOne(); // Wait until all messages are received for this test.
}
testCompleted.Close();
}
finally
{
broker.Dispose(); // Dispose the broker.
}
}
最后的PING
消息,它告诉C++我们已经完成了。然后我们强制等待,以便在从 C++ 代码接收到所有返回之前不会分派(dispatch)下一个 [unit] 测试 - 我们使用 ManualResetEvent
执行此操作。
当 C++ 收到 PING 消息时,它会直接返回消息。我们通过 OnMessageRecieved
处理收到的消息,PING 告诉我们设置 ManualResetEvent.Set()
以便我们可以继续单元测试; “请下一个”...
private async void OnMessageRecieved(Taurus.FeedMux mux)
{
string errorMsg = String.Empty;
if (mux.type == Taurus.FeedMux.Type.MSG)
{
// Do stuff.
}
else if (mux.type == Taurus.FeedMux.Type.PING)
{
// Do stuff.
// We are finished reciving messages for this "unit test"
testCompleted.Set();
}
}
我的问题是,上面 finally 中的 。broker.Dispose()
永远不会被命中。我很欣赏在后台线程上执行的 finally block 不能保证被执行
上面划掉的文字是因为我弄乱了代码;在 child 完成之前,我正在停止父线程。不过还是有问题……
现在 broker.Dispose()
被正确调用,并且 broker.Dispose()
被调用,在这个方法中我尝试取消轮询线程并处理Task
正确避免任何多次订阅。
要取消线程,我使用 CancelPolling()
方法
private void CancelPolling()
{
source.Cancel();
pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
pollerCancelled.Close();
}
但是在StartPolling()
方法中
while (true)
{
buffer = subSocket.Receive();
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
if (this.token.IsCancellationRequested)
this.token.ThrowIfCancellationRequested();
}
ThrowIfCancellationRequested()
永远不会被调用,线程永远不会被取消,因此永远不会被正确处理。轮询线程被 subSocket.Receive()
方法阻塞。
现在,我不清楚如何实现我想要的,我需要在其他线程上调用 broker.Dispose()
/PollerCancel()
比用于轮询消息和一些强制取消的方式。线程中止不是我想不惜一切代价进入的。
本质上,我想在执行下一个回测之前正确处理 broker
,我该如何正确处理这个问题,拆分轮询并在单独的应用程序域中运行它?
我已经尝试过,在 OnMessageRecived
处理程序中进行处理,但这显然是在与轮询器相同的线程上执行的,并且不是这样做的方法,在不调用其他线程的情况下,它会阻塞。
实现我想要的最佳方式是什么以及我可以遵循这种情况的模式吗?
感谢您的宝贵时间。
最佳答案
这就是我最终解决这个问题的方法 [尽管我愿意接受更好的解决方案!]
public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
// Vars.
private NetMQContext context;
private PublisherSocket pubSocket;
private Poller poller;
private CancellationTokenSource source;
private CancellationToken token;
private ManualResetEvent pollerCancelled;
/// <summary>
/// Default ctor.
/// </summary>
public FeedMuxMessageBroker()
{
context = NetMQContext.Create();
pubSocket = context.CreatePublisherSocket();
pubSocket.Connect(PublisherAddress);
pollerCancelled = new ManualResetEvent(false);
source = new CancellationTokenSource();
token = source.Token;
StartPolling();
}
#region Methods.
/// <summary>
/// Send the mux message to listners.
/// </summary>
/// <param name="message">The message to dispatch.</param>
public void Dispatch(Taurus.FeedMux message)
{
pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
}
/// <summary>
/// Start polling for messages.
/// </summary>
private void StartPolling()
{
Task.Run(() =>
{
using (var subSocket = context.CreateSubscriberSocket())
{
byte[] buffer = null;
subSocket.Options.ReceiveHighWatermark = 1000;
subSocket.Connect(SubscriberAddress);
subSocket.Subscribe(String.Empty);
subSocket.ReceiveReady += (s, a) =>
{
buffer = subSocket.Receive();
if (MessageRecieved != null)
MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
};
// Poll.
poller = new Poller();
poller.AddSocket(subSocket);
poller.PollTillCancelled();
token.ThrowIfCancellationRequested();
}
}, token).ContinueWith(ant =>
{
pollerCancelled.Set();
}, TaskContinuationOptions.OnlyOnCanceled);
}
/// <summary>
/// Cancel polling to allow the broker to be disposed.
/// </summary>
private void CancelPolling()
{
source.Cancel();
poller.Cancel();
pollerCancelled.WaitOne();
pollerCancelled.Close();
}
#endregion // Methods.
#region Properties.
/// <summary>
/// Event that is raised when a message is recived.
/// </summary>
public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
/// <summary>
/// The address to use for the publisher socket.
/// </summary>
public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }
/// <summary>
/// The address to use for the subscriber socket.
/// </summary>
public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
#endregion // Properties.
#region IDisposable Members.
private bool disposed = false;
/// <summary>
/// Dispose managed resources.
/// </summary>
/// <param name="disposing">Is desposing.</param>
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
CancelPolling();
if (pubSocket != null)
{
pubSocket.Disconnect(PublisherAddress);
pubSocket.Dispose();
pubSocket = null;
}
if (poller != null)
{
poller.Dispose();
poller = null;
}
if (context != null)
{
context.Terminate();
context.Dispose();
context = null;
}
if (source != null)
{
source.Dispose();
source = null;
}
}
// Shared cleanup logic.
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Finalizer.
/// </summary>
~FeedMuxMessageBroker()
{
Dispose(false);
}
#endregion // IDisposable Members.
}
所以我们以相同的方式轮询,但使用 NetMQ 中的 Poller
类。在我们设置的任务延续中,我们确定 Poller
和 Task
都被取消了。然后我们就可以安全地处置了……
关于c# - ZeroMQ PUB/SUB 模式与多线程轮询取消,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29977305/