scala - Apache Spark : Driver (instead of just the Executors) tries to connect to Cassandra

标签 scala apache-spark cassandra

我想我还没有完全理解 Spark 的工作原理。

这是我的设置:

我在独立模式下运行 Spark 集群。我为此使用了 4 台机器:一台是 Master,另外三台是 Workers。

我编写了一个从 Cassandra 集群读取数据的应用程序(参见 https://github.com/journeymonitor/analyze/blob/master/spark/src/main/scala/SparkApp.scala#L118)。

3 节点 Cassandra 集群在同样托管 Spark Worker 节点的机器上运行。 Spark Master 节点不运行 Cassandra 节点:

Machine 1      Machine 2        Machine 3        Machine 4
Spark Master   Spark Worker     Spark Worker     Spark Worker
               Cassandra node   Cassandra node   Cassandra node

这背后的原因是我想优化数据局部性 - 在集群上运行我的 Spark 应用程序时,每个 Worker 只需要与其本地 Cassandra 节点对话。

现在,当通过运行 spark-submit --deploy-mode client --master spark://machine-1 将我的 Spark 应用程序提交到集群时从 Machine 1(Spark Master),我期望以下内容:
  • 在 Spark Master 上启动了一个 Driver 实例
  • Driver 在每个 Spark Worker 上启动一个 Executor
  • Driver 将我的应用程序分发给每个 Executor
  • 我的应用程序在每个 Executor 上运行,然后通过 127.0.0.1:9042 与 Cassandra 对话。

  • 然而,情况似乎并非如此。相反,Spark Master 尝试与 Cassandra 对话(但失败了,因为 Machine 1 主机上没有 Cassandra 节点)。

    我误解了什么?它的工作方式不同吗? Driver 是否真的从 Cassandra 读取数据,并将数据分发给 Executor?但后来我永远无法读取大于 memory of Machine 1 的数据。 ,即使我的集群的总内存是足够的。

    或者,Driver 是否与 Cassandra 对话不是为了读取数据,而是为了找出如何对数据进行分区,并指示 Executor 读取“他们的”部分数据?

    如果有人能启发我,那将不胜感激。

    最佳答案

    Driver 程序负责在 worker 节点上创建 SparkContext、SQLContext 和调度任务。它包括创建逻辑和物理计划以及应用优化。为了能够做到这一点,它必须能够访问数据源模式和可能的其他信息,如模式或不同的统计信息。实现细节因源而异,但一般来说,这意味着数据应该可以在所有节点上访问,包括应用程序主节点。

    在一天结束时,您的期望几乎是正确的。无需通过驱动程序即可在每个工作程序上单独获取数据块,但驱动程序必须能够连接到 Cassandra 以获取所需的元数据。

    关于scala - Apache Spark : Driver (instead of just the Executors) tries to connect to Cassandra,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33897586/

    相关文章:

    cassandra - 增加复制因子对cassandra性能的影响

    scala - 尝试编译播放模板项目时 Unresolved 依赖项 : sbt-plugin;2. 7.0

    java - 如何在用Java编写的多模块 Play 项目中运行单元测试

    scala - 如何在 Spark/Scala 中使用窗口函数使用 countDistinct?

    apache-spark - 斯卡拉 Spark : Calculate grouped-by AUC

    php - 处理消息队列中的重复

    cassandra - Cassandra 提交日志如何被损坏?

    oop - Scala扩展参数化抽象类

    java - 将 DefaultMutableTreeNode 值设置为默认值,用于 Spark mapToPair 时

    Scala/Spark Apriori 实现速度极慢