scala - Spark 2.1.0 结构流与本地 CSV 文件

标签 scala csv apache-spark spark-structured-streaming

只是为了学习新的 Spark 数据流式结构,我尝试过这样的实验,但不确定我是否在流式功能上做错了。

首先,我从一些静态的东西开始,只使用 Spark 2.1.0 附带的简单文本 (csv) 文件:

val df = spark.read.format("csv").load(".../spark2/examples/src/main/resources/people.txt")
df.show()

而且我可以获得如此合理的输出(在 Zepplin 下)。

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

按照示例,我只是修改了代码以读取相同的文件和提供的模式

val userSchema = new StructType().add("name", "string").add("age", "integer")

val csvDF = spark
  .readStream
  .schema(userSchema)      // Specify schema of the csv files
  .format("csv")
  .load(".../spark2/examples/src/main/resources/people.csv") 

而且没有报错信息,所以我想把数据写入内存,用下面的代码看结果:

val outStream = csvDF.writeStream
  .format("memory")
  .queryName("logs")
  .start()

sql("select * from logs").show(truncate = false)

但是,没有错误消息,我一直得到“空输出”

+----+---+
|name|age|
+----+---+
+----+---+

这些代码是在 Zeppelin 0.7 下测试的,我不确定我是否遗漏了什么。同时,我使用 $nc -lk 9999 尝试了来自 Apache Spark 2.1.0 官方网站的示例,它运行得非常好。

如果我做错了什么,我可以学习吗?

[修改和测试]

  1. 我尝试将同一个文件 people.txt 复制到 people1.csv peopele2.csv people3.csv 在一个 .../csv/文件夹下
  2. val csvDF = spark.readStream.schema(userSchema).csv("/somewhere/csv")
  3. csvDF.groupBy("name").count().writeStream.outputMode("complete").format("console").start().awaitTermination()

我得到了这个:

-------------------------------------------
Batch: 0
-------------------------------------------
+-------+-----+
|   name|count|
+-------+-----+
|Michael|    3|
|   Andy|    3|
| Justin|    3|
+-------+-----+

因此,我可能不认为是数据readstream()问题...

最佳答案

  1. 文件名是people.txt,不是people.csv。 Spark 将抛出一条错误消息“路径不存在”。我只是用Spark Shell验证了一下。

  2. 输入路径应该是一个目录。使用文件没有意义,因为这是一个流式查询。

关于scala - Spark 2.1.0 结构流与本地 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42615802/

相关文章:

postgresql - 如何使用maven在spark中包含jdbc jar

当静态类型为 Map 时,Scala SortedMap.map 方法返回未排序的 map

json - 如何在 JSON : SPARK Scala 中使用 read.schema 仅指定特定字段

performance - Scala快速生成上三角矩阵坐标

python - 将 float 列表写入 csv 文件

ruby - 如何将散列输出到 CSV 行

python csv标题不在第一行

apache-spark - Spark 或其他技术中的混合效应模型

scala - 使用 Oozie 将 Spark 转为 Hbase

json - 在 scala/play 中解析 "stringified"JSON