c# - Sort an Observable in a predefined order with Reactive Extensions

Tags: c# sorting system.reactive reactive-programming

Say I have a type T:

class T {
    public int identifier; //Arbitrary but unique for each character (Guids in real-life)
    public char character; //In real life not a char, but I chose char here for easy demo purposes
}

I have a predefined, ordered sequence of identifiers:

int[] identifierSequence = new int[]{
    9, 3, 4, 4, 7
};

I now need to order an IObservable<T> that produces the following sequence of objects:

{identifier: 3, character: 'e'},
{identifier: 9, character: 'h'},
{identifier: 4, character: 'l'},
{identifier: 4, character: 'l'},
{identifier: 7, character: 'o'}

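So that the resulting IObservable produces hello. I don't want to use ToArray, because I want to receive the objects as soon as they arrive rather than waiting until everything has been observed. More specifically, I want to receive them like this: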

 Input: e  h  l  l  o
Output:    he l  l  o
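The timing above is what a reorder buffer gives you: keep a pointer into identifierSequence, park each arrival under its identifier, and after every arrival release as many consecutive items as are now available. A minimal sketch in plain C# (no Rx), using a hypothetical `SequenceReorderBuffer` helper; one queue per identifier keeps repeated identifiers such as the two `4`s from overwriting each other:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not an Rx operator: parks out-of-order arrivals and
// releases them as soon as the next expected identifier is available.
public sealed class SequenceReorderBuffer<T, TId>
{
    private readonly IReadOnlyList<TId> _sequence;
    private readonly Func<T, TId> _getId;
    // One FIFO queue per identifier, so repeated identifiers (the two 4s) are both kept.
    private readonly Dictionary<TId, Queue<T>> _pending = new Dictionary<TId, Queue<T>>();
    // Position in _sequence of the next item we are allowed to emit.
    private int _index;

    public SequenceReorderBuffer(IReadOnlyList<TId> sequence, Func<T, TId> getId)
    {
        _sequence = sequence ?? throw new ArgumentNullException(nameof(sequence));
        _getId = getId ?? throw new ArgumentNullException(nameof(getId));
    }

    // True once every position in the sequence has been emitted.
    public bool IsComplete => _index == _sequence.Count;

    // Records one arrival and returns the items that can now be emitted, in sequence order.
    public List<T> Push(T item)
    {
        TId id = _getId(item);
        if (!_pending.TryGetValue(id, out Queue<T> queue))
        {
            queue = new Queue<T>();
            _pending[id] = queue;
        }
        queue.Enqueue(item);

        var ready = new List<T>();
        while (_index < _sequence.Count
               && _pending.TryGetValue(_sequence[_index], out Queue<T> next)
               && next.Count > 0)
        {
            ready.Add(next.Dequeue());
            _index++;
        }
        return ready;
    }
}
```

Pushing e, h, l, l, o in that order returns the batches [], [h, e], [l], [l], [o], which is the timing of the Output line. Calling `Push` from a `SelectMany` selector would give the incremental behaviour without ToArray; as written, items whose identifier never occurs in the sequence stay in `_pending` forever.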

What is the proper reactive way to do this? The best I could come up with is:

Dictionary<int, T> buffer = new Dictionary<int, T>();
int curIndex = 0;

inputObservable.SelectMany(item =>
{
    buffer[item.identifier] = item;

    IEnumerable<T> GetReadyElements()
    {
        // Stop at the end of the sequence instead of indexing past it
        while (curIndex < identifierSequence.Length)
        {
            int nextItemIdentifier = identifierSequence[curIndex];
            T nextItem;
            if (buffer.TryGetValue(nextItemIdentifier, out nextItem))
            {
                buffer.Remove(nextItem.identifier);
                curIndex++;
                yield return nextItem;
            }
            else
            {
                break;
            }
        }
    }

    return GetReadyElements();
});

Edit:

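Schlomo raised some very valid points about my code, which is why I marked his answer as accepted. I made a few modifications to his code to make it usable:

  • Generic identifier and object types
  • Iteration instead of recursion, to prevent a potential stack overflow on very large observables
  • Converted the anonymous type into a real class for readability
  • Wherever possible, look a value up in the dictionary only once and store it in a variable instead of looking it up multiple times
  • Fixed the types

This gave me the following code: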

public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        var initialState = new OrderByIdentifierSequenceState<T, TId>(0, ImmutableDictionary<TId, ImmutableList<T>>.Empty, Enumerable.Empty<T>());
        return source.Scan(initialState, (oldState, item) =>
            {
                //Function to be called upon receiving new item
                //If we can pattern match the first item, then it is moved into Output, and concatted continuously with the next possible item
                //Otherwise, if nothing is available yet, just return the input state
                OrderByIdentifierSequenceState<T, TId> GetOutput(OrderByIdentifierSequenceState<T, TId> state)
                {
                    int index = state.Index;
                    ImmutableDictionary<TId, ImmutableList<T>> buffer = state.Buffer;
                    IList<T> output = new List<T>();

                    while (index < identifierSequence.Count)
                    {
                        TId key = identifierSequence[index];
                        ImmutableList<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.IsEmpty)
                        {
                            //No values available yet
                            break;
                        }

                        T toOutput = nextValues[nextValues.Count - 1];
                        output.Add(toOutput);

                        buffer = buffer.SetItem(key, nextValues.RemoveAt(nextValues.Count - 1));
                        index++;
                    }

                    return new OrderByIdentifierSequenceState<T, TId>(index, buffer, output);
                }

                //Before looking for output, add the new item to the buffer
                TId itemIdentifier = identifierFunc(item);

                ImmutableList<T> valuesList;
                if (!oldState.Buffer.TryGetValue(itemIdentifier, out valuesList))
                {
                    valuesList = ImmutableList<T>.Empty;
                }
                var remodifiedBuffer = oldState.Buffer.SetItem(itemIdentifier, valuesList.Add(item));

                return GetOutput(new OrderByIdentifierSequenceState<T, TId>(oldState.Index, remodifiedBuffer, Enumerable.Empty<T>()));
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();

                if (output.Index == identifierSequence.Count)
                {
                    notifications.Add(Notification.CreateOnCompleted<T>());
                }

                return notifications;
            })
            .Dematerialize();
    }

    class OrderByIdentifierSequenceState<T, TId>
    {
        //Index shows what T we're waiting on
        public int Index { get; }
        //Buffer holds T that have arrived that we aren't ready yet for
        public ImmutableDictionary<TId, ImmutableList<T>> Buffer { get; }
        //Output holds T that can be safely emitted.
        public IEnumerable<T> Output { get; }

        public OrderByIdentifierSequenceState(int index, ImmutableDictionary<TId, ImmutableList<T>> buffer, IEnumerable<T> output)
        {
            this.Index = index;
            this.Buffer = buffer;
            this.Output = output;
        }
    }

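However, this code still has some problems:

  • A new state (mainly a new ImmutableDictionary) is built for every received item, which can be expensive. ImmutableDictionary.SetItem shares structure with the previous version rather than copying it wholesale, but it still allocates on every item. Possible solution: keep one state per observer instead of one state per received item.
  • A problem arises when one or more elements of identifierSequence never appear in the source observable. This currently blocks the ordered observable, and it never completes. Possible solutions: a timeout; throwing an exception when the source observable completes; returning all available items when the source observable completes; ...
  • When the source observable contains more elements than identifierSequence, we get a memory leak. Items that are in the source observable but not in identifierSequence are currently added to the dictionary and are not removed until the source observable completes. Possible solutions: check whether the item is in identifierSequence before adding it to the dictionary; bypass the buffering and emit the item immediately; ...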
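The first bullet's suggestion, one mutable state per subscriber rather than a new immutable state per item, is commonly expressed in Rx with `Observable.Defer`, whose factory runs once per `Subscribe` call. The sketch below assumes System.Reactive is referenced and uses hypothetical `SubscriptionStateDemo` names; it isolates the difference with a simple counter instead of the full reordering state:

```csharp
using System;
using System.Reactive.Linq;

public static class SubscriptionStateDemo
{
    // The counter is captured once, when this method is called, so every
    // subscription to the returned observable increments the same variable.
    public static IObservable<int> SharedCounter(IObservable<char> source)
    {
        int count = 0;
        return source.Select(_ => ++count);
    }

    // Observable.Defer invokes the factory on each Subscribe call,
    // so every subscriber starts from its own fresh counter.
    public static IObservable<int> DeferredCounter(IObservable<char> source) =>
        Observable.Defer(() =>
        {
            int count = 0;
            return source.Select(_ => ++count);
        });
}
```

Subscribing twice to `SharedCounter` yields 1, 2, 3 and then 4, 5, 6, while `DeferredCounter` yields 1, 2, 3 both times. `Observable.Create` has the same per-subscription property, so a mutable `Dictionary` plus index declared inside its subscribe function is not shared between subscribers.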

My solution:

    /// <summary>
    /// Takes the items from the source observable, and returns them in the order specified in identifierSequence.
    /// If an item is missing from the source observable, the returned observable returns items up until the missing item and then blocks until the source observable is completed.
    /// All available items are then returned in order. Note that this means that while a correct order is guaranteed, there might be missing items in the result observable.
    /// If there are items in the source observable that are not in identifierSequence, these items will be ignored.
    /// </summary>
    /// <typeparam name="T">The type that is produced by the source observable</typeparam>
    /// <typeparam name="TId">The type of the identifiers used to uniquely identify a T</typeparam>
    /// <param name="source">The source observable</param>
    /// <param name="identifierSequence">A list of identifiers that defines the sequence in which the source observable is to be ordered</param>
    /// <param name="identifierFunc">A function that takes a T and outputs its unique identifier</param>
    /// <returns>An observable with the same elements as the source, but ordered by the sequence of items in identifierSequence</returns>
    public static IObservable<T> OrderByIdentifierSequence<T, TId>(this IObservable<T> source, IList<TId> identifierSequence, Func<T, TId> identifierFunc)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }
        if (identifierSequence == null)
        {
            throw new ArgumentNullException(nameof(identifierSequence));
        }
        if (identifierFunc == null)
        {
            throw new ArgumentNullException(nameof(identifierFunc));
        }

        if (identifierSequence.Count == 0)
        {
            return Observable.Empty<T>();
        }

        HashSet<TId> identifiersInSequence = new HashSet<TId>(identifierSequence);

        return Observable.Create<T>(observer =>
        {
            //current index of pending item in identifierSequence
            int index = 0;
            //buffer of items we have received but are not ready for yet
            Dictionary<TId, List<T>> buffer = new Dictionary<TId, List<T>>();

            return source.Select(
                    item =>
                    {
                        //Function to be called upon receiving new item
                        //We search for the current pending item in the buffer. If it is available, we yield return it and repeat.
                        //If it is not available yet, stop.
                        IEnumerable<T> GetAvailableOutput()
                        {
                            while (index < identifierSequence.Count)
                            {
                                TId key = identifierSequence[index];
                                List<T> nextValues;
                                if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
                                {
                                    //No values available yet
                                    break;
                                }

                                yield return nextValues[nextValues.Count - 1];

                                nextValues.RemoveAt(nextValues.Count - 1);
                                index++;
                            }
                        }

                        //Get the identifier for this item
                        TId itemIdentifier = identifierFunc(item);

                        //If this item is not in identifiersInSequence, we ignore it.
                        if (!identifiersInSequence.Contains(itemIdentifier))
                        {
                            return Enumerable.Empty<T>();
                        }

                        //Add the new item to the buffer
                        List<T> valuesList;
                        if (!buffer.TryGetValue(itemIdentifier, out valuesList))
                        {
                            valuesList = new List<T>();
                            buffer[itemIdentifier] = valuesList;
                        }
                        valuesList.Add(item);

                        //Return all available items
                        return GetAvailableOutput();
                    })
                .Subscribe(output =>
                {
                    foreach (T cur in output)
                    {
                        observer.OnNext(cur);
                    }

                    if (index == identifierSequence.Count)
                    {
                        observer.OnCompleted();
                    }
                },(ex) =>
                {
                    observer.OnError(ex);
                }, () =>
                {
                    //When source observable is completed, return the remaining available items
                    while (index < identifierSequence.Count)
                    {
                        TId key = identifierSequence[index];
                        List<T> nextValues;
                        if (!buffer.TryGetValue(key, out nextValues) || nextValues.Count == 0)
                        {
                            //No values available
                            index++;
                            continue;
                        }

                        observer.OnNext(nextValues[nextValues.Count - 1]);

                        nextValues.RemoveAt(nextValues.Count - 1);
                        index++;
                    }

                    //Mark observable as completed
                    observer.OnCompleted();
                });
        });
    }
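The `OnCompleted` handler above is a small, self-contained policy: walk the rest of `identifierSequence`, emit whatever is still buffered, and skip positions that never arrived. Pulled out into a hypothetical helper (`CompletionPolicy.DrainRemaining` is an illustrative name, not part of Rx), it can be unit-tested without any observables:

```csharp
using System.Collections.Generic;

public static class CompletionPolicy
{
    // Yields the items still buffered for positions startIndex..end of the
    // sequence, in sequence order, skipping positions that never arrived.
    // Like the handler above, it removes each emitted item from the buffer.
    public static IEnumerable<T> DrainRemaining<T, TId>(
        IList<TId> identifierSequence,
        int startIndex,
        IDictionary<TId, List<T>> buffer)
    {
        for (int i = startIndex; i < identifierSequence.Count; i++)
        {
            if (buffer.TryGetValue(identifierSequence[i], out List<T> values) && values.Count > 0)
            {
                // Take from the end, mirroring RemoveAt(Count - 1) in the handler above.
                T last = values[values.Count - 1];
                values.RemoveAt(values.Count - 1);
                yield return last;
            }
        }
    }
}
```

For the sequence 9, 3, 4, 4, 7 with h and e already emitted (start index 2) and only one 'l' and the 'o' buffered, the helper yields l, o: the second 4 is skipped, which is the "correct order, possibly missing items" contract described in the XML doc comment.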

Best answer

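Note that your implementation has a few problems:

  1. If the two 'l's arrive early, one of them is swallowed, which then blocks the entire sequence. Your dictionary should map to a collection, not a single item.
  2. There is no OnCompleted message.
  3. Multiple subscribers can corrupt the state. Try this (GetPatternMatchOriginal is your code):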


var stateMachine = src.GetPatternMatchOriginal(new int[] { 9, 3, 4, 4, 7 });

stateMachine.Take(3).Dump(); //Linqpad
stateMachine.Take(3).Dump(); //Linqpad

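The first outputs h e l, the second outputs l o. Both should output h e l.

This implementation fixes those problems, and because it uses immutable data structures it has no side effects: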

public static class X
{
    public static IObservable<T> GetStateMachine(this IObservable<T> source, int[] identifierSequence)
    {
        //State is held in an anonymous type: 
        //  Index shows what character we're waiting on, 
        //  Buffer holds characters that have arrived that we aren't ready yet for
        //  Output holds characters that can be safely emitted.
        return source
            .Scan(new { Index = 0, Buffer = ImmutableDictionary<int, ImmutableList<T>>.Empty, Output = Enumerable.Empty<T>() },
            (state, item) =>
            {
                //Function to be called recursively upon receiving new item
                //If we can pattern match the first item, then it is moved into Output, and concatted recursively with the next possible item
                //Otherwise just return the inputs
                (int Index, ImmutableDictionary<int, ImmutableList<T>> Buffer, IEnumerable<T> Output) GetOutput(int index, ImmutableDictionary<int, ImmutableList<T>> buffer, IEnumerable<T> results)
                {
                    if (index == identifierSequence.Length)
                        return (index, buffer, results);

                    var key = identifierSequence[index];
                    if (buffer.ContainsKey(key) && buffer[key].Any())
                    {
                        var toOutput = buffer[key][buffer[key].Count - 1];
                        return GetOutput(index + 1, buffer.SetItem(key, buffer[key].RemoveAt(buffer[key].Count - 1)), results.Concat(new[] { toOutput }));
                    }
                    else
                        return (index, buffer, results);
                }

                //Before calling the recursive function, add the new item to the buffer
                var modifiedBuffer = state.Buffer.ContainsKey(item.identifier)
                   ? state.Buffer
                   : state.Buffer.Add(item.identifier, ImmutableList<T>.Empty);

                var remodifiedBuffer = modifiedBuffer.SetItem(item.identifier, modifiedBuffer[item.identifier].Add(item));

                var output = GetOutput(state.Index, remodifiedBuffer, Enumerable.Empty<T>());
                return new { Index = output.Index, Buffer = output.Buffer, Output = output.Output };
            })
            // Use Dematerialize/Notifications to detect and emit end of stream.
            .SelectMany(output =>
            {
                var notifications = output.Output
                    .Select(item => Notification.CreateOnNext(item))
                    .ToList();
                if (output.Index == identifierSequence.Length)
                    notifications.Add(Notification.CreateOnCompleted<T>());
                return notifications;
            })
            .Dematerialize();
    }
}

You can then call it like this:

var src = new Subject<T>();
var stateMachine = src.GetStateMachine(new int[] { 9, 3, 4, 4, 7 });
stateMachine.Dump(); //LinqPad

src.OnNext(new T { identifier = 4, character = 'l' });
src.OnNext(new T { identifier = 4, character = 'l' });
src.OnNext(new T { identifier = 7, character = 'o' });
src.OnNext(new T { identifier = 3, character = 'e' });
src.OnNext(new T { identifier = 9, character = 'h' });

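The answer's code (and the first revised version in the question) ends with `SelectMany` over `Notification` objects followed by `Dematerialize`. The point of that trick is that the result can complete as soon as the last identifier has been emitted, even though the source itself is still open. A minimal sketch of just that mechanism, assuming System.Reactive is referenced; `EarlyCompletionDemo.CompleteAfter` is a hypothetical name, and "complete after a given value" stands in for "complete after the last index":

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class EarlyCompletionDemo
{
    // Wraps every value in an OnNext notification and, once 'last' is seen,
    // appends an OnCompleted notification so Dematerialize ends the stream there.
    public static IObservable<int> CompleteAfter(IObservable<int> source, int last) =>
        source
            .SelectMany(x => x == last
                ? new[] { Notification.CreateOnNext(x), Notification.CreateOnCompleted<int>() }
                : new[] { Notification.CreateOnNext(x) })
            .Dematerialize();
}
```

With a `Subject<int>` that never completes, pushing 1, 2, 3, 4 delivers 1, 2, 3 followed by OnCompleted; the 4 is never forwarded because `Dematerialize` has already terminated the subscription.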
A similar question on Stack Overflow, "c# - Sort an Observable in a predefined order with Reactive Extensions": https://stackoverflow.com/questions/45465193/
