c# - 使用 async/await、RX 和 LINQ 进行异步消息处理

标签 c# .net linq async-await system.reactive

我将 Reactive Extensions 与 async/await 结合使用来简化我的套接字协议(protocol)实现。当特定消息到达时,必须执行一些操作(例如,向每个“ping”消息发送“pong”),还有一些方法是我们必须异步等待某些特定响应。以下示例说明了这一点:

private Subject<string> MessageReceived = new Subject<string>();

//this method gets called every time a message is received from socket
internal void OnReceiveMessage(string message)
{
    MessageReceived.OnNext(message);
    ProcessMessage(message);
}

public async Task<string> TestMethod()
{
    var expectedMessage = MessageReceived.Where(x => x.EndsWith("D") && x.EndsWith("F")).FirstOrDefaultAsync();
    await SendMessage("ABC");

    //some code...

    //if response we are waiting for comes before next row, we miss it
    return await expectedMessage;
}

TestMethod() 将“ABC”发送到套接字并在收到例如“DEF”时继续(在此之前可能还有一些其他消息)。

这几乎可以工作,但存在竞争条件。这段代码似乎在 return await expectedMessage; 之前不会监听消息,这是一个问题,因为有时消息会在此之前到达。

最佳答案

FirstOrDefaultAsync 不会在这里很好地工作:它不会订阅直到 await 行,这使您处于竞争状态(如您所指出的)。替换它的方法如下:

    var expectedMessage = MessageReceived
        .Where(x => x.EndsWith("D") && x.EndsWith("F"))
        .Take(1)
        .Replay(1)
        .RefCount();

    using (var dummySubscription = expectedMessage.Subscribe(i => {}))
    {
        await SendMessage("ABC");

        //Some code... goes here.

        return await expectedMessage;
    }

.Replay(1) 确保新订阅获得最新条目(假设存在)。它只有在有订阅者收听时才有效,因此 dummySubscription

关于c# - 使用 async/await、RX 和 LINQ 进行异步消息处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43076510/

相关文章:

c# - 如何将 quickbook 桌面应用程序与网站集成

.net - 装配中每个 NetModule 的 PE header

.net - RSACryptoServiceProvider.ExportCspBlob 数据格式

c# - 如何将数据库表转换为 "pivot"DataTable?

c# - 检查 C# 代码混淆的效率

c# - 获取 "cannot insert NULL into ("架构"."table"."column") 当列不为空时

c# - 如何查看 xamarin 形式的两个引脚之间的距离并将距离作为字符串获取?

c# - 在 MySQL 中使用子类型时遇到问题

c# - 现有的类似于Parallel.For的LINQ扩展方法?

c# - LINQ - 删除 List<T> 中以任何顺序或位置包含数组中的一项的项目