c# - 处理后流式传输 IConnectableObservable

标签 c# c#-4.0 system.reactive reactive-programming

我正在尝试编写一个方法,该方法接受 IConnectableObservable,对其进行一些处理并返回一个新的 IConnectableObservable,该新的 IConnectableObservable 流处理后的数据和一些附加项目。流式传输的序列是有限的,但它有副作用,因此只需运行一次。但是,我试图用它做两件事:

  1. 使用 Select 查询转换流中的每个元素。
  2. 将流中的每个元素收集到一个数组中,并对数组进行一些处理并传输结果。

下面是我对此的最佳尝试,但我觉得可能有一种我还没有想到的更好的方法。

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
    var obsResults = output.Select(o =>
    {
        var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
        return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
        {
            Component = "Variable Gain Module Input",
            Description = "Measurement Accuracy",
            Limits = limits,
            Output = o,
            Passed = _validationService.Validate(o.Result, limits)
        };
    });

    var observable = Observable.Create<ITestResult<ITestOutput<double, double>, ITestLimit<double>>>(obs =>
    {
        var resultTask = obsResults.ForEachAsync(obs.OnNext);
        var fitTask = output.ToArray().ForEachAsync(arr =>
        {
            resultTask.Wait();
            var fit = ComputeErrorFit(arr, testCase);
            obs.OnNext(GetGainErrorResult(fit.Item2, testCase));
        });
        output.Connect();
        Task.WaitAll(resultTask, fitTask);
        obs.OnCompleted();
        return Disposable.Empty;
    });

    return observable.Publish();
}

编辑于 2015 年 10 月 7 日:

这是其余的代码:

private ITestResult<ITestOutput<double, double>, ITestLimit<double>> GetGainErrorResult(double gainError, InputVerificationTestCase testCase)
{
    var gainErrorLimit = GenerateDcAccuracyLimits.CalculateGainErrorLimits(testCase.FullScaleRange, testCase.Limits);
    return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
    {
        Component = "Variable Gain Module Input",
        Description = "Gain Error",
        Passed = _validationService.Validate(gainError, gainErrorLimit),
        Output = new TestOutput<double, double> { Input = 0, Result = gainError },
        Limits = gainErrorLimit
    };
}

private Tuple<double, double> ComputeErrorFit(ITestOutput<double, double>[] outputs, InputChannelTestCase testCase)
{
    var input = outputs.Select(o => o.Input);
    var vErr = outputs.Select(o => o.Result - o.Input);
    return Fit.Line(input.ToArray(), vErr.ToArray());
}

还在抽象基类中,我有以下内容:

public IConnectableObservable<TOutput> RunSingleChannel(TCase testCase)
{
    dut.Acquisition.SampleRate.Value = SampleRate;
    dut.AnalogChannels[testCase.Channel].InputRange.Value = testCase.FullScaleRange;
    var testOutput = CreateTestProcedure(testCase.Channel).RunAsync(testCase.InputVoltages);
    return ProcessOutput(testOutput.Replay(), testCase);
}

protected abstract IConnectableObservable<TOutput> ProcessOutput(IConnectableObservable<ITestOutput<double, TAcq>> output, TCase testCase);

最佳答案

看来您要使用 Rx 来以困难的方式做事。你确实需要避免将任务与可观察对象混合在一起。它们使您的代码难以推理,并且经常导致死锁和其他并发问题。

你应该尝试这样的事情:

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
    var source = output.RefCount();
    return
        source
            .Select(o =>
            {
                var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
                return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
                {
                    Component = "Variable Gain Module Input",
                    Description = "Measurement Accuracy",
                    Limits = limits,
                    Output = o,
                    Passed = _validationService.Validate(o.Result, limits)
                };
            })
            .Merge(
                source
                    .ToArray()
                    .Select(arr => GetGainErrorResult(ComputeErrorFit(arr, testCase).Item2, testCase)))
            .Publish();
}

您使用可连接的可观察量有点奇怪,但上面的内容应该大致满足您的需要。

我已经使用此示例测试了代码:

public IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
{
    var source = output.RefCount();
    return
        source
            .Merge(source.ToArray().Select(arr => arr.Sum()))
            .Publish();
}

void Main()
{
    var output = Observable.Range(1, 10).Publish();

    var processed = ProcessOutput(output);

    processed.Subscribe(x => Console.WriteLine(x));

    processed.Connect();
}

哪些输出:

1
2
3
4
5
6
7
8
9
10
55

我还检查了原始可观察值仅生成一次。

关于c# - 处理后流式传输 IConnectableObservable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32983954/

相关文章:

c# - ScrollTo 无法在 Xamarin.Forms 中使用分组的 ListView

c# - .Net Core 中每个启动设置的预处理器指令

c# - SQLCLR 数据库名称每次更改项目时都会添加 _1

c# - DateTime.ToBinary() 和 DateTime.ToFileTime() 有何不同?

.net - 使用 Reactive Extensions (Rx) 创建 REST 客户端 API

c# - 保护匿名 Web API 请求的方法

c#-4.0 - 如何扩展 MEF 以根据作为属性提供的工厂类型创建对象?

c# - 在字典内部的字典中添加键值对

c# - 以最大并行度将 IEnumerable<T> 转换为 IObservable<T>

system.reactive - 为什么 RX 中的主题称为 "Subject"?