c# - Rx groupby 直到条件改变

标签 c# system.reactive

我受困于 rx 和特定查询。 问题:

Many single update operations are produced by continuous stream. The operations can be insert or delete. I want to buffer those streams and perform few operations at the time, but it is really important to preserve the order. Additionally, operations should be buffered and done in sequences every X seconds

例子:

在:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete

输出:

insert(3)-delete(2)-insert(1)-delete(4)

我写了一个简单的应用程序来测试它,它或多或少像我想的那样工作,但它不遵守传入插入/删除的顺序

namespace RxTests
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;

internal class Program
{
    private static readonly Random Random = new Random();

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();

    private static void Main(string[] args)
    {
        Console.WriteLine("Starting production");
        var producerScheduler = new EventLoopScheduler();
        var consumerScheduler = new EventLoopScheduler();
        var producer =
            Observable.Interval(TimeSpan.FromSeconds(2))
                      .SubscribeOn(producerScheduler)
                      .Subscribe(Produce, WriteProductionCompleted);
        var consumer =
            operations.ObserveOn(producerScheduler)
                      .GroupBy(operation => operation.Delete)
                      .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
                      .SubscribeOn(consumerScheduler)
                      .Subscribe(WriteUpdateOperations);
        Console.WriteLine("Type any key to stop");
        Console.ReadKey();
        consumer.Dispose();
        producer.Dispose();
    }

    private static void Produce(long time)
    {
        var delete = Random.NextDouble() < 0.5;
        Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time);
        var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
        var id = time + 1;
        operations.OnNext(
            new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
    }

    private static void WriteProductionCompleted()
    {
        Console.WriteLine("Production completed");
        ProducerStopped.Cancel();
    }

    private static void WriteUpdateOperation(UpdateOperation updateOperation)
    {
        Console.WriteLine("Consuming {0}", updateOperation);
    }

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
    {
        foreach (var operation in updateOperation)
        {
            WriteUpdateOperation(operation);
        }
    }

    private class UpdateOperation
    {
        public UpdateOperation(long id, bool delete, params string[] changes)
        {
            this.Id = id;
            this.Delete = delete;
            this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
        }

        public bool Delete { get; set; }

        public long Id { get; private set; }

        public IList<string> Changes { get; private set; }

        public override string ToString()
        {
            var stringBuilder = new StringBuilder("{UpdateOperation ");
            stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete);
            if (this.Changes.Count > 0)
            {
                stringBuilder.Append(this.Changes.First());
                foreach (var change in this.Changes.Skip(1))
                {
                    stringBuilder.AppendFormat(", {0}", change);
                }
            }

            stringBuilder.Append("]}");
            return stringBuilder.ToString();
        }
    }
}

任何人都可以帮助我进行正确的查询吗?

谢谢

更新 08.03.13(JerKimball 的建议)

以下几行是对 JerKimball 代码的小改动/添加,用于打印结果:

using(query.Subscribe(Print))
{
    Console.ReadLine();
    producer.Dispose();        
}

使用以下打印方法:

private static void Print(IObservable<IList<Operation>> operations)
{
    operations.Subscribe(Print);
}

private static void Print(IList<Operation> operations)
{
    var stringBuilder = new StringBuilder("[");
    if (operations.Count > 0)
    {
        stringBuilder.Append(operations.First());
        foreach (var item in operations.Skip(1))
        {
            stringBuilder.AppendFormat(", {0}", item);
        }
    }

    stringBuilder.Append("]");
    Console.WriteLine(stringBuilder);
 }

以及操作的以下字符串:

public override string ToString()
{
    return string.Format("{0}:{1}", this.Type, this.Seq);
}

顺序被保留,但是:

  • 我不确定是否在另一个订阅中订阅:它是否正确(这是我很久以前就有的一个问题,但我一直不清楚)?
  • 我总是在每个列表上有不超过两个元素(即使流产生两个以上具有相同类型的连续值)

最佳答案

我认为您可以通过混合使用 GroupByUntilDistinctUntilChangedBuffer 来获得您想要的:

这需要进行一些调整以适合您的示例代码,但查询(和概念)应该保持:

(编辑:doh - 错过了一点......)

void Main()
{
    var rnd = new Random();
    var fakeSource = new Subject<Operation>();
    var producer = Observable
        .Interval(TimeSpan.FromMilliseconds(1000))
        .Subscribe(i => 
            {
                var op = new Operation();
                op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
                fakeSource.OnNext(op);
            });    
    var singleSource = fakeSource.Publish().RefCount();

    var query = singleSource
        // We want to groupby until we see a change in the source
        .GroupByUntil(
               i => i.Type, 
               grp => singleSource.DistinctUntilChanged(op => op.Type))
        // then buffer up those observed events in the groupby window
        .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
        producer.Dispose();        
    }
}

public class Operation { 
    private static int _cnt = 0;
    public Operation() { Seq = _cnt++; }
    public int Seq {get; set;}
    public string Type {get; set;}    
}

关于c# - Rx groupby 直到条件改变,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15269753/

相关文章:

c# - RX 中的冷可观察对象与普通可枚举对象之间的区别

c# - 监控 Windows 共享

c# - 正确实现 IDisposable 来清理 Rx 观察者和长期存在的 Observables?

java - RxJava : restart from the beginning on each subscription

c# - 如何从定时事件中恢复功能

c# - 复制 Entity Framework 对象

c# - Rx - 暂停 Observable.Interval

c# - 跟踪 Observable 中的(数量)观察者?

c# - 列表属性的初始化语法

c# - 收藏观察员