java - 克洛贾尔/Java : Most effective method for minimizing bandwidth consumption when performing complex operations on a stream of Amazon S3 data

标签 java clojure amazon-s3 bufferedreader lazy-sequences

我正在使用 BufferedReader 执行对象的流式读取。

我需要用这个对象做两件事:

  1. 将其传递给 SuperCSV csv 阅读器
  2. 获取原始行并将它们保存在(Clojure)惰性序列中

目前,我必须使用两种不同的 BufferedReader:一种作为 SuperCSV CSV 阅读器类的参数,另一种用于初始化原始行的惰性序列。我有效地下载了两次 S3 对象,这既昂贵又缓慢。

我的一位同事指出,我正在寻找类似于 Unix“tee”命令的东西。可以以某种方式“拆分”、下载一大块数据并将副本传递给延迟序列和 csv 阅读器功能的 BufferedReader 将很有用。

我目前还在研究是否可以将惰性序列包装在 BufferedReader 中并将 that 传递给 super csv。在将非常大的惰性序列传递给多个消费者时,我遇到了一些 Java 堆空间问题,所以我有点担心采用这个解决方案。

另一种解决方案是将文件下载到本地,然后在该文件上打开两个流。这消除了流背后的原始动机:允许在数据开始到达时立即开始处理文件。

最终的解决方案是实现我自己的 CSV 阅读器,它会返回已解析的 CSV 和原始未解析的行,这也是我仅在没有其他方法时才会考虑的解决方案。如果您使用了一个非常可靠的 CSV 阅读器,它可以返回已解析 CSV 数据的 Java 哈希和原始未解析行,请告诉我!

谢谢!

最佳答案

我倾向于从网络创建一系列行,然后将其交给需要处理该序列的许多进程;持久数据结构很酷。在需要将字符串序列转换为可以交给 SuperCSV api 的阅读器的情况下,这似乎可行:

(import '[java.io Reader StringReader])

(defn concat-reader
  "Returns a Reader that reads from a sequence of strings."
  [lines]
  (let [srs (atom (map #(StringReader. %) lines))]
    (proxy [Reader] []
      (read 
        ([] 
          (let [c (.read (first @srs))]
            (if (and (neg? c) (swap! srs next))
              (.read this)
              c)))
        ([cbuf] 
          (.read this cbuf 0 (count cbuf)))
        ([cbuf off len]
          (let [actual (.read (first @srs) cbuf off len)]
            (if (and (neg? actual) (swap! srs next))
              (.read this cbuf off len)
              actual))))
      (close [] ))))

例如

user=> (def r (concat-reader ["foo" "bar"]))
#'user/r
user=> (def cbuf (char-array 2))
#'user/cbuf
user=> (.read r cbuf)
2
user=> (seq cbuf)
(\f \o)
user=> (char (.read r))
\o
user=> (char (.read r))
\b

关于java - 克洛贾尔/Java : Most effective method for minimizing bandwidth consumption when performing complex operations on a stream of Amazon S3 data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3580152/

相关文章:

concurrency - 通过使用无锁算法,Clojure是否无锁?

java - Gradle assembleRelease 不适用于 ProGuard

java - 如何在 char 数组中存储字符数。基本上想要在数组上使用增量运算符

java - stream.spliterator() 是否关闭流?

amazon-web-services - 有没有办法有条件地将过期策略添加到 cloudformation 中的 S3 存储桶

python - 将特定文件从 S3 子文件夹导入到 Python 中

python - 将 s3cmd 与其他类似 s3 的提供商一起使用

java - 做RCP项目的先决条件知识

string - 轻量级Clojure库用于简单的字符串模板化?

clojure - ^ :dynamic do in Clojure? 是什么意思