我是第一次尝试 RX,我有几个问题。
1)是否有更好的方法来完成我的集合的异步?
2) 我需要阻塞线程直到所有异步任务完成,我该怎么做?
class Program
{
internal class MyClass
{
private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
private readonly Random random = new Random();
public int DoSomething(int j)
{
int i = random.Next(j * 1000) - (j * 200);
i = i < 0 ? 1000 : i;
Thread.Sleep(i);
Console.WriteLine(j);
return j;
}
public IObservable<int> DoSomethingAsync(int j)
{
return Observable.CreateWithDisposable<int>(
o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
);
}
public void CreateTasks()
{
_myData.ToObservable(Scheduler.NewThread).Subscribe(
onNext: (i) => DoSomethingAsync(i).Subscribe(),
onCompleted: () => Console.WriteLine("Completed")
);
}
}
static void Main(string[] args)
{
MyClass test = new MyClass();
test.CreateTasks();
Console.ReadKey();
}
}
(注意:我知道我可以将 Observable.Range 用于我的 Int 列表,但我的列表在实际程序中不是 Int 类型)。
最佳答案
我可能会尝试
public void CreateTasks()
{
_myData.ToObservable(Scheduler.NewThread)
.SelectMany(i => Observable.Start(() => DoSomething(i)))
.Subscribe(j => Console.WriteLine("j is {0}", j),
() => Console.WriteLine("Completed"));
}
所以首先我更改了 DoSomethingAsync 以便它使用 Observable.Start
。 Observable.Start 将异步运行 DoSomething
方法,并在该方法完成时通过 IObservable.OnNext
返回值。
然后 CreateTasks 方法像以前一样遍历集合中的每个项目,但将每个值提供给 SelectMany
,后者继续调用 DoSomethingAsync 方法。结果是,您将在每次完成对 DoSomethingAsync 的调用时收到一个 OnNext
并在它们全部完成时收到一个 OnComplete
。
关于c# - 如何使用 List<> 阻止 ToObservable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5058531/