scala - 如何运行 Monix 的 parSequenceUnordered 并处理每个任务的结果?

标签 scala monix sttp

我目前正在致力于实现对 API 的客户端 http 请求,并决定探索 sttp 和 monix 来完成此任务。由于我是 Monix 的新手,我仍然不确定如何运行任务并检索其结果。我的目标是获得一系列 http 请求结果,我可以并行调用 -> 解析 -> 加载。

下面是我迄今为止尝试过的片段:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val tasks = Seq(r1).map(i => Task(i))
    Task.parSequenceUnordered(tasks).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

  postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

我的困惑相当简单(我猜)。如何运行我创建的 Task.parSequenceUnordered 并处理(解析 http 结果)序列中的任务?

很高兴有:出于好奇,在处理请求的任务序列时是否可以天真地引入速率限制/节流?我并不是真的在寻找构建复杂的东西。它可以像间隔批处理请求一样简单。想知道 Monix 是否已经有一个助手可以做到这一点。

最佳答案

感谢Oleg Pyzhcovmonix gitter community帮助我解决这个问题。

此处引用奥列格:

Since you're using backend with monix support already, the type of r1 is Task[Response[Either[String,String]]]. So when you're doing Seq(r1).map(i => Task(i)), you make it a sequence of tasks that don't do anything except give you other tasks that give you result (the type would be Seq[Task[Task[Response[...]]]]). Your code then parallelizes the outer layer, tasks-that-give-tasks, and you get the tasks that you started with as the result. You only need to process a Seq(r1) for it to run requests in parallel.

If you're using Intellij, you can press Alt + = to see the type of selection - it helps if you can't tell the type from the code alone (but it gets better with experience).

As for rate-limiting, we have parSequenceN that lets you set a limit to parallelism. Note that unordered only means that you get slight performance advantage at the cost of results being in random order in the output, they are executed non-deterministically anyway.

我最终得到了一个(简化的)实现,如下所示:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val items = Seq(r1.map(x => x.body))
    Task.parSequenceN(1)(items).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

   postTask.runToFuture.foreach(println)
}

关于scala - 如何运行 Monix 的 parSequenceUnordered 并处理每个任务的结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63263276/

相关文章:

scala - 如何使用 Circe 的手动解码器反序列化非固定的 json 数组?

java - Spark : How to map Python with Scala or Java User Defined Functions?

java - Scala - 在特定场景下无法通过终端执行进程

scala - 如何让 Scala 命名参数和默认参数与宏一起使用

scala - 将字符串转换为列表的列表

scala - Monix Task.sleep 和单线程执行

scala - 如何处理 monix onErrorHandle 中抛出的未处理异常