scala - 将元素从外部推送到 fs2 中的 react 流

标签 scala akka-stream reactive-streams fs2

我有一个外部(即我无法更改)Java API,如下所示:

public interface Sender {
    void send(Event e);
}

我需要实现 Sender它接受每个事件,将其转换为 JSON 对象,将其中一些事件收集到一个包中,并通过 HTTP 发送到某个端点。这一切都应该异步完成,没有 send()阻塞调用线程,使用一些固定大小的缓冲区,如果缓冲区已满,则丢弃新事件。

使用 akka-streams 这很简单:我创建一个阶段图(使用 akka-http 发送 HTTP 请求),将其物化并使用物化的 ActorRef将新事件推送到流中:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}

这里CustomBuffer是自定义GraphStage这与库提供的 Buffer 非常相似但根据我们的特定需求量身定制;对于这个特定的问题,这可能无关紧要。

如您所见,与来自非流代码的流交互非常简单 - ! ActorRef 上的方法trait 是异步的,不需要调用任何额外的机制。然后通过整个 react 管道处理发送给参与者的每个事件。此外,由于 akka-http 的实现方式,我什至可以免费获得连接池,因此与服务器打开的连接不超过一个。

但是,我找不到正确使用 FS2 执行相同操作的方法。即使放弃缓冲的问题(我可能需要编写一个自定义的 Pipe 实现来做我们需要的其他事情)和 HTTP 连接池,我仍然坚持一个更基本的事情 - 即如何推送数据“从外部”传输到 react 流。

我能找到的所有教程和文档都假设整个程序发生在某个效果上下文中,通常是 IO .这不是我的情况 - send()方法由 Java 库在未指定的时间调用。因此,我不能将所有内容都放在一个 IO 中。操作,我必须完成 send() 中的“推送”操作方法,并将 react 流作为一个单独的实体,因为我想聚合事件并希望汇集 HTTP 连接(我相信这自然与 react 流相关联)。

我假设我需要一些额外的数据结构,比如 Queue . fs2 确实有某种 fs2.concurrent.Queue ,但同样,所有文档都显示了如何在单个 IO 中使用它上下文,所以我假设做类似的事情
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()

然后使用 queue在流定义中,然后分别在 send() 中进一步 unsafeRun 的方法调用:
val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event) {
  queue.enqueue(e).unsafeRunSync()
}

不是正确的方法,很可能甚至行不通。

所以,我的问题是,如何正确使用 fs2 来解决我的问题?

最佳答案

考虑以下示例:

import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Answer {
  type Event = String

  trait Sender {
    def send(event: Event): Unit
  }

  def main(args: Array[String]): Unit = {
    val sender: Sender = {
      val ec = ExecutionContext.global
      implicit val cs: ContextShift[IO] = IO.contextShift(ec)
      implicit val timer: Timer[IO] = IO.timer(ec)

      fs2Sender[IO](2)
    }

    val events = List("a", "b", "c", "d")
    events.foreach { evt => new Thread(() => sender.send(evt)).start() }
    Thread sleep 3000
  }

  def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
    // dummy impl
    // this is where the actual logic for batching
    //   and shipping over the network would live
    val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
      for {
        _ <- F.delay { println(s"consuming [$event]...") }
        _ <- Timer[F].sleep(1.seconds)
        _ <- F.delay { println(s"...[$event] consumed") }
      } yield ()
    }

    val suspended = for {
      q <- Queue.bounded[F, Event](maxBufferedSize)
      _ <- q.dequeue.through(consume).compile.drain.start
      sender <- F.delay[Sender] { evt =>
        val enqueue = for {
          wasEnqueued <- q.offer1(evt)
          _ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
        } yield ()
        enqueue.toIO.unsafeRunAsyncAndForget()
      }
    } yield sender

    suspended.toIO.unsafeRunSync()
  }
}

主要思想是使用来自 fs2 的并发队列。请注意,上面的代码表明 Sender接口(interface)也不是main中的逻辑可以更改。仅 Sender 的实现接口(interface)可以换掉。

关于scala - 将元素从外部推送到 fs2 中的 react 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53554233/

相关文章:

scala - 在scala中使用map和reduce进行矢量积

scala - Akka HTTP 客户端 websocket 流的定义

scala - 计算Akka流中的元素数

scala - 为什么 Akka 流循环没有在此图中结束?

spring-webflux - 如何在 Flux 中迭代一个对象并对其进行操作?

java - 订阅可连接的热源时发生 CancellationException

java - 如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream

scala - 使用 Akka Streams 2.4.2 和 Slick 3.0 从 postgres 读取

scala - play/scala ,隐式请求 => 是什么意思?

scala - 在 Scala 中使用 for comprehension