scala - Akka 流 - 将 ByteString 流拆分为多个文件

标签 scala stream akka akka-stream

我正在尝试将传入的 Akka 字节流(来自 http 请求的正文,但也可能来自文件)拆分为多个定义大小的文件。

例如,如果我上传一个 10Gb 的文件,它会创建类似于 10 个 1Gb 的文件。这些文件将具有随机生成的名称。我的问题是我真的不知道从哪里开始,因为我读过的所有响应和示例要么将整个块存储到内存中,要么使用基于字符串的分隔符。除了我不能真正拥有 1Gb 的“块”,然后将它们写入磁盘..

有没有简单的方法来执行这种操作?我唯一的想法是使用这样的东西 http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings但转化为类似 FlowShape[ByteString, File] ,将自己写入文件中的块,直到达到最大文件大小,然后创建一个新文件,等等,并流回创建的文件。这看起来是一个糟糕的想法,没有正确使用 Akka ..

提前致谢

最佳答案

对于此类问题,我经常使用纯函数式、非 akka 技术,然后将这些函数“提升”为 akka 结构。我的意思是我尝试只使用 scala“东西”,然后尝试将这些东西包装在 akka 中......

文件创建

FileOutputStream 开始基于“随机生成的名称”的创建:

def randomFileNameGenerator : String = ??? //not specified in question

import java.io.FileOutputStream

val randomFileOutGenerator : () => FileOutputStream = 
  () => new FileOutputStream(randomFileNameGenerator)

国家

需要某种方式来存储当前文件的“状态”,例如已写入的字节数:
case class FileState(byteCount : Int = 0, 
                     fileOut : FileOutputStream = randomFileOutGenerator())

文件写入

首先,我们确定我们是否会使用给定的 ByteString 违反最大文件大小阈值。 :
import akka.util.ByteString

val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes

然后我们必须编写创建新 FileState 的函数。如果我们已经最大化了当前的容量,或者如果它仍然低于容量,则返回当前状态:
val closeFileInState : FileState => Unit = 
  (_ : FileState).fileOut.close()

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
  (state, byteString, maxBytes) =>
    if(isEndOfChunk(maxBytes, state, byteString)) {
      closeFileInState(state)
      FileState()
    }
    else
      state

唯一剩下的就是写信给 FileOutputStream :
val writeToFileAndReturn(FileState, ByteString) => FileState = 
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }

//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =    
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)    

折叠任何 GenTraversableOnce

在 Scala 中 GenTraversableOnce是具有折叠运算符的任何集合,无论是否并行。这些包括迭代器、向量、数组、序列、scala 流、......最终 writeToChunkedFile函数完美匹配GenTraversableOnce#fold的签名:
val anyIterable : Iterable = ???

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))

最后一个松散的结局;最后 FileOutputStream也需要关闭。由于折叠只会发出最后一个 FileState我们可以关闭那个:
closeFileInState(finalFileState)

Akka 流

Akka Flow 得到它的 fold 来自 FlowOps#fold这恰好与 GenTraversableOnce 匹配签名。因此,我们可以将常规函数“提升”为类似于我们使用 Iterable 的方式的流值。折叠:
import akka.stream.scaladsl.Flow

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))

使用常规函数处理问题的好处在于它们可以在流之外的其他异步框架中使用,例如 future 或 Actor 。您也不需要 akka ActorSystem在单元测试中,只是常规语言数据结构。
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)

然后你可以使用这个 Sink排水HttpEntity来自 HttpRequest .

关于scala - Akka 流 - 将 ByteString 流拆分为多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41402495/

相关文章:

scala - scala 中的 Apache Spark 中不支持的文字类型类

javascript - 关于 mongoose 查询的 tailable cursor 的错误

c# - 由于事先不知道长度,因此如何录制到Stream中?

networking - 如何知道何时完成接收 TCP 流?

scala - 在 OSGi 中创建 Akka 项目

scala - 通过spark从kafka到hdfs

scala - sbt 添加设置的惯用方式

java - 使用 Java 和 sbt 依赖项进行 Akka 编程

scala - 使用宏重写 val 和 var 构造函数参数

android - Akka on Android 多重引用.conf