javascript - RxJS:可观察对象和单个观察者的递归列表

标签 javascript reactive-programming reactive-extensions-js rxjs

我在处理可观察量的递归链时遇到了一些麻烦。

我正在使用 RxJS,它目前的版本是 1.0.10621,包含最基本的 Rx 功能,以及用于 jQuery 的 Rx。

让我为我的问题介绍一个示例场景: 我正在轮询 Twitter search API (JSON 响应)用于包含特定关键字的推文/更新。响应还包括一个“refresh_url”,应该使用它来生成后续请求。对该后续请​​求的响应将再次包含一个新的 refresh_url 等。

Rx.jQuery 允许我让 Twitter 搜索 API 调用一个可观察的事件,它产生一个 onNext 然后完成。到目前为止我尝试的是让 onNext 处理程序记住 refresh_url 并在 onCompleted 处理程序中使用它来为下一个请求生成新的可观察对象和相应的观察者。这样一来,一对 observable + observer 就会无限期地跟在另一对之后。

这种方法的问题是:

  1. 后续的 observable/observer 在它们的前辈还没有被处理掉的时候就已经存活了。

  2. 我必须做很多麻烦的簿记工作来维护对当前活着的观察者的有效引用,实际上可以有两个。 (一个在 onCompleted 中,另一个在其生命周期的其他地方)当然,需要这个引用来取消订阅/处理观察者。 簿记的替代方法是通过“仍在运行?” bool 值来实现副作用,正如我在我的示例中所做的那样。

示例代码:

            running = true;
            twitterUrl = "http://search.twitter.com/search.json";
            twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
            twitterMaxId = 0; //actually twitter ignores its since_id parameter

            newTweetObserver = function () {
                return Rx.Observer.create(
                        function (tweet) {
                            if (tweet.id > twitterMaxId) {
                                twitterMaxId = tweet.id;
                                displayTweet(tweet);
                            }
                        }
                    );
            }

            createTwitterObserver = function() {
                twitterObserver = Rx.Observer.create(
                        function (response) {
                            if (response.textStatus == "success") {
                                var data = response.data;
                                if (data.error == undefined) {
                                    twitterQuery = data.refresh_url;
                                    var tweetObservable;
                                    tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                    tweetObservable.subscribe(newTweetObserver());
                                }
                            }
                        },
                        function(error) { alert(error); },
                        function () {
                            //create and listen to new observer that includes a delay 
                            if (running) {
                                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                twitterObservable.subscribe(createTwitterObserver());
                            }
                        } 
                    );
                return twitterObserver;
            }
            twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
            twitterObservable.subscribe(createTwitterObserver());

不要被从请求到推文的双层可观察对象/观察者所迷惑。 我的示例主要涉及第一层:从 Twitter 请求数据。如果在解决这个问题时,第二层(将响应转换为推文)可以与第一层合二为一,那就太棒了;但我认为这是完全不同的事情。现在。

Erik Meijer 向我指出了 Expand 运算符(参见下面的示例),并建议 Join patterns作为替代方案。

var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
                   , i => ( i == 10 ? Observable.Empty<int>() // terminate
         : new[]{i+1}.ToObservable() // recurse
 )
);

ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

这应该可以复制粘贴到 LINQPad 中。它假定单例可观察对象并产生一个最终观察者。

所以我的问题是:如何在 RxJS 中最好地执行扩展技巧?

编辑:
扩展运算符可能可以按照 this thread 中所示的方式实现.但是需要 generators (我只有 JS < 1.6)。
遗憾RxJS 2.0.20304-beta没有实现 Extend 方法。

最佳答案

因此,我将尝试以与您略有不同的方式解决您的问题,并采取一些您可以更轻松地解决的问题。

所以我不能说的一件事是您是否正在尝试执行以下步骤

  • 获取第一个推文列表,其中包含下一个 url
  • 收到推文列表后,onNext 当前观察者并获取下一组推文
    • 无限期地这样做

或者是否有用户操作(获取更多/滚动到底部)。无论哪种方式,它确实是同一个问题。不过,我可能没有正确阅读您的问题。这是答案。

function getMyTweets(headUrl) {
    return Rx.Observable.create(function(observer) {

        innerRequest(headUrl);
        function innerRequest(url) {
            var next = '';

            // Some magic get ajax function
            Rx.get(url).subscribe(function(res) {
                observer.onNext(res);
                next = res.refresh_url;
            },
            function() {
                // Some sweet handling code
                // Perhaps get head?
            },
            function() {
                innerRequest(next);
            });
        }
    });
}

这可能不是您要的答案。如果没有,抱歉!


编辑:查看您的代码后,您似乎想要将结果作为数组并对其进行观察。

// From the results perform a select then a merge (if ordering does not matter).
getMyTweets('url')
    .selectMany(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    });

// Ensures ordering
getMyTweets('url')
    .select(function(data) {
        return Rx.Observable.fromArray(data.results.reverse());
    })
    .concat();

关于javascript - RxJS:可观察对象和单个观察者的递归列表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9898918/

相关文章:

javascript - 第二个控制台日志如何返回null?

android - RXJava Android - 创建另一个可观察对象所需的可观察结果

reactive-programming - 捕获可观察量之间的循环依赖

javascript - RxJS 中的同步性

javascript - RxJS Interval 无延迟

javascript - 替代成百上千的 if 语句

javascript - 使用 Javascript 将行添加到 HTML 表并保存在本地驱动器上

javascript - AJAX动态页面更新

c# - C#/WinRT 中的全局状态对象

java - 在哪里使用 Mono/Flux?