c# - 将 IObservable<T> 转换为另一个根据输入序列的元素出错或完成的序列

标签 c# observable system.reactive rx.net

我正在实现一个进程间消息传递系统,例如客户端可以向服务器请求某些数据。服务器需要能够以部分响应的形式发回其回复,并且还需要能够在发生任何异常时通知客户端。

目前,我通过 3 种消息类型执行此操作:

class PartialResponse : ResponseMessage { ... }
class ResponseError : ResponseMessage { ... }
class ResponseComplete : ResponseMessage { ... ]

例如客户端请求数据,服务器发回 0-N PartialResponse 消息,后跟 ResponseError 或 ResponseComplete。

我正在使用的库(以 NetMQ 作为其传输层的 Obvs)会将所有可能的消息流公开为一个

IObservable<ResponseMessage>

虽然这个可观察的流永远不会完成,而且我相信也不会出错(除非可能出现一些 Obvs/NetMQ 内部异常等)。

我想将其转换为 IObservable,它在原始流推送 ResponseComplete 消息时完成,并在遇到 ResponseError 消息或输入流中的实际错误时出错。例如。像这样的东西:

IObservable<PartialResponse> transform(IObservable<ResponseMessage> input)
{
  var subject = new Subject<PartialResponse>();
  input.Subscribe(
    x =>
    {
      if(x is PartialResponse r)
        subject.OnNext(r);
      else if(x is ResponseComplete)
        subject.OnCompleted();
      else if(x is ResponseError err)
        subject.OnError(new Exception(err?.ToString()));
      else
        throw new InvalidOperationException();
    },
    ex =>
    {
      subject.OnError(ex);
    }
  );

  return subject;
}

这段代码实际上应该可以工作,但可能非常糟糕 - 尤其是因为它直接订阅了输入可观察序列。

有没有更好/更干净的方法来转换可观察序列?

最佳答案

这是@Enigmativity 的充实回答:

var input = new Subject<ResponseMessage>();

var partialResponseObservable = input
    .Select(msg => 
        (msg is PartialResponse r) 
        ? Notification.CreateOnNext(r)
        : (msg is ResponseComplete) 
            ? Notification.CreateOnCompleted<PartialResponse>()
            : (msg is ResponseError err)
                ? Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()))
                : throw new InvalidOperationException()
    )
    .Dematerialize();

或类型匹配(可能读起来更好):

var partialResponseObservable = input
    .Select(msg =>
    {
        switch(msg)
        {
            case PartialResponse r:
                return Notification.CreateOnNext(r);
            case ResponseComplete rc:
                return Notification.CreateOnCompleted<PartialResponse>();
            case ResponseError err:
                return Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()));
            default:
                throw new InvalidOperationException();
        }
    })
    .Dematerialize();

关于c# - 将 IObservable<T> 转换为另一个根据输入序列的元素出错或完成的序列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48277950/

相关文章:

c# - 我可以通过匹配键将两个可观察序列配对——使用重复键吗?

c# - 如何通过对原始序列的值运行任务来创建 Rx 序列?

c# - 测试 ReactiveCommand 和 ReactiveObject ViewModel

c# - 如何判断当前应用是否为Medium Trust

c# 格式保留整数加密

javascript - RxJS Observable.fromEvent 链为每个订阅者触发

angular - 在等待 RxJS observable 时在 Angular 中显示加载指示器

java - 如何通知第二个线程变量的更改

c# - WCF SOAP 调用 HTTP header 的内容类型中缺少操作

c# - 如何访问 EPPlus 中的工作表?