我正在尝试以小块的形式将文本写入流,当我将 XML 流编写器指向输出流(它立即开始发送数据)时它可以正常工作,然后我尝试写入一些文本然后刷新它在流关闭之前不发送任何内容。
(defn data
"Download a 5MB file and parse it"
[]
(-> "http://www.cs.washington.edu/research/xmldatasets/data/tpc-h/orders.xml"
URL.
.openStream
xml/parse))
(defn send-stuff [request]
(condp = (:uri request)
"/text" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(.write w "start\n")
(.flush w)
(Thread/sleep 1000)
(.write w "done\n")
(.flush w))))
"/xml" (response/response
(ring-io/piped-input-stream
#(->> (io/make-writer % {:encoding "UTF-8"})
(xml/emit (data))
.flush)))))
(comment
(def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
(.stop server))
像这样用 curl 测试它:
curl localhost:8888/text
静静地坐在那里整整一秒钟,然后返回
start
done
我希望看到“开始”然后一秒后“完成”,而不是延迟一秒,然后两者都出现。
并使用
curl localhost:8888/xml
立即开始流式传输挖眼的 XML(抱歉其中隐藏了个人偏见;-)
-- 编辑 我已经确认问题出在码头输出缓冲区,因为如果我将该缓冲区设置得非常小,它就会消失:
(def server (jetty/run-jetty #'send-stuff {:output-buffer-size 1 :port 8888 :join? false}))
当然,在许多情况下,将输出缓冲区设置为 1 是一个坏主意。
最佳答案
.flush
您正在调用的不是用于 HTTP 响应的流,而是一个 output stream of the piped streams pair .
当您查看 source code of PipedOutputStream.flush()
您会注意到它只通知所有等待从连接的 PipedInputStream 读取的线程,并不意味着刷新到底层 HTTP 响应流。
行为上的差异是由响应数据大小引起的。如果您将示例更改为使用小型 XML 数据,则行为将相同:
(defn data
[]
(-> "<?xml version=\"1.0\" encoding=\"UTF-8\"?><a>1</a>"
(.getBytes)
(ByteArrayInputStream.)
(xml/parse)))
(defn send-stuff [request]
(condp = (:uri request)
"/text" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(.write w "start\n")
(.flush w)
(Thread/sleep 1000)
(.write w "done\n")
(.flush w))))
"/xml" (response/response
(ring-io/piped-input-stream
#(let [w (io/make-writer % {:encoding "UTF-8"})]
(xml/emit (data) w)
(.flush w)
(Thread/sleep 1000)
(xml/emit (data) w)
(.flush w))))))
调用 curl localhost:8888/xml
一秒钟后将仅显示整个响应:
<?xml version="1.0" encoding="UTF-8"?><a>1</a><?xml version="1.0" encoding="UTF-8"?><a>1</a>
您可以使用不同的流机制,您可以控制刷新 HTTP 响应流,例如使用阻塞队列:
(ns so43769408
(:require [ring.adapter.jetty :as jetty]
[clojure.java.io :as io]
[ring.util.response :as response]
[ring.core.protocols :as protocols])
(:import (java.io OutputStream)
(java.util.concurrent LinkedBlockingQueue)))
(extend-protocol protocols/StreamableResponseBody
LinkedBlockingQueue
(write-body-to-stream [output-queue _ ^OutputStream output-stream]
(with-open [out (io/writer output-stream)]
(loop [chunk (.take output-queue)]
(when-not (= chunk ::EOF)
(.write out (str chunk))
(.flush out)
(recur (.take output-queue)))))))
(defn send-stuff [request]
(response/response
(let [output-queue (LinkedBlockingQueue.)]
(future
(.put output-queue "start\n")
(Thread/sleep 1000)
(.put output-queue "end\n")
(.put output-queue ::EOF))
output-queue)))
(comment
(def server (jetty/run-jetty #'send-stuff {:port 8888 :join? false}))
(.stop server))
关于clojure - 为什么 xml/emit stream 会立即发出,而直接写入流却不会,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43769408/