java - Spark 2.0.0 : Read from Cassandra in Cluster mode

标签 java apache-spark cassandra spark-cassandra-connector

我在 Spark 2.0.0 中运行从 Cassandra 读取数据的 Spark 应用程序时遇到一些问题。

我的代码工作如下:

DataFrameReader readerCassandra = SparkContextUtil.getInstance().read() 
                    .format("org.apache.spark.sql.cassandra")
                    .option("spark.cassandra.connection.host", [DATABASE_IP])
                    .option("spark.cassandra.connection.port", [DATABASE_PORT]);

final Map<String,String> map = new HashMap<String,String>();

map.put("table", "MyTable");
map.put("keyspace", "MyKeyspace");

public final  StructType schema = DataTypes.createStructType(
        new StructField[] { DataTypes.createStructField("id", DataTypes.StringType, true),
            DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
            DataTypes.createStructField("value", DataTypes.DoubleType, true)
        });

final Dataset<Row> dataset = readerCassandra.schema(schema).options(map).load(); 
dataset.show(false);

我想在集群中运行此代码。我的集群使用spark-2.0.2-bin-hadoop2.7(http://spark.apache.org/downloads.html上没有可用的spark-2.0.0)。

首先,我使用以下脚本以客户端模式提交它:

#!/bin/bash

sparkMaster=local[*]
mainClass=package.MainClass

jar=/path/to/myJar-with-dependencies.jar

driverPort=7079
blockPort=7082

deployMode=client

$SPARK_HOME/bin/spark-submit \
  --conf "spark.driver.port=${driverPort}"\
  --conf "spark.blockManager.port=${blockPort}"\
  --class $mainClass \
  --master $sparkMaster \
  --deploy-mode $deployMode \
  --jars /path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar \
  $jar

当我这样做时,一切都会顺利进行。但现在,我想在集群模式下运行我的应用程序。

因此,我修改了一些提交脚本,将 sparkMaster 设置为我的主 IP,并将 deployMode 设置为“集群”。

当我提交申请时,我的驱动程序日志中几乎立即出现以下错误:

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
        at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
        at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
        ...

Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        ...

注意:

  • 我仍然遇到错误,因为与我的 Master 位于同一台机器上的只有一个 Worker 的集群。
  • 最初,我使用的是 Spark 2.3.1,在集群模式下运行代码没有遇到任何问题(在 --jars 中使用 Spark-cassandra-connector_2.11-2.3.1.jar).
  • 我在 --jars 中尝试了多个 jar,例如:spark-cassandra-connector_2.11-2.0.0.jarspark-cassandra-connector_2。 11-2.0.2.jarspark-cassandra-connector_2.11-2.3.1.jarspark-cassandra-connector-java_2.11-1.5.1。 jar,但它们都不起作用。
  • 一些其他 jar 在 --jars 参数中设置并考虑在内

最佳答案

您可能需要将路径指定为 file:///path/to/jars/spark-cassandra-connector_2.11-2.0.0.jar - 在这种情况下,它将是通过驱动程序的 HTTP 服务器分发给执行程序。否则,它期望您已将文件复制到所有计算机,以避免进程本身进行复制。请参阅Spark documentation for details ...

我宁愿建议只创建具有所有依赖项(Spark 除外)的 uberjar,然后提交它 - 这样会减轻一些痛苦。

关于java - Spark 2.0.0 : Read from Cassandra in Cluster mode,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52533181/

相关文章:

nosql - 在我的场景中提高 Cassandra 读取性能的方法

hadoop - Cassandra 聚合到 Map

java - 数组初始化错误

java - 如何让 App Engine cron 作业显示在 Web 界面的 "scheduled tasks"选项卡中?

java - MongoDB $in 与 $and 查询

java - 是否有用于 Azure 使用和计费 API 的 Java SDK?

hadoop - 在yarn上运行spark时我们应该使用哪种模式?

python - 带有字典参数的 Spark UDF 失败

scala - Spark、Scala、数据帧 : create feature vectors

安装 Apache Cassandra 后 MySQL 服务器未启动