clojure - 为什么 xml/emit stream 会立即发出,而直接写入流却不会

标签 clojure

我正在尝试以小块的形式将文本写入流,当我将 XML 流编写器指向输出流(它立即开始发送数据)时它可以正常工作,然后我尝试写入一些文本然后刷新它在流关闭之前不发送任何内容。

(defn data
  "Download a 5MB file and parse it"
  []
  (-> "http://www.cs.washington.edu/research/xmldatasets/data/tpc-h/orders.xml"
      URL.
      .openStream
      xml/parse))

(defn send-stuff [request]
  (condp = (:uri request)
    "/text" (response/response
             (ring-io/piped-input-stream
              #(let [w (io/make-writer % {:encoding "UTF-8"})]
                 (.write w "start\n")
                 (.flush w)
                 (Thread/sleep 1000)
                 (.write w "done\n")
                 (.flush w))))
    "/xml"  (response/response
             (ring-io/piped-input-stream
              #(->> (io/make-writer % {:encoding "UTF-8"})
                    (xml/emit (data))
                    .flush)))))

(comment
  (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
  (.stop server))

像这样用 curl 测试它:

curl localhost:8888/text

静静地坐在那里整整一秒钟,然后返回

start
done

我希望看到“开始”然后一秒后“完成”,而不是延迟一秒,然后两者都出现。

并使用

curl localhost:8888/xml

立即开始流式传输挖眼的 XML(抱歉其中隐藏了个人偏见;-)

-- 编辑 我已经确认问题出在码头输出缓冲区,因为如果我将该缓冲区设置得非常小,它就会消失:

(def server (jetty/run-jetty #'send-stuff {:output-buffer-size 1 :port 8888 :join? false}))

当然,在许多情况下,将输出缓冲区设置为 1 是一个坏主意

最佳答案

.flush您正在调用的不是用于 HTTP 响应的流,而是一个 output stream of the piped streams pair .

当您查看 source code of PipedOutputStream.flush() 您会注意到它只通知所有等待从连接的 PipedInputStream 读取的线程,并不意味着刷新到底层 HTTP 响应流。

行为上的差异是由响应数据大小引起的。如果您将示例更改为使用小型 XML 数据,则行为将相同:

(defn data
  []
  (-> "<?xml version=\"1.0\" encoding=\"UTF-8\"?><a>1</a>"
      (.getBytes)
      (ByteArrayInputStream.)
      (xml/parse)))


(defn send-stuff [request]
  (condp = (:uri request)
    "/text" (response/response
              (ring-io/piped-input-stream
                #(let [w (io/make-writer % {:encoding "UTF-8"})]
                   (.write w "start\n")
                   (.flush w)
                   (Thread/sleep 1000)
                   (.write w "done\n")
                   (.flush w))))
    "/xml"  (response/response
              (ring-io/piped-input-stream
                #(let [w (io/make-writer % {:encoding "UTF-8"})]
                   (xml/emit (data) w)
                   (.flush w)
                   (Thread/sleep 1000)
                   (xml/emit (data) w)
                   (.flush w))))))

调用 curl localhost:8888/xml一秒钟后将仅显示整个响应:

<?xml version="1.0" encoding="UTF-8"?><a>1</a><?xml version="1.0" encoding="UTF-8"?><a>1</a>

您可以使用不同的流机制,您可以控制刷新 HTTP 响应流,例如使用阻塞队列:

(ns so43769408
  (:require [ring.adapter.jetty :as jetty]
            [clojure.java.io :as io]
            [ring.util.response :as response]
            [ring.core.protocols :as protocols])
  (:import (java.io OutputStream)
           (java.util.concurrent LinkedBlockingQueue)))

(extend-protocol protocols/StreamableResponseBody
  LinkedBlockingQueue
  (write-body-to-stream [output-queue _ ^OutputStream output-stream]
    (with-open [out (io/writer output-stream)]
      (loop [chunk (.take output-queue)]
        (when-not (= chunk ::EOF)
          (.write out (str chunk))
          (.flush out)
          (recur (.take output-queue)))))))


(defn send-stuff [request]
  (response/response
    (let [output-queue (LinkedBlockingQueue.)]
      (future
        (.put output-queue "start\n")
        (Thread/sleep 1000)
        (.put output-queue "end\n")
        (.put output-queue ::EOF))
      output-queue)))

(comment
  (def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
  (.stop server))

关于clojure - 为什么 xml/emit stream 会立即发出,而直接写入流却不会,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43769408/

相关文章:

emacs - 将 emacs 与 paredit 一起使用时,如何将结束括号添加到 clojure 表单?

Clojure:难以成功地将协议(protocol)应用于 2 种类型

clojure - 从 REPL 中调用黑色页面

clojure - Clojure 中的关联

clojure - 一种语言是 LISP 的方言是什么意思?

clojure - 如何在大型 Clojure 程序中实际使用 core.logic?

Clojure 的映射 : are keys and vals in same order?

javascript - 当用户在输入字段中输入 "ENTER"时停止页面重新加载 - ClojureScript

clojure - 计算集合中元素的连续出现次数

clojure - 客观思考与功能思考