c# - 处理除集合之外的绑定(bind)事件,以及在删除C#时取消绑定(bind)的事件-多线程

标签 c# multithreading system.reactive

我有一个可观察的集合,我已经将CollectionChanged处理程序绑定(bind)到了。将项目添加到集合时,我需要将事件处理程序绑定(bind)到添加的每个项目。这本身并不难,但是我必须先在单独的线程上做一些事情,然后将事件处理程序绑定(bind)到线程上获得的东西上。

要明确的是,这是这样的

protected override void OnCollectionChanged(System.Collections.Specialized.NotifyCollectionChangedEventArgs e)
{
    switch(e.Action) 
    {
        case Add:
            Background Operation - registers an event
            break;
        case Remove:
            Background Operation - unregister the event
            break;
    }
}

现在的问题是,如果对同一个对象进行添加然后立即删除,会发生什么情况。当第二个线程运行时,第一个线程可能没有运行,从而导致异常。

我的想法是,使用响应式扩展可能有一种非常好的方法来处理此问题,因此我一直在研究它,但无法找到任何特定的东西。

有任何想法吗?

最佳答案

简介与假设

使用 react 性扩展绝对可以帮助实现最大的并发性。但是,它并不是普通的Rx,因此,如果您不熟悉Rx,它将有些不容易。实现非常简洁,但是不幸的是,由于使用了很多Rx概念,因此解释冗长。

