java - Spark 多次对数据库运行查询

标签 java apache-spark batch-processing

我正在尝试使用以下代码加载数据集以进行 Spark:

Dataset<Row> dataset = spark.read().jdbc(RPP_CONNECTION_URL, creditoDia3, rppDBProperties));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, creditoDia, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia3, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia2, rppDBProperties)));
dataset = dataset.union(spark.read().jdbc(RPP_CONNECTION_URL, debitoDia,rppDBProperties)));
dataset = dataset.cache();
Long numberOfRowsProcessed = dataset.count();

因此,在这 6 个 session 访问我的数据库并提取数据集并计算行数之后,我不再需要访问数据库。但运行以下代码后:

dataset.createOrReplaceTempView("temp");
Dataset<Row> base =  spark.sql(new StringBuilder()
.append("select ")
.append("TRANSACTION ")
.append("from temp ")
.append("where PAYMENT_METHOD in (1,2,3,4) ")
.append("and   TRANSACTION_STATUS in ('A','B') ")
.toString()
);
base.createOrReplaceTempView("base");

但是,我实际上看到的是 Spark 再次运行查询,但这一次,附加了我在定义 Dataset<Row> base 时传递的过滤器。 。正如你所看到的,我已经缓存了数据,但是没有任何效果。

问题:是否可以在 Spark 中加载内存中的所有内容并使用缓存的数据,查询 Spark 而不再查询数据库?

从关系数据库中获取数据的成本很高,而且需要一段时间。

更新

我注意到 Spark 在尝试执行时正在向数据库发送新查询

from base a 
left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE 

这是 Spark 附加到查询的字符串(从数据库捕获):

WHERE ("IDT_TRANSACTION_STATUS" IS NOT NULL) AND ("NUM_BIN_CARD" IS NOT NULL)

日志中出现:

18/01/16 14:22:20 INFO DAGScheduler: ShuffleMapStage 12 (show at RelatorioBinTransacao.java:496) finished in 13,046 s 18/01/16 14:22:20 INFO DAGScheduler: looking for newly runnable stages 18/01/16 14:22:20 INFO DAGScheduler: running: Set(ShuffleMapStage 9) 18/01/16 14:22:20 INFO DAGScheduler: waiting: Set(ShuffleMapStage 13, ShuffleMapStage 10, ResultStage 14, ShuffleMapStage 11) 18/01/16 14:22:20 INFO DAGScheduler: failed: Set()

我不确定我是否明白他想说什么,但我认为内存中缺少了一些东西。

如果我只是像这样在左连接上添加注释:

from base a 
//left join base b on on a.IDT_TRANSACTION = b.IDT_TRANSACTION and a.DATE = b.DATE 

它工作得很好,并且不再进入数据库。

最佳答案

这听起来好像您可能没有足够的内存来在集群上存储联合结果。在 Long numberOfRowsProcessed = dataset.count(); 之后,请查看 Spark UI 的“存储”选项卡,看看整个数据集是否已完全缓存。如果不是,那么您需要更多内存(和/或磁盘空间)。

如果您确认数据集确实已缓存,请发布查询计划(例如 base.explain())。

关于java - Spark 多次对数据库运行查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48268969/

相关文章:

java - Spring 批处理 : How to catch exception message with skip method in spring batch?

java - Thread.currentThread().getContextClassLoader.getResource() 中的文件长度不同

Java——解决简单数组地震程序

memory - 如何设置 Apache Spark Executor 内存

amazon-web-services - com.amazonaws.services.gluejobexecutor.model.VersionMismatchException

python - pyspark 和 HDFS 命令

java - 在 Netbeans 中使用 SOAP Web 服务客户端获取自定义 HTTP header (Cookie)

java Servlet Filter 基于位置的设置

python - 并行抓取数据+批处理

workflow-foundation-4 - 一次为大量记录创建工作流服务实例