scala - 具有 StateT[IO, _, _] 的 FS2 Stream,定期转储状态

标签 scala monad-transformers scala-cats fs2 cats-effect

我有一个消耗无限数据流的程序。在此过程中,我想记录一些指标,这些指标形成一个幺半群,因为它们只是简单的求和和平均值。我想定期在某处写下这些指标,清除它们,然后返回累积它们。我基本上有:

object Foo {
  type MetricsIO[A] = StateT[IO, MetricData, A]

  def recordMetric(m: MetricData): MetricsIO[Unit] = {
    StateT.modify(_.combine(m))
  }

  def sendMetrics: MetricsIO[Unit] = {
    StateT.modifyF { s =>
      val write: IO[Unit] = writeMetrics(s)
      write.attempt.map {
        case Left(_) => s
        case Right(_) => Monoid[MetricData].empty
      }
    }
  }
}

因此,大部分执行直接使用 IO 并使用 StateT.liftF 提升。在某些情况下,我会添加一些对 recordMetric 的调用。最后我得到了一个流:

val mainStream: Stream[MetricsIO, Bar] = ...

我想定期(比如每分钟左右)转储指标,所以我尝试了:

val scheduler: Scheduler = ...
val sendStream =
  scheduler
    .awakeEvery[MetricsIO](FiniteDuration(1, TimeUnit.Minutes))
    .evalMap(_ => Foo.sendMetrics)

val result = mainStream.concurrently(sendStream).compile.drain

然后我执行通常的顶级程序操作,即使用启动状态调用 run,然后调用 unsafeRunSync

问题是,我只看到空的指标!我怀疑这与我的幺半群隐式向 sendStream 提供空指标有关,但我不太明白为什么会这样或如何解决它。也许有一种方法可以将这些 sendMetrics 调用“交错”到主流中?

编辑:这是一个最小的完整可运行示例:

import fs2._
import cats.implicits._
import cats.data._
import cats.effect._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

val sec = Executors.newScheduledThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(sec)

type F[A] = StateT[IO, List[String], A]

val slowInts = Stream.unfoldEval[F, Int, Int](1) { n =>
  StateT(state => IO {
    Thread.sleep(500)
    val message = s"hello $n"
    val newState = message :: state
    val result = Some((n, n + 1))
    (newState, result)
  })
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[F](FiniteDuration(1, SECONDS))

val slowIntsPeriodicallyClearedState = slowInts.either(ticks).evalMap[Int] {
  case Left(n) => StateT.liftF(IO(n))
  case Right(_) => StateT(state => IO {
    println(state)
    (List.empty, -1)
  })
}

现在如果我这样做:

slowInts.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我得到了预期的结果 - 状态正确地累积到输出中。但如果我这样做:

slowIntsPeriodicallyClearedState.take(10).compile.drain.run(List.empty).unsafeRunSync

然后我看到始终打印出一个空列表。我希望打印出部分列表(大约 2 个元素)。

最佳答案

StateT 与效果类型一起使用并不安全,因为它在面对并发访问时并不安全。相反,请考虑使用 Ref(来自 fs2 或 cats-effect,具体取决于版本)。

类似这样的事情:

def slowInts(ref: Ref[IO, Int]) = Stream.unfoldEval[F, Int, Int](1) { n =>
  val message = s"hello $n"
  ref.modify(message :: _) *> IO {
    Thread.sleep(500)
    val result = Some((n, n + 1))
    result
  }
}

val ticks = Scheduler.fromScheduledExecutorService(sec).fixedDelay[IO](FiniteDuration(1, SECONDS))

def slowIntsPeriodicallyClearedState(ref: Ref[IO, Int] = 
  slowInts.either(ticks).evalMap[Int] {
    case Left(n) => IO.pure(n)
    case Right(_) =>
      ref.modify(_ => Nil).flatMap { case Change(previous, now) => 
        IO(println(now)).as(-1)
      }
  }

关于scala - 具有 StateT[IO, _, _] 的 FS2 Stream,定期转储状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51624763/

相关文章:

scala - 如何为 cat-effect 的资源添加正确的错误处理

scala - 有没有更聪明的方法来对猫做到这一点?

scala - 如何在 Scala 中组合 ADT?

scala - 奇怪的错误消息 : bad symbolic reference. package.class 中的签名指的是包 org 中的术语 apache,该术语不可用

Scala代码揭开神秘面纱

haskell - 将 failWith 与 Servant 和自定义 monad 堆栈一起使用

haskell - 学习 Happstack 和 Monad 变形金刚

scala - 了解 UID 在 Spark MLLib Transformer 中的作用

scala - 如何使用没有类型别名的 Scala 猫对 Either 进行排序(请参阅 Herding cats)

haskell - 在 monad 转换器中,为什么已知的 monad 是内部的?