我正在尝试编写一个方法,该方法接受 IConnectableObservable,对其进行一些处理并返回一个新的 IConnectableObservable,该新的 IConnectableObservable 流处理后的数据和一些附加项目。流式传输的序列是有限的,但它有副作用,因此只需运行一次。但是,我试图用它做两件事:
- 使用 Select 查询转换流中的每个元素。
- 将流中的每个元素收集到一个数组中,并对数组进行一些处理并传输结果。
下面是我对此的最佳尝试,但我觉得可能有一种我还没有想到的更好的方法。
protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
var obsResults = output.Select(o =>
{
var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Measurement Accuracy",
Limits = limits,
Output = o,
Passed = _validationService.Validate(o.Result, limits)
};
});
var observable = Observable.Create<ITestResult<ITestOutput<double, double>, ITestLimit<double>>>(obs =>
{
var resultTask = obsResults.ForEachAsync(obs.OnNext);
var fitTask = output.ToArray().ForEachAsync(arr =>
{
resultTask.Wait();
var fit = ComputeErrorFit(arr, testCase);
obs.OnNext(GetGainErrorResult(fit.Item2, testCase));
});
output.Connect();
Task.WaitAll(resultTask, fitTask);
obs.OnCompleted();
return Disposable.Empty;
});
return observable.Publish();
}
编辑于 2015 年 10 月 7 日:
这是其余的代码:
private ITestResult<ITestOutput<double, double>, ITestLimit<double>> GetGainErrorResult(double gainError, InputVerificationTestCase testCase)
{
var gainErrorLimit = GenerateDcAccuracyLimits.CalculateGainErrorLimits(testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Gain Error",
Passed = _validationService.Validate(gainError, gainErrorLimit),
Output = new TestOutput<double, double> { Input = 0, Result = gainError },
Limits = gainErrorLimit
};
}
private Tuple<double, double> ComputeErrorFit(ITestOutput<double, double>[] outputs, InputChannelTestCase testCase)
{
var input = outputs.Select(o => o.Input);
var vErr = outputs.Select(o => o.Result - o.Input);
return Fit.Line(input.ToArray(), vErr.ToArray());
}
还在抽象基类中,我有以下内容:
public IConnectableObservable<TOutput> RunSingleChannel(TCase testCase)
{
dut.Acquisition.SampleRate.Value = SampleRate;
dut.AnalogChannels[testCase.Channel].InputRange.Value = testCase.FullScaleRange;
var testOutput = CreateTestProcedure(testCase.Channel).RunAsync(testCase.InputVoltages);
return ProcessOutput(testOutput.Replay(), testCase);
}
protected abstract IConnectableObservable<TOutput> ProcessOutput(IConnectableObservable<ITestOutput<double, TAcq>> output, TCase testCase);
最佳答案
看来您要使用 Rx 来以困难的方式做事。你确实需要避免将任务与可观察对象混合在一起。它们使您的代码难以推理,并且经常导致死锁和其他并发问题。
你应该尝试这样的事情:
protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
var source = output.RefCount();
return
source
.Select(o =>
{
var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Measurement Accuracy",
Limits = limits,
Output = o,
Passed = _validationService.Validate(o.Result, limits)
};
})
.Merge(
source
.ToArray()
.Select(arr => GetGainErrorResult(ComputeErrorFit(arr, testCase).Item2, testCase)))
.Publish();
}
您使用可连接的可观察量有点奇怪,但上面的内容应该大致满足您的需要。
我已经使用此示例测试了代码:
public IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
{
var source = output.RefCount();
return
source
.Merge(source.ToArray().Select(arr => arr.Sum()))
.Publish();
}
void Main()
{
var output = Observable.Range(1, 10).Publish();
var processed = ProcessOutput(output);
processed.Subscribe(x => Console.WriteLine(x));
processed.Connect();
}
哪些输出:
1
2
3
4
5
6
7
8
9
10
55
我还检查了原始可观察值仅生成一次。
关于c# - 处理后流式传输 IConnectableObservable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32983954/