c# - 如何使用 List<> 阻止 ToObservable?

标签 c# system.reactive

我是第一次尝试 RX,我有几个问题。

1)是否有更好的方法来完成我的集合的异步?

2) 我需要阻塞线程直到所有异步任务完成,我该怎么做?

class Program
{

    internal class MyClass
    {
        private readonly List<int> _myData = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

        private readonly Random random = new Random();

        public int DoSomething(int j)
        {
            int i = random.Next(j * 1000) - (j * 200);
            i = i < 0 ? 1000 : i;
            Thread.Sleep(i);
            Console.WriteLine(j);
            return j;
        }

        public IObservable<int> DoSomethingAsync(int j)
        {
            return Observable.CreateWithDisposable<int>(
                o => Observable.ToAsync<int, int>(DoSomething)(j).Subscribe(o)
                );
        }

        public void CreateTasks()
        {
            _myData.ToObservable(Scheduler.NewThread).Subscribe(
            onNext: (i) => DoSomethingAsync(i).Subscribe(),
            onCompleted: () => Console.WriteLine("Completed")
                );
        }
    }

    static void Main(string[] args)
    {
        MyClass test = new MyClass();

        test.CreateTasks();

        Console.ReadKey(); 
    } 
}

(注意:我知道我可以将 Observable.Range 用于我的 Int 列表,但我的列表在实际程序中不是 Int 类型)。

最佳答案

我可能会尝试

public void CreateTasks()                
{                       
    _myData.ToObservable(Scheduler.NewThread)
        .SelectMany(i => Observable.Start(() => DoSomething(i)))
        .Subscribe(j => Console.WriteLine("j is {0}", j), 
                  () => Console.WriteLine("Completed"));       
} 

所以首先我更改了 DoSomethingAsync 以便它使用 Observable.Start。 Observable.Start 将异步运行 DoSomething 方法,并在该方法完成时通过 IObservable.OnNext 返回值。

然后 CreateTasks 方法像以前一样遍历集合中的每个项目,但将每个值提供给 SelectMany,后者继续调用 DoSomethingAsync 方法。结果是,您将在每次完成对 DoSomethingAsync 的调用时收到一个 OnNext 并在它们全部完成时收到一个 OnComplete

关于c# - 如何使用 List<> 阻止 ToObservable?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5058531/

相关文章:

c# - 响应式扩展即时搜索 WPF/MVVM

c# - 外键约束问题?

c# - 在 Windows 8.1 (WinRT) 上调用 SQLite3.SetDirectory 会产生 System.AccessViolationException

c# - 如何在 Rx.Net 中实现排放映射处理程序?

c# - 如何在完成之前从 ReplaySubject<T> 获取最新值

c# - 接收 : Wait for several observables to complete

c# - 转换为货币

c# - 检查字符串是否包含多个字母

c# - 如何从 C# 更改 PowerPoint 中 TextRange 的字体颜色?

system.reactive - 使用 RxJava 并行调用网络服务。这是正确的方法吗?