c# - 如何同步 Observables 并卸载 UI 线程

标签 c# multithreading system.reactive

我有两个简单的观察处理程序,订阅了相同的源。然而,两种订阅都在不同的类型上运行。我希望他们保持可观察源 (Subject()) 的顺序。我尝试使用 Synchronize() 扩展,但我没有找到一种方法来按预期进行这项工作。

这是我的单元测试代码:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o; // just to simulate longer processing
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
                    o =>
                    {
                        Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                        Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
                    },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
    }
}

测试代码的结果输出:

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled  1 on threadId: 11
Received 1,1 on threadId:12
Handled  1,1 on threadId: 12
Received 2,1 on threadId:12
Handled  2,1 on threadId: 12
Received 3,1 on threadId:12
Handled  3,1 on threadId: 12
Received 2 on threadId:11
Handled  2 on threadId: 11
OnCompleted on threadId:12
Received 3 on threadId:11
Handled  3 on threadId: 11
OnCompleted on threadId:11

如您所见,顺序与输入不同。我想同步两个订阅,以便顺序与输入的顺序相同。

输出应该是

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled  1 on threadId: 11
Received 1,1 on threadId:12
Handled  1,1 on threadId: 12
Received 2 on threadId:11
Handled  2 on threadId: 11
Received 2,1 on threadId:12
Handled  2,1 on threadId: 12
Received 3 on threadId:11
Handled  3 on threadId: 11
Received 3,1 on threadId:12
Handled  3,1 on threadId: 12
OnCompleted on threadId:11
OnCompleted on threadId:12

(完成顺序对我来说没那么重要)。

编辑:

我还尝试了以下方法:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
    var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler);
    var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o;
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
                    o =>
                    {
                        Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                        Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
                    },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                        are.Set();
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
        are.WaitOne();
    }
}

但是输出还是错误的:

Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:4
Handled  1 on threadId: 4
Received 2 on threadId:4
Handled  2 on threadId: 4
Received 3 on threadId:4
Handled  3 on threadId: 4
OnCompleted on threadId:4
Received 1,1 on threadId:4
Handled  1,1 on threadId: 4
Received 2,1 on threadId:4
Handled  2,1 on threadId: 4
Received 3,1 on threadId:4
Handled  3,1 on threadId: 4
OnCompleted on threadId:4

...如您所见,它不符合 OnNext() 调用的顺序。

当使用具有类似创建含义的类型然后进行多次更新时,这一点尤其重要……如果更新发生在创建之前怎么办?如果不能保证顺序,您可能会遇到问题或需要对“ future ”事件进行排队,直到它们的前任事件与要更改的状态同步。 您需要诸如增加版本/订单号之类的东西,以将其用作订购标准并找到“漏洞”并将后继者排队,直到它们再次排成一行。

第二次编辑 ...更接近我的问题并摆脱测试用例理论:

我想要一个易于使用 RX 过滤功能的简单界面:

public interface ICommandBus // or to say Aggregator pattern
{
    void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command

    IObservable<T> Stream<T>() where T : ICommand;
}

public class CommandBus : ICommandBus, IDisposable
{
    private static readonly ILog Log = LogManager.GetLogger<CommandBus>();

    private readonly HashSet<Type> registrations = new HashSet<Type>();

    private readonly Subject<ICommand> stream = new Subject<ICommand>();

    private readonly IObservable<ICommand> notifications;

    private bool disposed;

    public CommandBus()
    {
        // hmm, this is a problem!? how to sync?
        this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default);

    }

    public IObservable<T> Stream<T>() where T : ICommand
    {
        var observable = this.notifications.OfType<T>();
        return new ExclusiveObservableWrapper<T>(
            observable,
            t => this.registrations.Add(t),
            t => this.registrations.Remove(t));
    }

    public void Send<T>(T command) where T : ICommand
    {
        if (command == null)
        {
            throw new ArgumentNullException("command");
        }

        if (!this.registrations.Contains(typeof(T)))
        {
            throw new NoCommandHandlerSubscribedException();
        }

        Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name));

        this.stream.OnNext(command);
    }

    //public async Task SendAsync<T>(T command) where T : ICommand
    //{
    //    if (command == null)
    //    {
    //        throw new ArgumentNullException("command");
    //    }

    //    if (!this.registrations.Contains(typeof(T)))
    //    {
    //        throw new NoCommandHandlerSubscribedException();
    //    }

    //    Log.Debug(logm => logm("Sending command of type {0}.", typeof(T)));

    //    this.stream.OnNext(command);

    //    await this.stream.Where(item => ReferenceEquals(item, command));
    //}

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing)
            {
                this.stream.Dispose();
            }
        }

        this.disposed = true;
    }

    [Serializable]
    public class CommandAlreadySubscribedException : Exception
    {
        internal CommandAlreadySubscribedException(Type type)
            : base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type))
        {
        }

        protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }
    }

    [Serializable]
    public class NoCommandHandlerSubscribedException : Exception
    {
        public NoCommandHandlerSubscribedException()
        {
        }

        public NoCommandHandlerSubscribedException(string message)
            : base(message)
        {
        }

        public NoCommandHandlerSubscribedException(string message, Exception innerException)
            : base(message, innerException)
        {
        }

        protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context)
            : base(info, context)
        {
        }
    }

    private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand
    {
        private readonly IObservable<T> observable;

        private readonly Func<Type, bool> register;

        private readonly Action<Type> unregister;

        internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister)
        {
            this.observable = observable;
            this.register = register;
            this.unregister = unregister;
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var subscription = this.observable.Subscribe(observer);
            var type = typeof(T);

            if (!this.register(type))
            {
                observer.OnError(new CommandAlreadySubscribedException(type));
            }

            return Disposable.Create(
                () =>
                {
                    subscription.Dispose();
                    this.unregister(type);
                });
        }
    }
}

