scala - 如何在 Akka HTTP 中将数字流完成为 CSV 值?

标签 scala akka akka-stream akka-http

我有一个 Akka 源作为数字流,实现如下:

Source(Stream(1, 2, 3, 4, 5))

我正在尝试利用 Akka HTTP 中的 Akka 流支持将流响应作为逗号分隔值返回。

我已关注akka doc在简单的 csv 源流上进行以下实现:

implicit val csvFormat = Marshaller.strict[Int, ByteString] { res =>
    Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`, () => {
    ByteString(List(res).mkString(","))
    })
}

implicit val streamingSupport: CsvEntityStreamingSupport = EntityStreamingSupport.csv()

complete(Source(Stream(1, 2, 3, 4, 5)))

但显然这不是 CSV 实体流支持的正确用例。这会导致每个数字都在新行中传输。

但这不是我想要的。我想以逗号分隔的列表形式回复,例如 1,2,3,4,5

如何使用 Akka HTTP 中的流支持来实现这一目标?

最佳答案

该换行符是由 EntityStreamingSupport.csv() 中定义的流渲染器添加的。

我们需要定义自己的自定义EntityStreamingSupport才能使其正常工作。

val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)

    val byteStringSource: Source[ByteString, NotUsed] =
      responseSource.map(i => ByteString(i.toString))

    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))

    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(Flow[ByteString].map(bs => bs ++ ByteString(",")))

    complete((streamingSource))
  }
curl localhost:8080/test -v 
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 07:50:46 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
< 
* Connection #0 to host localhost left intact
1,2,3,4,5,* Closing connection 0

编辑:为了消除逗号,我们可以使用窗口黑客。

val route =
  path("test") {
    val responseSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Stream(1, 2, 3, 4, 5).iterator)

    val startByteString = ByteString("$start$")

    val byteStringSource: Source[ByteString, NotUsed] =
        responseSource.map(i => ByteString(i.toString)).prepend(Source.single(startByteString))

    val streamingSource =
      byteStringSource.map(bs => HttpEntity(ContentTypes.`text/plain(UTF-8)`, bs))

    implicit val streamingSupport =
      EntityStreamingSupport.csv(maxLineLength = 16 * 1024)
        .withSupported(ContentTypeRange(ContentTypes.`text/plain(UTF-8)`))
        .withContentType(ContentTypes.`text/plain(UTF-8)`)
        .withFramingRenderer(
          Flow[ByteString].sliding(2, 1)
            .map { bsSeq =>
              if (startByteString.equals(bsSeq(0))) {
                // first int; no need for comma
                bsSeq(1)
              } else {
                // not first int; add comma
                ByteString(",") ++ bsSeq(1)
              }

            }
        )

    complete((streamingSource))
  }
curl localhost:8080/test -v 
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /test HTTP/1.1
> Host: localhost:8080
> User-Agent: curl/7.64.1
> Accept: */*
> 
< HTTP/1.1 200 OK
< Server: akka-http/10.2.4
< Date: Tue, 20 Jul 2021 08:28:05 GMT
< Transfer-Encoding: chunked
< Content-Type: text/plain; charset=UTF-8
< 
* Connection #0 to host localhost left intact
1,2,3,4,5* Closing connection 0

关于scala - 如何在 Akka HTTP 中将数字流完成为 CSV 值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68450421/

相关文章:

scala - 在 Scala 中拦截套接字关闭上的 Akka HTTP WebSocket 事件

scala - 如何使用 Play 2.5 在分块响应中将 Anorm 大型查询结果流式传输到客户端

scala - 如何突然停止 akka 流 Runnable Graph?

scala - 如何真实测试akka流?

Scala 集合 : Using a value of Set find the key from a Map object.

scala - 让测试仅在 SBT 控制台外工作

scala - 在 Akka 指令中提取路径头

scala - Akka、Scalatra 和 Web 状态问题

Scala 中的 Java 泛型问号

scala - 如何正确地通过包装类来线程化类型?