clojure - rxjava 和 clojure 异步之谜 : futures promises and agents, 哦天哪

标签 clojure reactive-programming netflix rx-java

对于这篇便条的长度,我提前表示歉意。我花了相当多的时间来缩短它,这是我能得到的尽可能小的。

我有一个谜团,非常感谢您的帮助。这个谜团来自于我在 Clojure 中编写的 rxjava observer 的行为,它是通过从在线示例抄袭的几个简单的 observable 来实现的。

一个可观察对象同步向其观察者的 onNext 处理程序发送消息,而我所谓的有原则的观察者的行为符合预期。

另一个可观察对象通过 Clojure future 在另一个线程上异步执行相同的操作。完全相同的观察者不会捕获发布到其 onNext 的所有事件;它似乎只是在尾部丢失了随机数量的消息。

在等待 promised onCompleted 的等待到期和等待发送到 的所有事件的到期之间存在有意的竞争>代理收集器。如果 promise 获胜,我预计 onCompleted 会看到 false,并且 agent 中的队列可能很短。如果代理获胜,我希望看到onCompletedtrue以及来自代理队列的所有消息。我不期望的一个结果是 onCompletedtrue 以及来自 agent 的短队列。但是,墨菲不 sleep ,这正是我所看到的。我不知道垃圾收集是否有问题,或者 Clojure 的 STM 内部排队,或者我的愚蠢,或者完全是其他原因。

我在这里按照其独立形式的顺序呈现源代码,以便可以通过lein repl直接运行它。有三个仪式需要解决:首先,leiningen 项目文件 project.clj,它声明了对 Netflix rxjava 0.9.0 版本的依赖:

(defproject expt2 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure               "1.5.1"]
                 [com.netflix.rxjava/rxjava-clojure "0.9.0"]]
  :main expt2.core)

现在,命名空间和 Clojure 要求以及 Java 导入:

(ns expt2.core
  (:require clojure.pprint)
  (:refer-clojure :exclude [distinct])
  (:import [rx Observable subscriptions.Subscriptions]))

最后,一个用于输出到控制台的宏:

