以下 Spark 代码:
val model = ALS.trainImplicit(ratings = ratingsRDD,
rank = rank,
iterations = numIterations,
lambda = lambda,
alpha = alpha)
model.productFeatures.cache()
val modelSubsetRDD = new MatrixFactorizationModel(
rank = rank,
userFeatures = model.productFeatures,
productFeatures = model.productFeatures)
引发以下异常:
Cannot change storage level of an RDD after it was already assigned a level
StorageLevel.MEMORY_ONLY
也会引发相同的异常。
另一方面,以下代码可以正常工作:
val model = ALS.trainImplicit(ratings = ratingsRDD,
rank = rank,
iterations = numIterations,
lambda = lambda,
alpha = alpha)
val modelSubsetRDD = new MatrixFactorizationModel(
rank = rank,
userFeatures = model.userFeatures,
productFeatures = model.productFeatures)
model.userFeatures.persist(StorageLevel.MEMORY_ONLY)
model.productFeatures.persist(StorageLevel.MEMORY_ONLY)
注意到,这次 userFeatures
和 productFeatures
设置为模型的两个不同成员。但是,我不确定为什么会这样。
最佳答案
您可能会从代码中的其他地方获得一些持久性?不确定 ALS.trainImplicit
在返回模型之前正在做什么。
调用cache()
会将RDD存储在MEMORY_ONLY中,而调用persist
允许您更改缓存类型。所以我猜测这个 RDD 已经被持久化在其他地方,并且您正在尝试使用 cache()
重新持久化它,这就是问题所在。但是,使用 persist
更改持久类型是完全可以接受的。
编辑:
尝试以下代码:
val model = ALS.trainImplicit(ratings = ratingsRDD,
rank = rank,
iterations = numIterations,
lambda = lambda,
alpha = alpha)
if(model.productFeatures.getStorageLevel() == StorageLevel.NONE)
model.productFeatures.cache()
val modelSubsetRDD = new MatrixFactorizationModel(
rank = rank,
userFeatures = model.productFeatures,
productFeatures = model.productFeatures)
这应该避免您尝试缓存已经缓存的内容(无论是在内存还是磁盘中)。
关于scala - 无法更改 RDD 的存储级别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37750545/