scala - java.lang.String is not a valid external type for schema of string

Tags: scala csv apache-spark row decode

I am trying to load some CSV data into a Spark cluster and run some queries against it, but I am running into problems while loading the data.

See the code sample below - I have generated a header and am trying to parse the columns, but when run against the (large, column-rich) dataset the process fails with a confusing error message: 'java.lang.String is not a valid external type for schema of string'.

This does not seem to be covered anywhere else on the internet - does anyone know what the problem might be?

(I originally thought it might be related to loading null or empty fields, but the process only fails after a while, and the source data is very, very sparse.)

var headers = StructType(header_clean.split(",").map(fieldName ⇒ StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
    p => Row(p.map( x => x.replace("\"", "").trim)))

// createOrReplaceTempView is defined on Dataset/DataFrame, not RDD, so a
// DataFrame must be built from the RDD and schema before registering the view:
val contentDf = spark.createDataFrame(contentRdd, headers)
contentDf.createOrReplaceTempView("someView")

val domains = spark.sql("SELECT DISTINCT domain FROM someView")

For reference, here is the bottom of the error log (it is very noisy, as there are many columns):
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true) AS pageUrl#377
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 87
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
      +- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType)
         +- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl)
            +- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
               +- input[0, org.apache.spark.sql.Row, true]
   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279)
   at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
   at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
   at org.apache.spark.scheduler.Task.run(Task.scala:85)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
   ... 3 more
Caused by: java.lang.RuntimeException: [Ljava.lang.String; is not a valid external type for schema of string
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source)
   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276)
   ... 17 more
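The final "Caused by" line is the real clue: Catalyst found an `[Ljava.lang.String;` (a whole `Array[String]`) where the schema promised a plain string. A minimal sketch in plain Scala (no Spark required; `rowOf` is a hypothetical stand-in for `Row.apply`, which also takes varargs) shows how the question's `Row(p.map(...))` produces that situation:

```scala
// rowOf takes varargs (Any*), like org.apache.spark.sql.Row.apply.
// Passing an Array WITHOUT expanding it makes the whole array one field.
def rowOf(values: Any*): Seq[Any] = values

val cols = Array("a", "b", "c")

val oneField = rowOf(cols)               // a single field holding the array
println(oneField.length)                 // 1, not 3
println(oneField.head.getClass.getName)  // [Ljava.lang.String;
```

That single-element row is what Catalyst later rejects when it tries to encode the first column as a string.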

Best Answer

I solved this by splitting out the elements of the Row. You can do it like this:

var headers = StructType(header_clean.split(",").map(fieldName ⇒ StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
  p => {
    val ppp = p.map( x => x.replace("\"", "").trim)
    Row(ppp(0), ppp(1), ppp(2))
  })
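A hedged aside: enumerating `ppp(0), ppp(1), ppp(2)` by hand does not scale to the dozens of columns in the question's data. Spark's `Row` companion also accepts a whole sequence via `Row.fromSeq(ppp)`, or the varargs expansion `Row(ppp: _*)`. The plain-Scala sketch below (again using a hypothetical `rowOf` stand-in for `Row.apply`, so it runs without Spark) demonstrates the `: _*` expansion:

```scala
// rowOf mimics Row.apply's varargs (Any*) signature.
def rowOf(values: Any*): Vector[Any] = values.toVector

// Same cleaning as the answer: strip quotes, trim whitespace.
val ppp = "a,\"b\" , c".split(",").map(x => x.replace("\"", "").trim)

// ": _*" expands the array into individual arguments, however many there are.
val row = rowOf(ppp: _*)
println(row)  // Vector(a, b, c)
```

With Spark in scope, the mapper in the answer would presumably become `p => Row.fromSeq(p.map(x => x.replace("\"", "").trim))`, which works for any column count.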

Regarding "scala - java.lang.String is not a valid external type for schema of string", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40164087/

Related articles:

java - How is backpressure in reactive programming different from traditional pagination?

mysql - Problems importing Excel data into MySQL via CSV

scala - Why are Scala functions limited to 22 parameters?

scala - Conditionally rewriting terms in Kiama/Scala

scala - How can I extend a type to a trait in Scala?

java - How to implement NOT IN for two DataFrames with different structures in Apache Spark

scala - How to build and keep one reference object per worker in a Spark 2.3.0 UDF?

python - Stopping an average at the second decimal place

swift - How do I include a library in my Swift project?

scala - In Spark, what is the correct way to have a static object on all workers?