scala - 状态单子(monad)在恒定空间(堆和堆栈)中的单子(monad)折叠?

标签 scala functional-programming monads scalaz scalaz7

是否可以在状态单子(monad)中以恒定的堆栈和堆空间执行折叠?还是一种不同的功能技术更适合我的问题?

接下来的部分描述了这个问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。

折叠 State Monad 填满堆

假设 Scalaz 7。考虑状态单子(monad)中的单子(monad)折叠。为了避免堆栈溢出,我们将蹦床折叠。

import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline

type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor

type S = Int  // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad

type R = Int  // or some other monoid

val col: Iterable[R] = largeIterableofRs() // defined elsewhere

val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){ 
    (acc: R, x: R) => StateT[Trampoline, S, R] {
      s: S => Trampoline.done { 
        (s + 1, Monoid[R].append(acc, x))
      }
    }
} run 0 run

// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap.  Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.

大量收藏col ,这将填满堆。

我相信在折叠过程中,会为集合中的每个值(x: R 参数)创建一个闭包(一个 State mobit),填充堆。在 run 0 之前,这些都无法评估。被执行,提供初始状态。

可以避免这种 O(n) 堆使用吗?

更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定(bind)期间执行,而不是嵌套闭包以供以后评估?

或者可以构造折叠,使其在状态单子(monad)为 run 之后延迟执行?这样,下一个x: R直到之前的闭包被评估并适合垃圾收集之后才会创建闭包。

或者这种工作有更好的功能范式吗?

示例应用程序

但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我在这里走错路了吗?

考虑reservoir sampling ,即一次通过均匀随机k集合中的项目太大而无法放入内存。在 Scala 中,这样的函数可能是
def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]

如果被拉进TraversableOnce类型可以这样使用
val tenRandomInts = (Int.Min to Int.Max) sample 10
sample 所做的工作本质上是 fold :
def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
    col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}

但是,update是有状态的;这取决于 n ,已经看到的项目数。 (它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理 n 的技术将很容易扩展。)。那么如何处理这种状态呢?

不纯的解决方案很简单,并且以恒定的堆栈和堆运行。
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
    var n = 0
    def apply(sample: Vector[A], x: A): Vector[A] = {
        n += 1
        algorithmR(k, n, acc, x)
    }
}

def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
    if (sample.size < k) {
        sample :+ x // must keep first k elements
    } else {
        val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
        if (r <= k)
            sample.updated(r - 1, x) // sample is 0-index
        else
            sample
    }
}

但是纯粹的功能解决方案呢? update必拍n作为附加参数并返回新值以及更新的样本。我们可以包括 n在隐式状态下,折叠累加器,例如,
(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2

但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。

我们将使用带有这些导入的 Scalaz 7
import scalaz._
import Scalaz._
import scalaz.std.iterable_

并通过 Iterable[A] 进行操作, 因为 Scalaz 不支持 Traversable 的单子(monad)折叠.
sample现在定义
// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {       
    type M[B] = State[Int, B]

    // foldLeftM is implemented using foldRight, which must reverse `col`, blowing
    // the heap for large `col`.  Ignore this issue for now.
    // foldLeftM could be implemented differently or we could switch to
    // foldRightM, implemented using foldLeft.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}

更新在哪里
// update using State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => State[Int, Vector[A]] {
        n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
    }
}

不幸的是,这会破坏大型集合的堆栈。

所以让我们蹦床吧。 sample就是现在
// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
    import Free.Trampoline

    type TrampolinedState[S, B] = StateT[Trampoline, S, B]
    type M[B] = TrampolinedState[Int, B]

    // Same caveat about foldLeftM using foldRight and blowing the heap
    // applies here.  Ignore for now. This solution blows the heap anyway;
    // let's fix that issue first.
    col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}

更新在哪里
// update using trampolined State monad
def update(k: Int) = {
    (acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
        n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
    }
}

这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。每个匿名函数
集合中的值是在折叠期间创建的(我相信关闭每个 x: A 参数),甚至在蹦床运行之前消耗堆。 (FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)

最佳答案

Our real issue is the heap used by the unexecuted State mobits.



不它不是。真正的问题是该集合不适合内存,而 foldLeftMfoldRightM强制整个集合。不纯解决方案的一个副作用是您正在释放内存。在“纯功能”解决方案中,您不会在任何地方这样做。

您对 Iterable 的使用忽略了一个关键细节:什么样的收藏 col实际上是,它的元素是如何创建的,以及它们是如何被丢弃的。因此,foldLeftM 必然如此。在 Iterable .它可能过于严格,并且您将整个集合强制放入内存。例如,如果它是 Stream ,那么只要你坚持col到目前为止强制执行的所有元素都将在内存中。如果是其他类型的懒惰Iterable没有记住它的元素,那么折叠仍然太严格。

我用 EphemeralStream 尝试了您的第一个示例没有看到任何显着的堆压力,即使它显然具有相同的“未执行状态 mobits”。不同之处在于 EphemeralStream的元素被弱引用,其foldRight不会强制整个流。

我怀疑如果你使用 Foldable.foldr ,那么您将看不到有问题的行为,因为它与第二个参数中惰性的函数折叠在一起。当你调用 fold 时,你希望它立即返回一个看起来像这样的暂停:
Suspend(() => head |+| tail.foldRightM(...))

当蹦床恢复第一次暂停并运行到下一次暂停时,暂停之间的所有分配都将可供垃圾收集器释放。

尝试以下操作:
def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
  if (bs.isEmpty) Monad[M].point(a)
  else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))

val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._

foldM[M,R,Int](Monoid[R].zero, col) {
  (x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run

对于蹦床单子(monad) M,这将在恒定堆中运行,但会溢出堆栈以获取非蹦床单子(monad)。

但是真正的问题是 Iterable对于太大而无法放入内存的数据,这不是一个好的抽象。 当然,你可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠。在您想将该程序与另一个程序组合之前,这很有效。而且我假设您在 State 中调查这样做的全部原因monad 一开始是为了获得组合性。

所以,你可以做什么?以下是一些选项:
  • 使用Reducer , Monoid , 及其组合,然后在命令式显式释放循环(或蹦床惰性右折叠)中运行作为最后一步,之后组合是不可能的或预期的。
  • 使用Iteratee组合和单子(monad)Enumerator s 喂它们。
  • 使用 Scalaz-Stream 编写组合流转换器.

  • 这些选项中的最后一个是我在一般情况下会使用和推荐的选项。

    关于scala - 状态单子(monad)在恒定空间(堆和堆栈)中的单子(monad)折叠?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20756436/

    相关文章:

    java - 在 Play 框架中将 Scala session 对象转换为 Java session 对象

    javascript - 使用函数式 Javascript 与 "procedural"的性能影响

    c# - Bind 和 Map 之外的 Option<T> monad 的标准操作

    Scala - 错误 : recursive variable name needs type

    algorithm - 在 Scala 中解压增量压缩的数字序列

    scala - 为复数定义一个类是否有意义,其中实部/虚部使用 Numeric[T] 而不是具体类型?

    javascript - 对返回数组感到困惑#map javascript

    Haskell - 我将如何运行状态单子(monad)列表?

    scala - Scala是否真的是Monad

    haskell - 在交互式命令行提示期间维护状态