我在一个不断更新的文件夹中有一个 csv 文件。我需要从此 csv 文件中获取输入并生成一些交易。我如何从不断更新的 csv 文件中获取数据,比如说每 5 分钟一次?
我试过以下方法:
val csvDF = spark
.readStream
.option("sep", ",")
.schema(userSchema)
.csv("file:///home/location/testFiles")
但问题是它正在监视文件夹是否已创建任何新文件...但我的问题是只有一个文件在不断更新。
最佳答案
I have 1 csv file in 1 folder location that is keep on updating everytime. i need to take inputs from this csv file and produce some transactions. how can i take data from csv file that is keep on updating , lets say every 5 minutes.
tl;dr 它不会起作用。
默认情况下,Spark Structured Streaming 会监控目录中的文件,并且每个新文件都会触发计算。一旦一个文件被处理,该文件将永远不会被再次处理。这是默认实现。
您可以编写自己的流式源来监视文件的更改,但这是自定义源开发(在大多数情况下不值得付出努力但可行)。
关于apache-spark - 如何对 CSV 文件中的更新行运行流式查询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53833271/