clojure - core.async 与分区状态传感器不保持状态?

标签 clojure core.async

(我之前有一个问题 here ,并假设我在访问 core.async 时不会遇到问题)

给定如下输入数据:

(require '[clojure.core.async :as a])

(def input-data
  [{:itm_na 1 :seq_no 1  :doc_img "this is a very long "}
   {:itm_na 1 :seq_no 2  :doc_img "sentence from a mainframe "}
   {:itm_na 1 :seq_no 3  :doc_img "system that was built before i was "}
   {:itm_na 1 :seq_no 4  :doc_img "born."}
   {:itm_na 2 :seq_no 1  :doc_img "this is a another very long "}
   {:itm_na 2 :seq_no 2  :doc_img "sentence from the same mainframe "}
   {:itm_na 3 :seq_no 1  :doc_img "Ok here we are again. "}
   {:itm_na 3 :seq_no 2  :doc_img "The mainframe only had 40 char per field so"}
   {:itm_na 3 :seq_no 3  :doc_img "they broke it into multiple rows "}
   {:itm_na 3 :seq_no 4  :doc_img "which seems to be common"}
   {:itm_na 3 :seq_no 5  :doc_img " for the time. "}
   {:itm_na 3 :seq_no 6  :doc_img "thanks for your help."}])

partition-by (如预期)将我的数据聚集到 seq 中(以便稍后折叠):

(count (partition-by :itm_na input-data ))
;;=> 3

但是,当我出于某种原因尝试使用 core.async 管道执行此操作时,它 似乎没有做同样的事情...我如何获得有状态传感器部分 partition-by 在异步管道中实际上保留状态吗?

(let
    [source-chan (a/to-chan input-data)
     target-chan (a/chan 100)
     xf (comp (partition-by :itm_na))
     ]
  (a/pipeline 1
              target-chan
              xf
              source-chan)
  (count (<!! (a/into [] target-chan))))

;;=>12

这应该是3?

奇怪的是,当我将 xf 绑定(bind)到如下所示的 channel 时,我得到了预期的结果。我不确定为什么 a/pipeline 的行为不同。

(let [xf (comp (partition-by :itm_na))
      ch (a/chan 1 xf)]
  (a/onto-chan ch input-data)
  (count (<!! (a/into [] ch))))
=>3

从文档中...提到有状态位:

(clojure.repl/doc partition-by) 
-------------------------
clojure.core/partition-by
([f] [f coll])
  Applies f to each value in coll, splitting it each time f returns a
   new value.  Returns a lazy seq of partitions.  Returns a stateful
   transducer when no collection is provided.

最佳答案

这个特殊案例是briefly highlighted Rich Hickey 在他的演讲中说道:您不能将pipeline 与有状态转换器一起使用,这主要是因为pipeline 的并行性质。

关于clojure - core.async 与分区状态传感器不保持状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49146778/

相关文章:

clojure - 使用 clojure 作为脚本语言时如何添加 Java 实例作为上下文?

clojure - Core.async:从 promise-chans 集合中获取所有值

clojure - 使用 Clojure 的 core.async 使用文件内容

clojure - 如何在 Clojure 中将序列拆分为每 3 个相邻元素?

testing - 从用于生产的 boot-clj 项目中排除测试源

macros - 旁边不需要空格的 Clojure 宏

clojure - 在 Clojure 中将数字从 10 进制转换为另一个进制

clojure - 使用 core.async pub/sub 时如何避免丢失项目?

clojurescript - 不关闭浏览器 channel 的后果?