(defmacro pdump [x]
  `(let [x# ~x]
     (do (println "----------------")
         (clojure.pprint/pprint '~x)
         (println "~~>")
         (clojure.pprint/pprint x#)
         (println "----------------")
         x#)))

最后,我的观察者。我使用代理来收集任何可观察对象的onNext发送的消息。我使用 atom 来收集潜在的 onError。我为 onCompleted 使用了 promise,以便观察者外部的消费者可以等待它。

(defn- subscribe-collectors [obl]
  (let [;; Keep a sequence of all values sent:
        onNextCollector      (agent [])
        ;; Only need one value if the observable errors out:
        onErrorCollector     (atom nil)
        ;; Use a promise for 'completed' so we can wait for it on
        ;; another thread:
        onCompletedCollector (promise)]
    (letfn [;; When observable sends a value, relay it to our agent"
            (collect-next      [item] (send onNextCollector (fn [state] (conj state item))))
            ;; If observable errors out, just set our exception;
            (collect-error     [excp] (reset!  onErrorCollector     excp))
            ;; When observable completes, deliver on the promise:
            (collect-completed [    ] (deliver onCompletedCollector true))
            ;; In all cases, report out the back end with this:
            (report-collectors [    ]
              (pdump
               ;; Wait for everything that has been sent to the agent
               ;; to drain (presumably internal message queues):
               {:onNext      (do (await-for 1000 onNextCollector)
                                 ;; Then produce the results:
                                 @onNextCollector)
                ;; If we ever saw an error, here it is:
                :onError     @onErrorCollector
                ;; Wait at most 1 second for the promise to complete;
                ;; if it does not complete, then produce 'false'.
                ;; I expect if this times out before the agent
                ;; times out to see an 'onCompleted' of 'false'.
                :onCompleted (deref onCompletedCollector 1000 false)
                }))]
      ;; Recognize that the observable 'obl' may run on another thread:
      (-> obl
          (.subscribe collect-next collect-error collect-completed))
      ;; Therefore, produce results that wait, with timeouts, on both
      ;; the completion event and on the draining of the (presumed)
      ;; message queue to the agent.
      (report-collectors))))

现在,这是一个同步可观察对象。它将 25 条消息注入(inject)其观察者的 onNext 喉咙,然后调用他们的 onCompleted 。

(defn- customObservableBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method.
      ;; Send 25 strings to the observer's onNext:
      (doseq [x (range 25)]
        (-> observer (.onNext (str "SynchedValue_" x))))
      ; After sending all values, complete the sequence:
      (-> observer .onCompleted)
      ; return a NoOpSubsription since this blocks and thus
      ; can't be unsubscribed (disposed):
      (Subscriptions/empty))))

我们将观察者订阅到这个可观察的:

;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
    (subscribe-collectors))

它按预期工作,我们在控制台上看到以下结果

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["SynchedValue_0"
  "SynchedValue_1"
  "SynchedValue_2"
  "SynchedValue_3"
  "SynchedValue_4"
  "SynchedValue_5"
  "SynchedValue_6"
  "SynchedValue_7"
  "SynchedValue_8"
  "SynchedValue_9"
  "SynchedValue_10"
  "SynchedValue_11"
  "SynchedValue_12"
  "SynchedValue_13"
  "SynchedValue_14"
  "SynchedValue_15"
  "SynchedValue_16"
  "SynchedValue_17"
  "SynchedValue_18"
  "SynchedValue_19"
  "SynchedValue_20"
  "SynchedValue_21"
  "SynchedValue_22"
  "SynchedValue_23"
  "SynchedValue_24"],
 :onError nil,
 :onCompleted true}
----------------

这是一个异步可观察对象,它仅在 future 的线程上执行完全相同的操作:

(defn- customObservableNonBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method
      (let [f (future
                ;; On another thread, send 25 strings:
                (doseq [x (range 25)]
                  (-> observer (.onNext (str "AsynchValue_" x))))
                ; After sending all values, complete the sequence:
                (-> observer .onCompleted))]
        ; Return a disposable (unsubscribe) that cancels the future:
        (Subscriptions/create #(future-cancel f))))))

;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
    (subscribe-collectors))

但是,令人惊讶的是,我们在控制台上看到的是:true 代表 onCompleted,这意味着 promise 没有超时;但只有一些异步消息。我们看到的实际消息数量因运行而异,这意味着存在一些并发现象。感谢线索。

----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["AsynchValue_0"
  "AsynchValue_1"
  "AsynchValue_2"
  "AsynchValue_3"
  "AsynchValue_4"
  "AsynchValue_5"
  "AsynchValue_6"],
 :onError nil,
 :onCompleted true}
----------------

最佳答案

代理上的 await-for 意味着阻塞当前线程,直到所有操作因此分派(dispatch) 远(从这个线程或代理)到代理已经发生,这意味着可能会发生在您的等待结束后,仍然有其他一些线程可以向代理发送消息,即你的情况发生了什么。在代理上的等待结束并且您在映射中的 :onNext 键中取消引用其值后,您将等待完成的 promise ,该 promise 在等待后结果为 true,但在平均值中一些其他消息被发送到代理以收集到向量中的时间。

您可以通过将 :onCompleted 键作为映射中的第一个键来解决此问题,这基本上意味着等待完成,然后等待代理,因为到那时就不再有 send 对代理的调用可以在 as 已经收到 onCompleted 之后发生。

{:onCompleted (deref onCompletedCollector 1000 false)
 :onNext      (do (await-for 0 onNextCollector)
                                 @onNextCollector)
 :onError     @onErrorCollector
 }

关于clojure - rxjava 和 clojure 异步之谜 : futures promises and agents, 哦天哪,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16868460/

相关文章:

javascript - 延迟除特定项目外的所有项目

android - 如何在 Android 和 iOS(例如 Netflix 和 Iflix)中缓冲和播放视频

Spring cloud with Eureka - Eureka Web UI URL

c++ - 如何在 rxcpp 中处理请求/响应流

jboss - Gradle - 如何从神奇的缓存中提取依赖项?

macos - Mac OS X 上的 Clojure 编辑器/IDE 建议

clojure - 将 clojurescript 的 re-graph graphql 客户端连接到 lacinia-pedestal graphql 服务器

deployment - Clojure/环 : How can I integrate my clojure app with a java build process that is out of my control?

servlets - Compojure:从缺少 Content-Type header 的 POST 请求中获取正文

haskell - `valueB` 是如何工作的?它总是返回相同的值?