scala 从 Stream[String] => 超出 GC 开销限制分割字符串

标签 scala garbage-collection heap-memory

我不明白为什么分割Stream[String]会产生超出GC开销限制,具体取决于中的str Stream[String].flatMap{string => str.split("")} 是不变的或随机发出的。

str不变时,不会发生开销,而是在随机情况下会发生。

我没有引用循环 block 中的对象。

我使用def来声明Stream,以生成非累积的Stream

感谢您的见解。

这是我的代码:

import scala.util.Random

object DataOps{
  val randomGen:Random = new Random()
  def randomText:String = (0 to 300).map(x => randomGen.nextString(10)).mkString(" ")
  val text:String = Array.fill(300)(randomGen.nextString(10)).mkString(" ")

  //return a stream of strind using the same 'txt:String'
  def infiniteInvariantDataStream(cnt:Int): Stream[String] = { 
    if (cnt>0) text#::infiniteInvariantDataStream(cnt-1)
    else Stream[String]()
  }

  //return a Stream of random string
  def infiniteDataStream(cnt:Int):Stream[String] = { 
    if (cnt>0) randomText#::infiniteDataStream(cnt-1)
    else Stream[String]()
  }
}

object BasicOps{
  def dummyStringStreamSplit(datastream: Stream[String]) = { 
      datastream
        .flatMap(txt => txt.split(" ")) 
        .foreach(word => word)
  }
}

object scalaOverflow extends App{

  val n_lines:Int = 1000000

  println("splitting looping over invariant text")
  def datastream1:Stream[String] = DataOps.infiniteInvariantDataStream(n_lines)
  BasicOps.dummyStringStreamSplit(datastream1)
  println("INVARIANT LINE SPLIT OK: no heap overflow")

  println("splitting looping over random text")
  def datastream3:Stream[String] = DataOps.infiniteDataStream(n_lines)
  BasicOps.dummyStringStreamSplit(datastream3)
  println("RANDOM LINE SPLIT OK: no heap overflow")

}

这是错误:

splitting looping over invariant text
INVARIANT LINE SPLIT OK: no heap overflow
splitting looping over random text
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.String.valueOf(String.java:2840)
        at java.lang.Character.toString(Character.java:2136)
        at java.lang.String.valueOf(String.java:2826)
        at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:198)
        at scala.collection.TraversableOnce$$anonfun$addString$1.apply(TraversableOnce.scala:350)
        at scala.collection.immutable.List.foreach(List.scala:383)
        at scala.collection.TraversableOnce$class.addString(TraversableOnce.scala:343)
        at scala.collection.AbstractTraversable.addString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:309)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:311)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.collection.TraversableOnce$class.mkString(TraversableOnce.scala:313)
        at scala.collection.AbstractTraversable.mkString(Traversable.scala:104)
        at scala.util.Random.nextString(Random.scala:89)
        at DataOps$$anonfun$randomText$1.apply(scalaOverflow.scala:5)
        at DataOps$$anonfun$randomText$1.apply(scalaOverflow.scala:5)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
        at scala.collection.immutable.Range.foreach(Range.scala:160)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at DataOps$.randomText(scalaOverflow.scala:5)
        at DataOps$.infiniteDataStream(scalaOverflow.scala:16)
        at DataOps$$anonfun$infiniteDataStream$1.apply(scalaOverflow.scala:16)
        at DataOps$$anonfun$infiniteDataStream$1.apply(scalaOverflow.scala:16)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1117)
        at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1107)
        at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:458)
        at scala.collection.immutable.Stream$$anonfun$flatMap$1.apply(Stream.scala:458)
        at scala.collection.immutable.Stream.append(Stream.scala:241)
        at scala.collection.immutable.Stream$$anonfun$append$1.apply(Stream.scala:241)

更新

其实,这个推流的原因在于下面的方法。重点是将 java while 循环转换为功能友好的 Stream:

import java.sql.{Connection, ResultSet, Statement, DriverManager}
def sqlStream(psqlResult: ResultSet, colname:String): Stream[(Int,String)] = {
    val state:Boolean = psqlResult.next()
    if (state && psqlResult.getString(colname) != null)
        (psqlResult.getRow(), psqlResult.getString(colname))#::sqlStream(psqlResult, colname)
    else if (state)
        sqlStream(psqlResult, colname)
    else
        Stream[(Int,String)]()
}

我应该考虑更好的选择吗?

谢谢。

最佳答案

dummyStringStreamSplit 中的参数 datastream 实际上充当 val 并维护对传入流开头的引用。这就是导致无界内存使用和最终 GC 开销限制超出错误的原因。

确实没有办法让一个接受 Stream 并根据每个元素进行计算(而不是仅仅返回一个新的 Stream)的方法是安全的。至少,无法保证客户端代码不会向您传递保存在某处变量中的 Stream

如果您定义 dummyStringStreamSplit 如下:

def dummyStringStreamSplit(datastream: Stream[String]) =
  datastream.flatMap(txt => txt.split(" "))

然后你可以这样做:

println("splitting looping over random text")
def datastream3:Stream[String] = DataOps.infiniteDataStream(n_lines)
def datastream3Split = BasicOps.dummyStringStreamSplit(datastream3)
datastream3Split.foreach(word => word)
println("RANDOM LINE SPLIT OK: no heap overflow")

并且您不会收到 GC 开销限制超出错误。

关于scala 从 Stream[String] => 超出 GC 开销限制分割字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24282556/

相关文章:

android - 大内存块没有被垃圾收集

.net - 有关如何避免客户端应用程序中 GC 延迟的建议

C++ 删除对象作为引用

C++ 堆内存性能改进

scala - 获取包对象上字段的 FieldMirror

scala - 元组在 map 内解包

c++ - 垃圾收集 C 会比 C++ 更快吗?

heap-memory - 使用 Jena/ARQ 在 sparql 查询到 D2R 之间等待 - 堆空间

xml - 如何使用 Scala 将 xml 文档解析为流?

scala - 如何发布带有校验和(MD5、SHA1)的工件?