scala - Merging two Array[String]-typed Spark SQL columns into a new Array[String] column

Tags: scala apache-spark apache-spark-sql user-defined-functions

I have two columns in a Spark SQL DataFrame, and each entry in either column is an array of strings.

val ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")
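
As a quick sanity check, printSchema confirms that both columns are array<string> (a sketch, assuming the ngramDataFrame above; the commented output is what a typical Spark shell prints):

ngramDataFrame.printSchema()
// root
//  |-- filtered_words: array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- ngrams_array: array (nullable = true)
//  |    |-- element: string (containsNull = true)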

I want to combine the arrays in each row into a single array in a new column. My code is as follows:
def concat_array(firstarray: Array[String],
                 secondarray: Array[String]): Array[String] = {
  (firstarray ++ secondarray).toArray
}
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))

I can use the concat_array function on two plain arrays successfully. However, when I run the code above, I get the following exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 12, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array, array) => array)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
    at $line80.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:76)
    ... 13 more
Driver stacktrace:
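The Caused by line at the bottom is the real problem: at runtime Spark passes an array column to a udf as scala.collection.mutable.WrappedArray, which cannot be cast to Array[String]. A small probe udf makes the runtime class visible (a sketch, assuming the ngramDataFrame above):

// Inspect the runtime class Spark actually passes for an array column
val probe = udf((xs: Seq[String]) => xs.getClass.getName)
ngramDataFrame.select(probe($"filtered_words")).show(false)
// prints: scala.collection.mutable.WrappedArray$ofRef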

Best Answer

Arjun, there is a mistake in the udf you created: when you pass in a column of array type, its runtime data type is not Array[String] but WrappedArray[String]. Below I paste the corrected udf along with its output.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
import scala.collection.mutable

// Local SparkContext/SQLContext setup (pre-2.0 style, as in the original answer)
val sparkConf = new SparkConf().setAppName("ConcatArrayColumns").setMaster("local[*]")
val SparkCtxt = new SparkContext(sparkConf)
val sqlContext = new SQLContext(SparkCtxt)
import sqlContext.implicits._

// Sample DataFrame with two ArrayType(StringType) columns
val temp = SparkCtxt.parallelize(Seq(Row(Array("String1", "String2"), Array("String3", "String4"))))
val df = sqlContext.createDataFrame(temp,
  StructType(List(
    StructField("Col1", ArrayType(StringType), true),
    StructField("Col2", ArrayType(StringType), true)
  )))

// Spark hands array columns to a udf as WrappedArray[String], not Array[String]
def concat_array(firstarray: mutable.WrappedArray[String],
                 secondarray: mutable.WrappedArray[String]): mutable.WrappedArray[String] =
  firstarray ++ secondarray

val concatUDF = udf(concat_array _)
val df2 = df.withColumn("udftest", concatUDF(df.col("Col1"), df.col("Col2")))

df2.select("udftest").foreach { each =>
  println("***********")
  println(each(0))
}
df2.show(true)

Output:
+------------------+------------------+--------------------+
|              Col1|              Col2|             udftest|
+------------------+------------------+--------------------+
|[String1, String2]|[String3, String4]|[String1, String2...|
+------------------+------------------+--------------------+

WrappedArray(String1,String2,String3,String4)
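
For reference, a more portable way to write the same udf is to declare the parameters as Seq[String], since WrappedArray is a Seq and the udf then accepts whatever concrete collection Spark passes in. And on Spark 2.4+ the built-in concat function concatenates array columns directly, so no udf is needed at all. A minimal sketch, assuming the ngramDataFrame from the question:

import org.apache.spark.sql.functions.{concat, udf}

// Option 1: Seq[String] parameters work for any array column
val concatSeqUDF = udf((first: Seq[String], second: Seq[String]) => first ++ second)
val withUdf = ngramDataFrame.withColumn("full_array",
  concatSeqUDF($"filtered_words", $"ngrams_array"))

// Option 2 (Spark 2.4+): built-in concat also works on array columns
val withBuiltin = ngramDataFrame.withColumn("full_array",
  concat($"filtered_words", $"ngrams_array"))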

Regarding "scala - Merging two Array[String]-typed Spark SQL columns into a new Array[String] column", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49160574/
