我有两个简单的观察处理程序,订阅了相同的源。然而,两种订阅都在不同的类型上运行。我希望他们保持可观察源 (Subject()) 的顺序。我尝试使用 Synchronize() 扩展,但我没有找到一种方法来按预期进行这项工作。
这是我的单元测试代码:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var are = new AutoResetEvent(false);
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000 / o; // just to simulate longer processing
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(TaskPoolScheduler.Default).Synchronize(source).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
}
}
测试代码的结果输出:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
OnCompleted on threadId:12
Received 3 on threadId:11
Handled 3 on threadId: 11
OnCompleted on threadId:11
如您所见,顺序与输入不同。我想同步两个订阅,以便顺序与输入的顺序相同。
输出应该是
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:11
Handled 1 on threadId: 11
Received 1,1 on threadId:12
Handled 1,1 on threadId: 12
Received 2 on threadId:11
Handled 2 on threadId: 11
Received 2,1 on threadId:12
Handled 2,1 on threadId: 12
Received 3 on threadId:11
Handled 3 on threadId: 11
Received 3,1 on threadId:12
Handled 3,1 on threadId: 12
OnCompleted on threadId:11
OnCompleted on threadId:12
(完成顺序对我来说没那么重要)。
编辑:
我还尝试了以下方法:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var taskSchedulerPair = new ConcurrentExclusiveSchedulerPair();
var exclusiveTaskFactory = new TaskFactory(taskSchedulerPair.ExclusiveScheduler);
var exclusiveScheduler = new TaskPoolScheduler(exclusiveTaskFactory);
var are = new AutoResetEvent(false);
using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000 / o;
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
are.WaitOne();
}
}
但是输出还是错误的:
Starting on threadId:10
Subscribed on threadId:10
Finished on threadId:10
Received 1 on threadId:4
Handled 1 on threadId: 4
Received 2 on threadId:4
Handled 2 on threadId: 4
Received 3 on threadId:4
Handled 3 on threadId: 4
OnCompleted on threadId:4
Received 1,1 on threadId:4
Handled 1,1 on threadId: 4
Received 2,1 on threadId:4
Handled 2,1 on threadId: 4
Received 3,1 on threadId:4
Handled 3,1 on threadId: 4
OnCompleted on threadId:4
...如您所见,它不符合 OnNext() 调用的顺序。
当使用具有类似创建含义的类型然后进行多次更新时,这一点尤其重要……如果更新发生在创建之前怎么办?如果不能保证顺序,您可能会遇到问题或需要对“ future ”事件进行排队,直到它们的前任事件与要更改的状态同步。 您需要诸如增加版本/订单号之类的东西,以将其用作订购标准并找到“漏洞”并将后继者排队,直到它们再次排成一行。
第二次编辑 ...更接近我的问题并摆脱测试用例理论:
我想要一个易于使用 RX 过滤功能的简单界面:
public interface ICommandBus // or to say Aggregator pattern
{
void Send<T>(T command) where T : ICommand; // might be something like Task<Result> Send<T>(T command) to know the system has accepted the command
IObservable<T> Stream<T>() where T : ICommand;
}
public class CommandBus : ICommandBus, IDisposable
{
private static readonly ILog Log = LogManager.GetLogger<CommandBus>();
private readonly HashSet<Type> registrations = new HashSet<Type>();
private readonly Subject<ICommand> stream = new Subject<ICommand>();
private readonly IObservable<ICommand> notifications;
private bool disposed;
public CommandBus()
{
// hmm, this is a problem!? how to sync?
this.notifications = this.stream.SubscribeOn(TaskPoolScheduler.Default);
}
public IObservable<T> Stream<T>() where T : ICommand
{
var observable = this.notifications.OfType<T>();
return new ExclusiveObservableWrapper<T>(
observable,
t => this.registrations.Add(t),
t => this.registrations.Remove(t));
}
public void Send<T>(T command) where T : ICommand
{
if (command == null)
{
throw new ArgumentNullException("command");
}
if (!this.registrations.Contains(typeof(T)))
{
throw new NoCommandHandlerSubscribedException();
}
Log.Debug(logm => logm("Sending command of type {0}.", typeof(T).Name));
this.stream.OnNext(command);
}
//public async Task SendAsync<T>(T command) where T : ICommand
//{
// if (command == null)
// {
// throw new ArgumentNullException("command");
// }
// if (!this.registrations.Contains(typeof(T)))
// {
// throw new NoCommandHandlerSubscribedException();
// }
// Log.Debug(logm => logm("Sending command of type {0}.", typeof(T)));
// this.stream.OnNext(command);
// await this.stream.Where(item => ReferenceEquals(item, command));
//}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (!this.disposed)
{
if (disposing)
{
this.stream.Dispose();
}
}
this.disposed = true;
}
[Serializable]
public class CommandAlreadySubscribedException : Exception
{
internal CommandAlreadySubscribedException(Type type)
: base(string.Format("Tried to subscribe handler for command of type {0} but there was already a subscribtion. More than one handler at time is not allowed.", type))
{
}
protected CommandAlreadySubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
[Serializable]
public class NoCommandHandlerSubscribedException : Exception
{
public NoCommandHandlerSubscribedException()
{
}
public NoCommandHandlerSubscribedException(string message)
: base(message)
{
}
public NoCommandHandlerSubscribedException(string message, Exception innerException)
: base(message, innerException)
{
}
protected NoCommandHandlerSubscribedException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
}
private class ExclusiveObservableWrapper<T> : IObservable<T> where T : ICommand
{
private readonly IObservable<T> observable;
private readonly Func<Type, bool> register;
private readonly Action<Type> unregister;
internal ExclusiveObservableWrapper(IObservable<T> observable, Func<Type, bool> register, Action<Type> unregister)
{
this.observable = observable;
this.register = register;
this.unregister = unregister;
}
public IDisposable Subscribe(IObserver<T> observer)
{
var subscription = this.observable.Subscribe(observer);
var type = typeof(T);
if (!this.register(type))
{
observer.OnError(new CommandAlreadySubscribedException(type));
}
return Disposable.Create(
() =>
{
subscription.Dispose();
this.unregister(type);
});
}
}
}
如果我不能保证命令的顺序(如给定的那样),那么它们(可能)就没有意义。 (创建前更新)
ICommandBus 用于 UI/Presentation 层,它希望为命令调用相应的处理程序(无需知道处理程序)。
我只想将链卸载到一个单独的线程。
命令 -> 总线 -> 命令处理程序 -> 域模型 -> 事件 -> 事件处理程序 -> 读取模型
这需要保持命令出现的顺序。
我认为 RX 只需一些“魔法线”就可以做到这一点。但据我所知,我现在必须用自己的线程处理再做一次。 :-(
最佳答案
您似乎对 .Synchronize()
的作用有错误的理解。它的唯一目的是获取一个正在产生重叠或错位消息的可观察对象(即 OnNext
或多个 OnError
之前的 OnCompleted
)和确保他们遵循 OnNext*(OnError|OnCompleted)
行为契约。这是关于让流氓 Observable 发挥出色。
现在,既然我们可以忽略它,因为您的示例输入是一个行为良好的可观察对象,那么您可以通过调用 .ObserveOn(TaskPoolScheduler.Default)
看到您正在创建可观察的跳转线程 -这很容易导致 observables 以不同的速率被消耗 - 这就是这里发生的事情。
您已经订阅了两次源代码
,因此您无法阻止您在引入并发时所看到的行为。
鉴于您之前的问题 ( How to await finished IObserver call including observing subscriber calls? ),您似乎一心想使用 Rx 来增加并发性,但随后以某种方式强制它删除它。你真的应该以释放 Rx 的心态去做它的事情而不是高贵它。
@Beachwalker 编辑:
Enigmativity 在其对此答案的评论中 给出了我问题的正确答案。
我必须使用 EventLoopScheduler .所以我接受这个作为正确答案。
为了完整性。这是有效的代码:
[Test]
public void TestObserveOn()
{
Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
var source = new Subject<object>();
var exclusiveScheduler = new EventLoopScheduler();
var are = new AutoResetEvent(false);
using (source.ObserveOn(exclusiveScheduler).OfType<int>().Subscribe(
o =>
{
Console.WriteLine("Received {1} on threadId:{0}", Thread.CurrentThread.ManagedThreadId, o);
int sleep = 3000 / o;
Thread.Sleep(sleep);
Console.WriteLine("Handled {1} on threadId: {0}", Thread.CurrentThread.ManagedThreadId, o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
using (source.ObserveOn(exclusiveScheduler).OfType<double>().Subscribe(
o =>
{
Console.WriteLine(
"Received {1} on threadId:{0}",
Thread.CurrentThread.ManagedThreadId,
o);
Console.WriteLine(
"Handled {1} on threadId: {0}",
Thread.CurrentThread.ManagedThreadId,
o);
},
() =>
{
Console.WriteLine("OnCompleted on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.Set();
}))
{
Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
source.OnNext(1);
source.OnNext(1.1);
source.OnNext(2);
source.OnNext(2.1);
source.OnNext(3);
source.OnNext(3.1);
source.OnCompleted();
Console.WriteLine("Finished on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
are.WaitOne();
are.WaitOne();
}
}
关于c# - 如何同步 Observables 并卸载 UI 线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38055428/