scala - 有没有一种好方法可以将 Spark 中的流与更改表连接起来?

标签 scala apache-spark spark-structured-streaming

我们的 Spark 环境: DataBricks 4.2(包括 Apache Spark 2.3.1、Scala 2.11)

我们努力实现的目标: 我们希望通过一些定期更新的引用数据来丰富流数据。通过将流与引用数据连接来完成丰富。

我们实现了什么: 我们实现了两个 Spark 作业(jar): 第一个是使用

每小时更新一个 Spark 表 TEST_TABLE(我们称之为“引用数据”)
<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

然后调用spark.catalog.refreshTable("TEST_TABLE")

第二项工作(我们称之为流数据)是使用 Spark Structured Streaming 流式读取一些数据,使用 DataFrame.transform() 将其与表 TEST_TABLE 连接起来,将其写入另一个系统。 我们在 .transform() 调用的函数中使用 spark.read.table(“TEST_TABLE”) 读取引用数据,以便获得表中的最新值。不幸的是,每次第一个应用程序更新表时,第二个应用程序都会崩溃。 Log4j 输出中显示以下消息:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748

我们还尝试在读取表之前使缓存失效,但这降低了性能,并且应用程序崩溃了。 我们怀疑根本原因是对引用数据集的惰性评估(它仍然“指向”旧数据,但不再存在)。

您对我们可以采取哪些措施来防止此问题有什么建议,或者加入具有动态引用数据的流的最佳方法是什么?

最佳答案

加入引用数据;不要缓存它,这可以确保您找到源代码。查找由主键+计数器表示的最新版本数据,其中该计数器最接近或等于您在流应用程序中维护的计数器。每小时写入,再次追加所有当前的引用数据,但计数器递增;即新版本。此处使用 Parquet 。

关于scala - 有没有一种好方法可以将 Spark 中的流与更改表连接起来?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51989569/

相关文章:

scala - 在 Scala 中对未绑定(bind)的 Comparable 进行排序

scala - 嵌套惰性理解

scala - 从 scala 中的 map((tuple),(tuple)) 中读取元组的各个元素

apache-spark - Elasticsearch如何利用集群?

python - 派斯帕克 : Can saveAsNewAPIHadoopDataset() be used as bulk loading to HBase?

apache-spark - 当状态数据增长时,Spark Structured Streaming 如何处理内存中状态?

scala - 何时使用 Scala Future?

apache-spark - 发生异常 : pyspark. sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

apache-spark - 带水印的结构化流 - 类型错误 : 'module' object is not callable

apache-spark - Spark 结构化流中的实时指标