c# - 带有 UDPClient.EndReceive 和 ref 远程端点参数的 Observable.FromAsyncPattern

标签 c# asynchronous system.reactive

我正在学习 Reactive 扩展并尝试重构我的一些代码。

UDPClient.EndReceive 接受一个 ref IPEndPoint 参数,所以我目前有这个工作:

UdpClient receiverUDP = new UdpClient();
receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
receiverUDP.EnableBroadcast = true;
receiverUDP.Client.ExclusiveAddressUse = false;
receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

IPEndPoint ep = null;
var async = Observable.FromAsyncPattern<byte[]>(receiverUDP.BeginReceive, (i) => receiverUDP.EndReceive(i, ref ep));
var subscr = async().Subscribe(x => Console.WriteLine(ASCIIEncoding.ASCII.GetString(x)));

如果我的订阅者需要访问远程 IPEndPoint 怎么办?在我当前的版本中,我正在使用事件,并传回一个包含 byte[]IPEndPoint 的自定义类。我这辈子都想不出如何用 Rx 做到这一点。

最佳答案

如果您已经为 byte[]IPEndPoint 创建了包装类,为什么不使用 Select 将其作为序列返回:

private IObservable<RemoteData> GetRemoteDataAsync()
{
    return Observable.Defer(() => 
    {
        UdpClient receiverUDP = new UdpClient();
        receiverUDP.Client.SetSocketOption(SocketOptionLevel.Socket, 
            SocketOptionName.ReuseAddress, true);
        receiverUDP.EnableBroadcast = true;
        receiverUDP.Client.ExclusiveAddressUse = false;
        receiverUDP.Client.Bind(new IPEndPoint(IPAddress.Any, 1234));

        IPEndPoint ep = null;
        return Observable.FromAsyncPattern<byte[]>(
                   receiverUDP.BeginReceive, 
                   (i) => receiverUDP.EndReceive(i, ref ep)
               )()
               .Select(bytes => new RemoteData(bytes, ep));
    });
}

关于c# - 带有 UDPClient.EndReceive 和 ref 远程端点参数的 Observable.FromAsyncPattern,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4433287/

相关文章:

c# - RxUI ObservableAsPropertyHelper 不适用于 XAML 绑定(bind)

c# - 所有验证器控件的回发事件处理程序和 Validate 方法的顺序

C# 堆栈溢出

c# - 运行异步 lambda 时,使用 Task.Run(func) 还是 new Func<Task>(func)()?

c# - 如何动态更新/添加项目到 IObservable<int> 中?

c# - ASP.NET Core - 从类库查看

ios - 在 Firebase 中,我可以从 .observeSingleEventOfType() 获取已完成执行其代码块的信号吗?

Node.js 异步并行 - 后果是什么?

system.reactive - 处理事件日志中的事件并对特定模式使用react(Rx?)

c# - 使用IObservable实现异步方法