scala - 使用 Play2/Scala 通过 Iteratee 将文件上传流转发到 S3

标签 scala amazon-s3 playframework-2.0 iterate

我已经阅读了一些关于通过 Iteratee 将文件发送到 S3 的可能性的内容,这似乎允许在我们收到文件时发送 S3 文件 block ,并避免大文件的 OutOfMemory 例如。

我发现这个 SO 帖子可能几乎是我需要做的:
Play 2.x : Reactive file upload with Iteratees
我真的不明白该怎么做,或者它是否真的在 Play 2.0.2 中可用(因为 Sadek Brodi 说 foldM 仅在 Play 2.1 中可用)

对于已经阅读过一些关于 Iteratees 的博客但还不是 Scala/Play2 专家的人,有人可以用简单的方式解释一下吗?

我什至不知道我是否应该使用多部分正文解析器或类似的东西,但我知道的一件事是我不明白这段代码在做什么:

val consumeAMB = 
  Traversable.takeUpTo[Array[Byte]](1028*1028) &>> Iteratee.consume()

val rechunkAdapter:Enumeratee[Array[Byte],Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

val writeToStore: Iteratee[Array[Byte],_] =
  Iteratee.foldM[Array[Byte],_](connectionHandle){ (c,bytes) => 
    // write bytes and return next handle, probable in a Future
  }

BodyParser( rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

顺便说一下,与使用经典的 Java InputStream/OutputStream 相比,内存消耗会有什么不同。
实际上,我能够以非阻塞方式将 500mb 文件转发到 S3,内存消耗非常低,不使用 Iteratees,使用 Java + AsyncHttpClient + Grizzly(但我想它也适用于 Netty)。

那么使用Iteratee有什么好处呢?

我可以看到的一个区别是,我获得并转发到 S3 的 InputStream 在我的情况下由一个临时文件支持(这是一种 CXF 行为),因此它可能不像 Play Iteratee 那样具有反应性

但是对于 Iteratee,如果 Enumerator 产生连接接收到的字节并通过 Iteratee 将它们转发到 S3,那么如果与 S3 的连接不好并且字节不能非常快速地转发,那么“待处理”字节存储在哪里?

最佳答案

简单的解释? 我会尽力。 :)

您正在用组件构建管道。一旦你构建了管道,它就可以被发送数据。它是一个 Iteratee,所以它知道如何迭代数据。

您要上传的文件包含在请求正文中,而 BodyParser 是在 Play 中处理请求正文的。因此,您将迭代管道放入 BodyParser。当发出请求时,您的管道将被发送数据(它将对其进行迭代)。

您的管道 (rechunkAdapter &>> writeToStore) 将数据分成 1MB 位,然后将它们发送到 S3。

管道的第一部分 (rechunkAdapter) 进行分 block 。它实际上有自己的迷你管道来进行分 block ( consumeAMB )。一旦迷你管道接收到足够的数据来组成一个 block ,它就会将它发送到主管道。

管道的第二部分 (writeToStore) 就像一个循环,在每个 block 上被调用,让您有机会将每个 block 发送到 S3。

迭代器的优点?

一旦您知道发生了什么,您就可以通过将组件连接在一起来构建迭代管道。并且类型检查器通常会在您错误地将某些东西连接在一起时告诉您。

例如,我们可以修改上面的管道来修复它很慢的事实。它可能很慢,因为只要一个 block 准备好上传到 S3,请求上传就会暂停。放慢请求上传速度很重要,这样我们就不会耗尽内存,但我们可以通过添加一个固定大小的缓冲区来更宽容一点。所以只需添加 Concurrent.buffer(2)进入管道的中间以缓冲最多 2 个 block 。

Iteratees 为流提供了一种功能方法。这是优点还是缺点,取决于您对函数式编程的感受。 :) 与惰性流(另一种功能方法)相比,迭代提供了对资源使用的精确控制。

最后,迭代器允许我们相对简单地(!)进行非常复杂的异步流编程。我们可以在不持有线程的情况下处理 IO,这是可扩展性的巨大胜利。经典的 Java InputStream/OutputStream 示例需要 2 个线程。

关于scala - 使用 Play2/Scala 通过 Iteratee 将文件上传流转发到 S3,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12609451/

相关文章:

amazon-s3 - 使用自定义域在 S3 网站上设置索引文档

amazon-web-services - 不是由该工厂创建的AWS Socket

scala - Slick 中的动态查询参数(排序)

multithreading - Play Framework : Async vs Sync performance

scala - 如何检查Map中是否存在键值对组合

scala - inferSchema=true 不适用于读取 csv 文件 n Spark Structured Streaming

scala - 为什么这个谓词中的参数可以省略呢?

scala - 是否可以引用计数调用位置?

java - AWS Java SDK 2.0 为 S3 对象创建预签名 URL

Scala,Play2.0 - 预期为 ';' 但找到了 ')'