scala - 无法更改 RDD 的存储级别

标签 scala apache-spark machine-learning recommendation-engine

以下 Spark 代码:

val model = ALS.trainImplicit(ratings = ratingsRDD,
                              rank = rank,
                              iterations = numIterations,
                              lambda = lambda,
                              alpha = alpha)  

model.productFeatures.cache()

val modelSubsetRDD = new MatrixFactorizationModel(
  rank = rank,
  userFeatures = model.productFeatures,
  productFeatures = model.productFeatures)

引发以下异常:

Cannot change storage level of an RDD after it was already assigned a level

StorageLevel.MEMORY_ONLY 也会引发相同的异常。

另一方面,以下代码可以正常工作:

    val model = ALS.trainImplicit(ratings = ratingsRDD,
                              rank = rank,
                              iterations = numIterations,
                              lambda = lambda,
                              alpha = alpha)  
    val modelSubsetRDD = new MatrixFactorizationModel(
      rank = rank,
      userFeatures = model.userFeatures,
      productFeatures = model.productFeatures)

    model.userFeatures.persist(StorageLevel.MEMORY_ONLY)
    model.productFeatures.persist(StorageLevel.MEMORY_ONLY)

注意到,这次 userFeaturesproductFeatures 设置为模型的两个不同成员。但是,我不确定为什么会这样。

最佳答案

您可能会从代码中的其他地方获得一些持久性?不确定 ALS.trainImplicit 在返回模型之前正在做什么。

调用cache()会将RDD存储在MEMORY_ONLY中,而调用persist允许您更改缓存类型。所以我猜测这个 RDD 已经被持久化在其他地方,并且您正在尝试使用 cache() 重新持久化它,这就是问题所在。但是,使用 persist 更改持久类型是完全可以接受的。

编辑:

尝试以下代码:

val model = ALS.trainImplicit(ratings = ratingsRDD,
                              rank = rank,
                              iterations = numIterations,
                              lambda = lambda,
                              alpha = alpha)  
if(model.productFeatures.getStorageLevel() == StorageLevel.NONE)
    model.productFeatures.cache()

val modelSubsetRDD = new MatrixFactorizationModel(
  rank = rank,
  userFeatures = model.productFeatures,
  productFeatures = model.productFeatures)

这应该避免您尝试缓存已经缓存的内容(无论是在内存还是磁盘中)。

关于scala - 无法更改 RDD 的存储级别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37750545/

相关文章:

Python - 数据编码向量到Word

artificial-intelligence - 人工智能学习无效数据中的模式?

java - Spark : how does partitionBy (DataFrameWriter) actually work?

scala - 隐式 def 函数的含义是什么

scala - 收到无法解析符号 scalafx 的错误

python - 将gzip文件保存在应用于rdd的函数中

python - 舍入 double 值并转换为整数

apache-spark - 在实践中,迷你批处理与实时流之间有什么区别(不是理论上的区别)?

python - 如何在CNN中添加Dropout

scala - scala 2.11 中弃用的 `scala.collection.script` 的替代方案?