(我之前有一个问题 here ,并假设我在访问 core.async 时不会遇到问题)。
给定如下输入数据:
(require '[clojure.core.async :as a])
(def input-data
[{:itm_na 1 :seq_no 1 :doc_img "this is a very long "}
{:itm_na 1 :seq_no 2 :doc_img "sentence from a mainframe "}
{:itm_na 1 :seq_no 3 :doc_img "system that was built before i was "}
{:itm_na 1 :seq_no 4 :doc_img "born."}
{:itm_na 2 :seq_no 1 :doc_img "this is a another very long "}
{:itm_na 2 :seq_no 2 :doc_img "sentence from the same mainframe "}
{:itm_na 3 :seq_no 1 :doc_img "Ok here we are again. "}
{:itm_na 3 :seq_no 2 :doc_img "The mainframe only had 40 char per field so"}
{:itm_na 3 :seq_no 3 :doc_img "they broke it into multiple rows "}
{:itm_na 3 :seq_no 4 :doc_img "which seems to be common"}
{:itm_na 3 :seq_no 5 :doc_img " for the time. "}
{:itm_na 3 :seq_no 6 :doc_img "thanks for your help."}])
partition-by
(如预期)将我的数据聚集到 seq 中(以便稍后折叠):
(count (partition-by :itm_na input-data ))
;;=> 3
但是,当我出于某种原因尝试使用 core.async 管道执行此操作时,它
似乎没有做同样的事情...我如何获得有状态传感器部分
partition-by
在异步管道中实际上保留状态吗?
(let
[source-chan (a/to-chan input-data)
target-chan (a/chan 100)
xf (comp (partition-by :itm_na))
]
(a/pipeline 1
target-chan
xf
source-chan)
(count (<!! (a/into [] target-chan))))
;;=>12
这应该是3?
奇怪的是,当我将 xf
绑定(bind)到如下所示的 channel 时,我得到了预期的结果。我不确定为什么 a/pipeline
的行为不同。
(let [xf (comp (partition-by :itm_na))
ch (a/chan 1 xf)]
(a/onto-chan ch input-data)
(count (<!! (a/into [] ch))))
=>3
从文档中...提到有状态位:
(clojure.repl/doc partition-by)
-------------------------
clojure.core/partition-by
([f] [f coll])
Applies f to each value in coll, splitting it each time f returns a
new value. Returns a lazy seq of partitions. Returns a stateful
transducer when no collection is provided.
最佳答案
这个特殊案例是briefly highlighted Rich Hickey 在他的演讲中说道:您不能将pipeline
与有状态转换器一起使用,这主要是因为pipeline
的并行性质。
关于clojure - core.async 与分区状态传感器不保持状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49146778/