scala - FS2 的对象池模式

标签 scala fs2

我正在尝试了解实现 Object Pool pattern 的最佳方式是什么在 FS2 .

假设我们有以下 MyPrinter 定义:

class MyPrinter {
  import scala.util.Random.nextInt
  Thread.sleep(5000 + nextInt(1000))
  def doStuff(s: String): Unit = {
    println(s)
    Thread.sleep(1000 + nextInt(1000))
  }
  def releaseResources(): Unit =
    println("Releasing resources")
}

制作由 n 打印机池支持的 Stream[Task, MyPrinter] 的最佳方法是什么?当流结束时,应通过调用 releaseResources 正确释放所有底层资源。

额外的问题:如果打印机由于某种原因终止,是否可以在池中创建一个新的打印机?

最佳答案

不确定我是否回答了这个问题,但是这个怎么样

implicit val S = Strategy.fromFixedDaemonPool(10, "pooling")

val queue = new LinkedBlockingDeque[MyPrinter]()
queue.add(new MyPrinter)
queue.add(new MyPrinter)

Stream.repeatEval(Task.delay(queue.take()))
  .map(p => try p.doStuff("test") finally {
    p.releaseResources()
    queue.put(p)
  })
  .take(10)
  .runLog
  .unsafeRun()

队列可以用https://commons.apache.org/proper/commons-pool/代替

更新:

如果你想同时处理每个“资源”:

concurrent.join(10)(
  Stream
    .repeatEval(Task.delay(queue.take()))
    .map(p => Stream.eval(Task.delay(p.doStuff("test"))
    .map(_ => p /* done with this resource */)))
).map(p => { p.releaseResources(); queue.put(p) /* release resource */})
 .take(10).runLog.unsafeRun()

关于scala - FS2 的对象池模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38335931/

相关文章:

scala - 将元素从外部推送到 fs2 中的 react 流

scala - fs2.Stream[IO, Something] 不返回 take(1)

Scala import 语句错误地导入隐式值

java - Maven:如何让 jar-with-dependencies 排除 "provided"依赖项?

scala 文本中单词之间的绝对最小距离

java - 在 Akka 中从主管更新 SuperVisor 策略是否安全/不好?

scala - 提高涉及文件转换的 fs2 流的性能

java - 解决 java/Scala play 框架应用程序中的性能问题

scala - fs2 流不会在延迟时中断

scala - 是否可以在 http4s 服务器上的多个 http 请求之间共享纯 FP 状态?