c# - 使用 Reactive Extensions 在 TextChanged 上搜索

标签 c# .net system.reactive reactive-programming

我试图在包含 10000 多条记录的数据库表上实现即时搜索。

搜索在搜索文本框内的文本发生变化时开始,当搜索框变空时我想调用一个不同的方法来加载所有数据。

此外,如果用户在加载另一个搜索结果时更改搜索字符串,则应停止加载这些结果以支持新搜索。

我像下面的代码一样实现了它,但我想知道是否有更好或更干净的方法来使用 Rx( react 性扩展)运算符,我觉得在第一个可观察对象的订阅方法中创建第二个可观察对象是命令式多于声明式,if 语句也是如此。

var searchStream = Observable.FromEventPattern(s => txtSearch.TextChanged += s, s => txtSearch.TextChanged -= s)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Select(evt =>
        {
            var txtbox = evt.Sender as TextBox;
            return txtbox.Text;
        }
    );

searchStream
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(searchTerm =>
        {
            this.parties.Clear();
            this.partyBindingSource.ResetBindings(false);
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm) ? partyRepository.GetAll(out partyCount) : partyRepository.SearchByNameAndNotes(searchTerm);

            foundParties
                .ToObservable(Scheduler.Default)
                .TakeUntil(searchStream)
                .Buffer(500)
                .ObserveOn(SynchronizationContext.Current)
                .Subscribe(searchResults =>
                    {
                        this.parties.AddRange(searchResults);
                        this.partyBindingSource.ResetBindings(false);
                    }
                    , innerEx =>
                    {

                    }
                    , () => { }
                );
        }
        , ex =>
        {
        }
        , () =>
        {

        }
    );

SearchByNameAndNotes方法只返回 IEnumerable<Party>通过从数据读取器读取数据来使用 SQLite。

最佳答案

我想你想要这样的东西。编辑:从您的评论中,我看到您有一个同步存储库 API - 我将保留异步版本,然后添加一个同步版本。注释内嵌:

异步版本库

异步存储库接口(interface)可能是这样的:

public interface IPartyRepository
{
    Task<IEnumerable<Party>> GetAllAsync(out long partyCount);
    Task<IEnumerable<Party>> SearchByNameAndNotesAsync(string searchTerm);
}

然后我将查询重构为:

var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text) // better to select on the UI thread
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    // placement of this is important to avoid races updating the UI
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        // I like to use Do to make in-stream side-effects explicit
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })
    // This is "the money" part of the answer:
    // Don't subscribe, just project the search term
    // into the query...
    .Select(searchTerm =>
    {
        long partyCount;
        var foundParties = string.IsNullOrEmpty(searchTerm)
            ? partyRepository.GetAllAsync(out partyCount)
            : partyRepository.SearchByNameAndNotesAsync(searchTerm);

        // I assume the intention of the Buffer was to load
        // the data into the UI in batches. If so, you can use Buffer from nuget
        // package Ix-Main like this to get IEnumerable<T> batched up
        // without splitting it up into unit sized pieces first
        return foundParties
            // this ToObs gets us into the monad
            // and returns IObservable<IEnumerable<Party>>
            .ToObservable()
            // the ToObs here gets us into the monad from
            // the IEnum<IList<Party>> returned by Buffer
            // and the SelectMany flattens so the output
            // is IObservable<IList<Party>>
            .SelectMany(x => x.Buffer(500).ToObservable())
            // placement of this is again important to avoid races updating the UI
            // erroneously putting it after the Switch is a very common bug
            .ObserveOn(SynchronizationContext.Current); 
    })
    // At this point we have IObservable<IObservable<IList<Party>>
    // Switch flattens and returns the most recent inner IObservable,
    // cancelling any previous pending set of batched results
    // superceded due to a textbox change
    // i.e. the previous inner IObservable<...> if it was incomplete
    // - it's the equivalent of your TakeUntil, but a bit neater
    .Switch() 
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });

同步版本库

同步存储库接口(interface)可能是这样的:

public interface IPartyRepository
{
    IEnumerable<Party> GetAll(out long partyCount);
    IEnumerable<Party> SearchByNameAndNotes(string searchTerm);
}

就个人而言,我不建议像这样同步的存储库接口(interface)。为什么?它通常会执行 IO,因此您会浪费地阻塞线程。

您可能会说客户端可以从后台线程调用,或者您可以将他们的调用包装在任务中 - 但我认为这不是正确的方法。

  • 客户“不知道”你要屏蔽;契约(Contract)中没有说明
  • 它应该是处理实现的异步方面的存储库 - 毕竟,如何最好地实现这一点只有存储库实现者最清楚。

不管怎样,接受上面的,一种实现方式是这样的(当然和async版本大体相似所以我只标注了不同点):

var searchStream = Observable.FromEventPattern(
    s => txtSearch.TextChanged += s,
    s => txtSearch.TextChanged -= s)
    .Select(evt => txtSearch.Text)
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .ObserveOn(SynchronizationContext.Current)
    .Do(_ =>
    {
        this.parties.Clear();
        this.partyBindingSource.ResetBindings(false);
    })       
    .Select(searchTerm =>
        // Here we wrap the synchronous repository into an
        // async call. Note it's simply not enough to call
        // ToObservable(Scheduler.Default) on the enumerable
        // because this can actually still block up to the point that the
        // first result is yielded. Doing as we have here,
        // we guarantee the UI stays responsive
        Observable.Start(() =>
        {
            long partyCount;
            var foundParties = string.IsNullOrEmpty(searchTerm)
                ? partyRepository.GetAll(out partyCount)
                : partyRepository.SearchByNameAndNotes(searchTerm);

            return foundParties;
        }) // Note you can supply a scheduler, default is Scheduler.Default
        .SelectMany(x => x.Buffer(500).ToObservable())
        .ObserveOn(SynchronizationContext.Current))
    .Switch()
    .Subscribe(searchResults =>
    {
        this.parties.AddRange(searchResults);
        this.partyBindingSource.ResetBindings(false);
    },
    ex => { },
    () => { });  

关于c# - 使用 Reactive Extensions 在 TextChanged 上搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22873541/

相关文章:

c# - "partial"在此上下文中使用 SonarQube 5.2 是无偿的

c# - 更新 .NET Web 服务以使用 TLS 1.2

c# - 如何在数据表中添加图像?

wpf - Rx 响应式(Reactive)扩展 Observeondispatcher 单元测试错误 : The current thread has no Dispatcher associated with it

c# - SynchronizationContext 和 TaskScheduler 之间的概念区别是什么

c# - 无论 .NET 版本如何,使用相同种子第 n 次调用 Random.Next() 是否总是产生相同的数字?

c# - 同步框架 - 跨域文件同步

c# - 如何将计算机音频传递给程序以进行可视化?

c# - 我可以对 IQueryable 和 IEnumerable 执行 Contract.Ensures 吗?

javascript - 如何创建一个可以在单项和批处理模式之间切换的 Rx (RxJS) 流?