asynchronous - Clojurescript:使用核心/异步 channel 以 block 的形式处理请求

标签 asynchronous clojure clojurescript channel

我有以下情况:

我使用一些服务来检索一些数据并将其传递给我的输入。

有了一些输入参数,我需要对上述服务执行 N 个请求,收集输出并为每个输出执行一些 CPU 密集型任务。

我正在尝试使用核心/异步 channel 来实现此目的。

这是我的尝试(示意性地),它有点有效,但它的行为并不像我希望的那样。 对于任何有关如何改进它的提示,我们将不胜感激。

(defn produce-inputs
  [in-chan inputs]
  (let input-names-seq (map #(:name %) inputs)]
    (doseq [input-name input-names-seq]
      (async/go
        (async/>! in-chan input-name)))))

(defn consume
  [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)]
        (do
          (produce-inputs in-chan inputs)
          (async/go-loop []
                   (let [input-name (async/<! in-chan)]
                     (do
                         (retrieve-resource-from-service input-name 
                                                         ; response handler
                                                         (fn [resp]
                                                           (async/go
                                                             (let [result (:result resp)]
                                                               (async/>! out-chan result)))))
                         (when input-name
                           (recur)))))

     ; read from out-chan and do some heavy work for each entry
     (async/go-loop []
                   (let [result (async/<! out-chan)]
                         (do-some-cpu-heavy-work result))))))

; entry point
(defn run
  [inputs]
  (consume inputs))

有没有办法更新它,以便每时每刻都有不超过五个服务请求(retrieve-resource-from-service)事件?

如果我的解释不清楚,请提问,我会更新。

最佳答案

您可以创建另一个 channel 作为 token 桶来限制请求速率。

See this link有关使用 token 桶进行每秒速率限制的示例。

要限制同时请求的数量,您可以执行以下操作:

(defn consume [inputs]
  (let [in-chan (async/chan 1)
        out-chan (async/chan 1)
        bucket (async/chan 5)]
    ;; ...
    (dotimes [_ 5] (async/put! bucket :token))
    (async/go-loop []
      (let [input-name (async/<! in-chan)
            token (async/<! bucket)]
        (retrieve-resource-from-service
          input-name 
          ; response handler
          (fn [resp]
            (async/go
              (let [result (:result resp)]
                (async/>! out-chan result)
                (async/>! bucket token)))))
        (when input-name
          (recur))))
    ;; ...
    ))

创建一个新 channel bucket,并将五个项目放入其中。在发出请求之前,我们从存储桶中取出一个 token ,并在请求完成后将其放回原处。如果bucket channel 中没有 token ,我们必须等到其中一个请求完成。

注意:这只是代码的草图,您可能需要更正它。特别是,如果您的 retrieve-resource-from-service 函数中有任何错误处理程序,您也应该在发生错误时放回 token ,以避免最终的死锁。

关于asynchronous - Clojurescript:使用核心/异步 channel 以 block 的形式处理请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50377380/

相关文章:

clojure - 在 Clojurescript 中使用 -?>?

asynchronous - 串的Rust future

javascript - 如何从javascript同步调用indexeddb方法

c# - 在循环中添加一个短暂的延迟可以防止它无限循环。为什么?

javascript - Node.js 中 sqlite 数据库的异步 HTTP 请求

clojure - 如何更新 Clojure 中的向量项?

Clojure 映射到矢量

reactjs - 渲染后的 Om 回调(在更改应用程序状态后聚焦元素)

ruby - 有没有一种好方法可以让 Ruby 通过某种类型的桥与 Clojure 对话,反之亦然?

iframe - 无法嵌入某些YouTube视频,iframe会显示 “the video contains content from WMG. it is restricted from playback on certain sites”