为了使事情尽可能的理智,我做以下假设:

  • 单个项目作为集合的成员出现的次数不得超过一次。即您不先删除一个项目就不会添加一个以上的项目
  • 添加和删除任务不能同时运行于同一项目。
  • 添加和删除任务可以针对不同的项目同时运行。此外,这是实现最佳性能所希望的。
  • 一个项目具有适当的Equals实现,以实现对添加和删除事件的正确分组
  • 如果您清除或重置集合,则必须在下面放置任务处理程序并启动一个新的

  • 处理函数签名和用法

    我创建一个处理运行添加和删除任务的功能。当您要从集合中分离时,它将返回一个IDisposable和您的Dispose。签名如下:
        private IDisposable HandleAddRemove<T>(
            ObservableCollection<T> collection,
            Func<T, Task> addActionAsync,
            Func<T, Task> removeActionAsync)
    

    它接受三个参数:
  • 集合本身作为ObservableCollection<T>
  • 接受项目并返回已启动的添加后台任务
  • 的工厂函数
  • 接受项目并返回已启动的Remove后台任务
  • 的工厂函数

    实现

    首先,我需要能够获得一个枚举,该枚举可交替返回添加和删除任务工厂。稍后我们将了解原因-在这里:
    private IEnumerable<Func<T, Task>> GetAddRemoveLoop<T>(
        Func<T, Task> addFunc,
        Func<T, Task> removeFunc)
    {
        while (true)
        {
            yield return addFunc;
            yield return removeFunc;
        }
    }
    

    现在介绍实现本身。我将一步一步地介绍它,然后在最后介绍整个过程。我们正在建立的是一条大型管道,该管道将处理每个事件并调用适当的操作。

    这是整个功能:
    private IDisposable HandleAddRemove<T>(
        ObservableCollection<T> collection,
        Func<T, Task> addActionAsync,
        Func<T, Task> removeActionAsync)
    {
        return Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                h => collection.CollectionChanged += h,
                h => collection.CollectionChanged -= h)
            .Select(evt => evt.EventArgs)
            .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                            item.Action == NotifyCollectionChangedAction.Remove)
            .Select(x => x.Action == NotifyCollectionChangedAction.Add
                                ? (T) x.NewItems[0]
                                : (T) x.OldItems[0])
            .GroupBy(item => item)
            .SelectMany(item => item.Zip(
                GetAddRemoveLoop(addActionAsync, removeActionAsync),
                (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
            .Subscribe();
    }
    

    现在让我们分解一下:

    1.连接到CollectionChanged事件
    FromEventPattern创建一个IObservable<EventPattern<XXXEventArgs>>。看起来有些奇怪-参数是委托(delegate),只要结果的可观察项被订阅或完成(正常或异常),它们就会被挂接到事件并与事件分离。
    Observable.FromEventPattern
        <NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        h => collection.CollectionChanged += h,
        h => collection.CollectionChanged -= h)
    

    2.从EventPattern项中获取EventArgs
    EventPattern有点笨拙-我们只需要使用EventArgs属性,以便我们可以使用Select转换每个项目:
    .Select(evt => evt.EventArgs)
    

    3.过滤以获取添加和删除操作

    现在我们需要过滤掉AddRemove Action 以外的所有 Action 。我们使用Where运算符执行此操作:
    .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                   item.Action == NotifyCollectionChangedAction.Remove)
    

    4.从EventArgs中提取项目

    我们利用以下事实:对于Add操作,NewItems集合中只有一个项目,对于Remove操作,OldItems集合中恰好有一个项目。此Select根据操作类型适本地提取项目:
    .Select(x => x.Action == NotifyCollectionChangedAction.Add
                 ? (T) x.NewItems[0]
                 : (T) x.OldItems[0])
    

    5.将同一项目的出现分组在一起

    对于给定的项目,我们要确保只能执行一项添加或删除任务。为此,我们将为每个项目创建一个组。在Rx中,组运算符获取一个流并产生一个流流-为每个组创建一个新流。它还需要一个键选择器-一个用于比较项目以确定将其放入哪个组的函数。在这里,我们使用一个identity函数并假定.Equals的一个良好实现:
    .GroupBy(item => item)
    

    返回的流的类型是IObservable<IGroupedObservable<T,T>>!群流。组流中有两个类型参数。一个用于 key ,另一个用于组中的项目。在这种情况下,我们将项目本身用作键,因此类型参数都是T。添加了Key属性后,分组的可观察对象就像常规可观察对象一样。

    6.将项目传递给“添加”或“删除”任务

    这是棘手的部分!此时,每个组将包含重复发送的同一项目-每次在集合上调用Add(item)Remove(item)一次。第一个用于Add,第二个用于Remove,依此类推。

    暂时忽略SelectMany-让我们先看一下它的内部:

    我们使用Zip运算符,该运算符会将组流中的每个项目与前面所述的辅助函数产生的可枚举返回的添加/删除工厂函数配对。
    Zip的第一个参数是可枚举的-对我们的辅助函数的调用。

    第二个是“选择器功能”。这接受项目和工厂功能的压缩对,并将其组合以获得结果。因此,Zip如下所示:
    item.Zip(GetAddRemoveLoop(addActionAsync, removeActionAsync),
            (i, action) => /* get result of combining item and action */)
    

    输出将是IObservable<TResult>,其中TResult是选择器函数返回的类型。

    现在,我们要做的就是将添加/删除功能(返回一个已启动的Task,在添加/删除操作完成后完成)的调用转换为IObservable流。有一个方便的扩展方法,可以将Task转换为称为IObservableToObservable。对于简单的Task,它使用特殊的类型Unit作为返回类型。 Unit是一种常见的功能类型,当您需要知道已完成某件事但并不关心它是什么时,将使用它。因此,ToObservable将给我们一个表示我们的异步任务的IObservable<Unit>

    一个幼稚的方法是这样做:
    (i, action) => action(i).ToObservable();
    

    问题在于,lambda的评估时间过早。我们只想在订阅IObservable<Unit>时调用添加/删除任务。 Observable.Defer将为我们做到这一点,仅调用您在订阅时通过它的操作。因此,除了上面的我们,我们做的是:
    (i, action) => Observable.Defer(() => action(i).ToObservable())
    

    因此,我们在每个组上调用了Zip函数,并生成了一个IObservable<IObservable<Unit>>流,该流代表表示添加/删除的备用调用。

    7.确保依次调用添加/删除

    现在,我们需要确保组中的每个添加/删除流都一个接一个地订阅。为此,我们称为Concat。仅当先前的子流已经结束时,这使订阅每个子流的流的流合并。因此,它将IObservable<IObservable<Unit>>转换为平面IObservable<Unit>

    8.扁平化组

    每个组仍然是一个流。现在我们可以跳回到SelectMany。我们在组上使用它来获取所有单个组流并将它们展平为单个流。我们正在转换IObservable<IGroupedObservable<T,T>>-> IObservable<Unit>:
    .SelectMany(item => // the Zip and Concat result
                        // that produced the `IObservable<Unit>` of add/remove tasks
    

    9.现在运行它!

    最后,我们就此调用Subscribe。我们不在乎结果-流本身正在调用我们的添加/删除任务。由于组本身是不终止的,因此除非我们处理返回的IDisposable,否则该流不会结束:
    .Subscribe();
    

    这将在其所连接的运算符上调用“订阅”,每个运算符都将对其上方的运算符进行订阅,依此类推直至初始FromEventArgs

    如果需要清除集合和/或取消订阅IDisposable事件,请记住处置返回的CollectionChanged

    完整的解决方案

    再次是整个功能:
    private IDisposable HandleAddRemove<T>(
        ObservableCollection<T> collection,
        Func<T, Task> addActionAsync,
        Func<T, Task> removeActionAsync)
    {
        return Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                h => collection.CollectionChanged += h,
                h => collection.CollectionChanged -= h)
            .Select(evt => evt.EventArgs)
            .Where(item => item.Action == NotifyCollectionChangedAction.Add ||
                            item.Action == NotifyCollectionChangedAction.Remove)
            .Select(x => x.Action == NotifyCollectionChangedAction.Add
                                ? (T) x.NewItems[0]
                                : (T) x.OldItems[0])
            .GroupBy(item => item)
            .SelectMany(item => item.Zip(
                GetAddRemoveLoop(addActionAsync, removeActionAsync),
                (i, action) => Observable.Defer(() => action(i).ToObservable())).Concat())
            .Subscribe();
    }
    

    sample 用量

    在以下用法中,我们提供了长时间运行的添加/删除任务以清楚地显示效果:
    var collection = new ObservableCollection<string>();
    
    Func<string, Task> addAction = x =>
        {
            Console.WriteLine("Begin add task for " + x);
            return Task.Delay(2000)
                    .ContinueWith(t => Console.WriteLine("End add task for " + x));
        };
    
    Func<string, Task> removeAction = x =>
    {
        Console.WriteLine("Begin remove task for " + x);
        return Task.Delay(3000)
                .ContinueWith(t => Console.WriteLine("End remove task for " + x));
    };
    
    var sub = HandleAddRemove(
        collection,
        addAction,
        removeAction);
    
    collection.Add("item1");
    Thread.Sleep(1000);
    collection.Remove("item1");
    Thread.Sleep(1000);
    collection.Add("item2");
    collection.Add("item3");
    Thread.Sleep(5000);
    collection.Remove("item3");
    Thread.Sleep(1000);
    collection.Remove("item2");
    
    Thread.Sleep(30000);
    
    Console.WriteLine("Done");
    
    sub.Dispose();
    

    结果如下:
    Begin add task for item1
    Begin add task for item2
    Begin add task for item3
    End add task for item1
    Begin remove task for item1
    End add task for item3
    End add task for item2
    End remove task for item1
    Begin remove task for item3
    Begin remove task for item2
    End remove task for item3
    End remove task for item2
    Done
    

    希望这不会给您带来太多麻烦。我知道我的假设可能相差太远,如果可以的话,我希望这在某种程度上是有用的或鼓舞人心的!

    关于c# - 处理除集合之外的绑定(bind)事件,以及在删除C#时取消绑定(bind)的事件-多线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20113507/

    相关文章:

    c++ - 多线程时一直调用对象析构函数,但对象没有超出范围

    c# - 如何从 IObservable<T> 选择递增的值子序列

    C# 响应式(Reactive)扩展 - 内存管理和 Distinct 运算符

    c# - 包括不使用 Entity Framework 查询

    c# - C# 中的变量作用域

    java - 在另一个线程读取 Java 列表引用时更改它

    使用集合时线程安全的 C# 最佳实践(还不是并发的)

    system.reactive - 根据 CPU 使用率限制 RX 任务

    c# - 错误::类、结构或接口(interface)成员声明中的无效标记 '='

    c# - 保存/序列化 LINQ 查询