我是 rx 的新手,一直在使用 dot net 中的响应式(Reactive)扩展编写一些网络代码。我的问题是,当我通过我提供的 token 触发取消时,我使用异步函数创建的 tcpClients 的可观察对象没有像我预期的那样完成。这是我遇到问题的代码的简化版本:
public static class ListenerExtensions
{
public static IObservable<TcpClient> ToListenerObservable(
this IPEndPoint endpoint,
int backlog)
{
return new TcpListener(endpoint).ToListenerObservable(backlog);
}
public static IObservable<TcpClient> ToListenerObservable(
this TcpListener listener,
int backlog)
{
return Observable.Create<TcpClient>(async (observer, token) =>
{
listener.Start(backlog);
try
{
while (!token.IsCancellationRequested)
observer.OnNext(await Task.Run(() => listener.AcceptTcpClientAsync(), token));
//This never prints and onCompleted is never called.
Console.WriteLine("Completing..");
observer.OnCompleted();
}
catch (System.Exception error)
{
observer.OnError(error);
}
finally
{
//This is never executed and my progam exits without closing the listener.
Console.WriteLine("Stopping listener...");
listener.Stop();
}
});
}
}
class Program
{
static void Main(string[] args)
{
var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
var cancellation = new CancellationTokenSource();
home.ToListenerObservable(10)
.Subscribe(
onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
onError: e => Console.WriteLine($"Error: {e.Message}"),
onCompleted: () => Console.WriteLine("Complete"), // Never happens
token: cancellation.Token);
Console.WriteLine("Any key to cancel");
Console.ReadKey();
cancellation.Cancel();
Thread.Sleep(1000);
}
}
如果我运行它并连接到 localhost:2323,我可以看到我得到了一系列连接的 tcpClient。但是,如果我触发了 cancellationtoken 的取消,程序将退出而不关闭监听器并像我期望的那样发出 onCompleted 事件。我做错了什么?
最佳答案
尽量避免编写过多的 try
/catch
代码并避免使用取消标记总是好的。这是一种在不脱离标准 Rx 运算符的情况下做你正在做的事情的方法。请注意,我无法完全测试此代码,因此可能仍需要进行一些调整。
试试这个:
var query = Observable.Create<TcpClient>(o =>
{
var home = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 2323);
var listener = new TcpListener(home);
listener.Start();
return
Observable
.Defer(() => Observable.FromAsync(() => listener.AcceptTcpClientAsync()))
.Repeat()
.Subscribe(o);
});
var completer = new Subject<Unit>();
var subscription =
query
.TakeUntil(completer)
.Subscribe(
onNext: c => Console.WriteLine($"{c.Client.RemoteEndPoint} connected"),
onError: e => Console.WriteLine($"Error: {e.Message}"),
onCompleted: () => Console.WriteLine("Complete"));
Console.WriteLine("Enter to cancel");
Console.ReadLine();
completer.OnNext(Unit.Default);
Thread.Sleep(1000);
这里的关键是 completer
,它像取消 token 一样询问。它自然地完成订阅,这与 subscription.Dispose()
不同,后者无需 OnCompleted
调用即可完成。
关于c# - 取消使用异步函数创建的可观察对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44615388/