是否可以在状态单子(monad)中以恒定的堆栈和堆空间执行折叠?还是一种不同的功能技术更适合我的问题?
接下来的部分描述了这个问题和一个激励用例。我正在使用 Scala,但也欢迎使用 Haskell 中的解决方案。
折叠 State
Monad 填满堆
假设 Scalaz 7。考虑状态单子(monad)中的单子(monad)折叠。为了避免堆栈溢出,我们将蹦床折叠。
import scalaz._
import Scalaz._
import scalaz.std.iterable._
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B] // monad type constructor
type S = Int // state is an integer
type M[B] = TrampolinedState[S, B] // our trampolined state monad
type R = Int // or some other monoid
val col: Iterable[R] = largeIterableofRs() // defined elsewhere
val (count, sum): (S, R) = col.foldLeftM[M, R](Monoid[R].zero){
(acc: R, x: R) => StateT[Trampoline, S, R] {
s: S => Trampoline.done {
(s + 1, Monoid[R].append(acc, x))
}
}
} run 0 run
// In Scalaz 7, foldLeftM is implemented in terms of foldRight, which in turn
// is a reversed.foldLeft. This pulls the whole collection into memory and kills
// the heap. Ignore this heap overflow. We could reimplement foldLeftM to avoid
// this overflow or use a foldRightM instead.
// Our real issue is the heap used by the unexecuted State mobits.
大量收藏
col
,这将填满堆。我相信在折叠过程中,会为集合中的每个值(
x: R
参数)创建一个闭包(一个 State mobit),填充堆。在 run 0
之前,这些都无法评估。被执行,提供初始状态。可以避免这种 O(n) 堆使用吗?
更具体地说,是否可以在折叠之前提供初始状态,以便 State monad 可以在每次绑定(bind)期间执行,而不是嵌套闭包以供以后评估?
或者可以构造折叠,使其在状态单子(monad)为
run
之后延迟执行?这样,下一个x: R
直到之前的闭包被评估并适合垃圾收集之后才会创建闭包。或者这种工作有更好的功能范式吗?
示例应用程序
但也许我使用了错误的工具来完成这项工作。示例用例的演变如下。我在这里走错路了吗?
考虑reservoir sampling ,即一次通过均匀随机
k
集合中的项目太大而无法放入内存。在 Scala 中,这样的函数可能是def sample[A](col: TraversableOnce[A])(k: Int): Vector[A]
如果被拉进
TraversableOnce
类型可以这样使用val tenRandomInts = (Int.Min to Int.Max) sample 10
sample
所做的工作本质上是 fold
:def sample[A](col: Traversable[A])(k: Int): Vector[A] = {
col.foldLeft(Vector()){update(k)(_: Vector[A], _: A)}
}
但是,
update
是有状态的;这取决于 n
,已经看到的项目数。 (它也依赖于一个 RNG,但为了简单起见,我假设它是全局的和有状态的。用于处理 n
的技术将很容易扩展。)。那么如何处理这种状态呢?不纯的解决方案很简单,并且以恒定的堆栈和堆运行。
/* Impure version of update function */
def update[A](k: Int) = new Function2[Vector[A], A, Vector[A]] {
var n = 0
def apply(sample: Vector[A], x: A): Vector[A] = {
n += 1
algorithmR(k, n, acc, x)
}
}
def algorithmR(k: Int, n: Int, acc: Vector[A], x: A): Vector[A] = {
if (sample.size < k) {
sample :+ x // must keep first k elements
} else {
val r = rand.nextInt(n) + 1 // for simplicity, rand is global/stateful
if (r <= k)
sample.updated(r - 1, x) // sample is 0-index
else
sample
}
}
但是纯粹的功能解决方案呢?
update
必拍n
作为附加参数并返回新值以及更新的样本。我们可以包括 n
在隐式状态下,折叠累加器,例如,(col.foldLeft ((0, Vector())) (update(k)(_: (Int, Vector[A]), _: A)))._2
但这掩盖了意图;我们只是真的打算累积样本向量。这个问题似乎已经为 State monad 和 monadic left fold 做好了准备。让我们再试一次。
我们将使用带有这些导入的 Scalaz 7
import scalaz._
import Scalaz._
import scalaz.std.iterable_
并通过
Iterable[A]
进行操作, 因为 Scalaz 不支持 Traversable
的单子(monad)折叠.sample
现在定义// sample using State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
type M[B] = State[Int, B]
// foldLeftM is implemented using foldRight, which must reverse `col`, blowing
// the heap for large `col`. Ignore this issue for now.
// foldLeftM could be implemented differently or we could switch to
// foldRightM, implemented using foldLeft.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0
}
更新在哪里
// update using State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => State[Int, Vector[A]] {
n => (n + 1, algorithmR(k, n + 1, acc, x)) // algR same as impure solution
}
}
不幸的是,这会破坏大型集合的堆栈。
所以让我们蹦床吧。
sample
就是现在// sample using trampolined State monad
def sample[A](col: Iterable[A])(k: Int): Vector[A] = {
import Free.Trampoline
type TrampolinedState[S, B] = StateT[Trampoline, S, B]
type M[B] = TrampolinedState[Int, B]
// Same caveat about foldLeftM using foldRight and blowing the heap
// applies here. Ignore for now. This solution blows the heap anyway;
// let's fix that issue first.
col.foldLeftM[M, Vector[A]](Vector())(update(k)(_: Vector[A], _: A)) eval 0 run
}
更新在哪里
// update using trampolined State monad
def update(k: Int) = {
(acc: Vector[A], x: A) => StateT[Trampoline, Int, Vector[A]] {
n => Trampoline.done { (n + 1, algorithmR(k, n + 1, acc, x) }
}
}
这修复了堆栈溢出,但仍然会破坏非常大的集合(或非常小的堆)的堆。每个匿名函数
集合中的值是在折叠期间创建的(我相信关闭每个
x: A
参数),甚至在蹦床运行之前消耗堆。 (FWIW,State 版本也有这个问题;堆栈溢出首先出现在较小的集合中。)
最佳答案
Our real issue is the heap used by the unexecuted State mobits.
不它不是。真正的问题是该集合不适合内存,而
foldLeftM
和 foldRightM
强制整个集合。不纯解决方案的一个副作用是您正在释放内存。在“纯功能”解决方案中,您不会在任何地方这样做。您对
Iterable
的使用忽略了一个关键细节:什么样的收藏 col
实际上是,它的元素是如何创建的,以及它们是如何被丢弃的。因此,foldLeftM
必然如此。在 Iterable
.它可能过于严格,并且您将整个集合强制放入内存。例如,如果它是 Stream
,那么只要你坚持col
到目前为止强制执行的所有元素都将在内存中。如果是其他类型的懒惰Iterable
没有记住它的元素,那么折叠仍然太严格。我用
EphemeralStream
尝试了您的第一个示例没有看到任何显着的堆压力,即使它显然具有相同的“未执行状态 mobits”。不同之处在于 EphemeralStream
的元素被弱引用,其foldRight
不会强制整个流。我怀疑如果你使用
Foldable.foldr
,那么您将看不到有问题的行为,因为它与第二个参数中惰性的函数折叠在一起。当你调用 fold 时,你希望它立即返回一个看起来像这样的暂停:Suspend(() => head |+| tail.foldRightM(...))
当蹦床恢复第一次暂停并运行到下一次暂停时,暂停之间的所有分配都将可供垃圾收集器释放。
尝试以下操作:
def foldM[M[_]:Monad,A,B](a: A, bs: Iterable[B])(f: (A, B) => M[A]): M[A] =
if (bs.isEmpty) Monad[M].point(a)
else Monad[M].bind(f(a, bs.head))(fax => foldM(fax, bs.tail)(f))
val MS = StateT.stateTMonadState[Int, Trampoline]
import MS._
foldM[M,R,Int](Monoid[R].zero, col) {
(x, r) => modify(_ + 1) map (_ => Monoid[R].append(x, r))
} run 0 run
对于蹦床单子(monad)
M
,这将在恒定堆中运行,但会溢出堆栈以获取非蹦床单子(monad)。但是真正的问题是
Iterable
对于太大而无法放入内存的数据,这不是一个好的抽象。 当然,你可以编写一个命令式的副作用程序,在每次迭代后显式丢弃元素或使用惰性右折叠。在您想将该程序与另一个程序组合之前,这很有效。而且我假设您在 State
中调查这样做的全部原因monad 一开始是为了获得组合性。所以,你可以做什么?以下是一些选项:
Reducer
, Monoid
, 及其组合,然后在命令式显式释放循环(或蹦床惰性右折叠)中运行作为最后一步,之后组合是不可能的或预期的。 Iteratee
组合和单子(monad)Enumerator
s 喂它们。 这些选项中的最后一个是我在一般情况下会使用和推荐的选项。
关于scala - 状态单子(monad)在恒定空间(堆和堆栈)中的单子(monad)折叠?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20756436/