我一直在尝试将 RDD 转换为数据帧。为此,需要定义类型而不是 Any。我正在使用 Spark MLLib PrefixSpan,这就是 freqSequence.sequence 的来源。我从一个数据框开始,其中包含 Session_ID、 View 和购买作为字符串数组:
viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
[session_id: decimal(29,0), view_product_ids: array[string], purchase_product_ids: array[string]]
然后,我计算频繁模式并需要将它们放在数据框中,以便我可以将它们写入 Hive 表。
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))
val prefixSpan = new PrefixSpan()
.setMinSupport(0.001)
.setMaxPatternLength(2)
val model = prefixSpan.run(viewsPurchasesRddString)
val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())
case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)
val viewsPurchasesDf = freqSequencesRdd.map( fs =>
{
val views = fs.sequence(0)(0)
val purchases = fs.sequence(1)(0)
val freq = fs.freq
FreqSequences(views, purchases, freq)
}
)
viewsPurchasesDf.toDF() // optional
当我尝试运行此程序时,观看次数和购买次数为“任意”,而不是“数组[字符串]”。我拼命地尝试将它们转换,但我得到的最好的是数组[Any]。我想我需要将内容映射到字符串,我已经尝试过,例如这个:How to get an element in WrappedArray: result of Dataset.select("x").collect()?这是:How to cast a WrappedArray[WrappedArray[Float]] to Array[Array[Float]] in spark (scala)以及数千个其他 Stackoverflow 问题...
我真的不知道如何解决这个问题。我想我已经将初始数据帧/RDD 转换为很多,但无法理解在哪里。
最佳答案
我认为问题是你有一个DataFrame
,它不保留静态类型信息。当您从 Row
中取出一个项目时,您必须明确告诉它您希望获得哪种类型。
未经测试,但根据您提供的信息推断:
import scala.collection.mutable.WrappedArray
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
Array(
Array(row.getAs[WrappedArray[String]](1).toArray),
Array(row.getAs[WrappedArray[String]](2).toArray)
)
)
关于arrays - Scala 将 WrappedArray 或 Array[Any] 转换为 Array[String],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48339097/