我将 Reactive Extensions 与 async/await 结合使用来简化我的套接字协议(protocol)实现。当特定消息到达时,必须执行一些操作(例如,向每个“ping”消息发送“pong”),还有一些方法是我们必须异步等待某些特定响应。以下示例说明了这一点:
private Subject<string> MessageReceived = new Subject<string>();
//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
MessageReceived.OnNext(message);
ProcessMessage(message);
}
public async Task<string> TestMethod()
{
var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
await SendMessage("ABC");
//some code...
//if response we are waiting for comes before next row, we miss it
return await expectedMessage;
}
TestMethod() 将“ABC”发送到套接字并在收到例如“DEF”时继续(在此之前可能还有一些其他消息)。
这几乎可以工作,但存在竞争条件。这段代码似乎在 return await expectedMessage;
之前不会监听消息,这是一个问题,因为有时消息会在此之前到达。
最佳答案
FirstOrDefaultAsync
不会在这里很好地工作:它不会订阅直到 await
行,这使您处于竞争状态(如您所指出的)。替换它的方法如下:
var expectedMessage = MessageReceived
.Where(x => x.EndsWith("D") && x.EndsWith("F"))
.Take(1)
.Replay(1)
.RefCount();
using (var dummySubscription = expectedMessage.Subscribe(i => {}))
{
await SendMessage("ABC");
//Some code... goes here.
return await expectedMessage;
}
.Replay(1)
确保新订阅获得最新条目(假设存在)。它只有在有订阅者收听时才有效,因此 dummySubscription
。
关于c# - 使用 async/await、RX 和 LINQ 进行异步消息处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43076510/