如果我不能保证命令的顺序(如给定的那样),那么它们(可能)就没有意义。 (创建前更新)

ICommandBus 用于 UI/Presentation 层,它希望为命令调用相应的处理程序(无需知道处理程序)。

我只想将链卸载到一个单独的线程。

命令 -> 总线 -> 命令处理程序 -> 域模型 -> 事件 -> 事件处理程序 -> 读取模型

这需要保持命令出现的顺序。

我认为 RX 只需一些“魔法线”就可以做到这一点。但据我所知,我现在必须用自己的线程处理再做一次。 :-(

最佳答案

您似乎对 .Synchronize() 的作用有错误的理解。它的唯一目的是获取一个正在产生重叠或错位消息的可观察对象(即 OnNext 或多个 OnError 之前的 OnCompleted )和确保他们遵循 OnNext*(OnError|OnCompleted) 行为契约。这是关于让流氓 Observable 发挥出色。

现在,既然我们可以忽略它,因为您的示例输入是一个行为良好的可观察对象,那么您可以通过调用 .ObserveOn(TaskPoolScheduler.Default) 看到您正在创建可观察的跳转线程 -这很容易导致 observables 以不同的速率被消耗 - 这就是这里发生的事情。

您已经订阅了两次源代码,因此您无法阻止您在引入并发时所看到的行为。

鉴于您之前的问题 ( How to await finished IObserver call including observing subscriber calls? ),您似乎一心想使用 Rx 来增加并发性,但随后以某种方式强制它删除它。你真的应该以释放 Rx 的心态去做它的事情而不是高贵它。

@Beachwalker 编辑:

Enigmativity 在其对此答案的评论中 给出了我问题的正确答案。

我必须使用 EventLoopScheduler .所以我接受这个作为正确答案。

为了完整性。这是有效的代码:

[Test]
public void TestObserveOn()
{
    Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
    var source = new Subject<object>();
    var exclusiveScheduler = new EventLoopScheduler();
    var are = new AutoResetEvent(false);

    using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
        o =>
            {
                Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
                int sleep = 3000 / o;
                Thread.Sleep(sleep);
                Console.WriteLine("Handled  {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
            },
        () =>
            {
                Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                are.Set();
            }))
    using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
                    o =>
                        {
                            Console.WriteLine(
                                "Received {1} on threadId:{0}",
                                Thread.CurrentThread.ManagedThreadId,
                                o);
                            Console.WriteLine(
                                "Handled  {1} on threadId: {0}",
                                Thread.CurrentThread.ManagedThreadId,
                                o);
                        },
                    () =>
                    {
                        Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
                        are.Set();
                    }))
    {
        Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        source.OnNext(1);
        source.OnNext(1.1);
        source.OnNext(2);
        source.OnNext(2.1);
        source.OnNext(3);
        source.OnNext(3.1);
        source.OnCompleted();

        Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);

        are.WaitOne();
        are.WaitOne();
    }
}

关于c# - 如何同步 Observables 并卸载 UI 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38055428/

相关文章:

c# - 如何监控注册表访问? [C#]

c# - 通过 observable 限制重播缓冲区

unit-testing - 测试响应式(Reactive)扩展 - 如何将测试调度程序与 ToTask() 一起使用?

c# - 等价于 += 作为前缀

c# - ICommand 的 CanExecuteChanged 事件

multithreading - "nonblocking"数据结构如何可能?

windows - lua lane 线程间通信

c++ - <线程> : no match for operator <

swift - Observable 更改时不返回值

c# - 如何使表达式将值类型视为引用类型?