java - Cannot run a Spark job from EMR that connects to Cassandra on EC2

Tags: java amazon-ec2 apache-spark cassandra emr

I am running a Spark job from an EMR cluster that connects to Cassandra on EC2.

These are the dependencies I use in my project.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.5.0-M1</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.6</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.5.0-M3</version>
</dependency>

The problem I am facing is that if I use cassandra-driver-core 3.0.0, I get the following error:

java.lang.ExceptionInInitializerError
at mobi.vserv.SparkAutomation.DriverTester.doTest(DriverTester.java:28)
at mobi.vserv.SparkAutomation.DriverTester.main(DriverTester.java:16)
Caused by: java.lang.IllegalStateException: Detected Guava issue #1635 which indicates that a version of Guava less than 16.01 is in use.  This introduces codec resolution issues and potentially other incompatibility issues in the driver.  Please upgrade to Guava 16.01 or later.
at com.datastax.driver.core.SanityChecks.checkGuava(SanityChecks.java:62)
at com.datastax.driver.core.SanityChecks.check(SanityChecks.java:36)
at com.datastax.driver.core.Cluster.<clinit>(Cluster.java:67)
... 2 more

I also tried including Guava version 19.0.0, but the job still does not run.

When I downgrade to cassandra-driver-core 2.1.6, I get the following error instead:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /EMR PUBLIC IP:9042 (com.datastax.driver.core.TransportException: [/EMR PUBLIC IP:9042] Cannot connect))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1272)
at com.datastax.driver.core.Cluster.init(Cluster.java:158)
at com.datastax.driver.core.Cluster.connect(Cluster.java:248)
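
The TransportException ("Cannot connect") in this trace indicates that the driver could not open a TCP connection to port 9042 on the contact point. One way to narrow this down is to test plain TCP reachability from an EMR node, independent of Spark and the Cassandra driver. The following is only a minimal sketch and not part of the original question; the class name PortProbe and the method canConnect are hypothetical, and the host passed in is whatever address the job uses as its contact point. If the probe fails, network-level causes such as firewall or EC2 security group rules would be one thing to check (an assumption, since the question does not describe the network setup).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and unresolvable host names.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java PortProbe <cassandra-host> [port]
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9042;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 10000));
    }
}
```

The 10000 ms timeout mirrors the connect timeout set in the question's UpdateCassandra class; a successful probe only shows that the port accepts connections, not that authentication or the keyspace is valid.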

Note that I have tested my code locally and it works fine, and I have followed the various dependency combinations listed at https://github.com/datastax/spark-cassandra-connector

Code:

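import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class App1 {

    private static Logger logger = LoggerFactory.getLogger(App1.class);

    static SparkConf conf = new SparkConf().setAppName("SparkAutomation").setMaster("yarn-cluster");

    static JavaSparkContext sc = null;
    static {
        sc = new JavaSparkContext(conf);
    }

    public static void main(String[] args) throws Exception {

        JavaRDD<String> Data = sc.textFile("S3 PATH TO GZ FILE/*.gz");

        // ConverLineToUSerProfile maps each input line to a UserSetGet (neither class is shown here).
        JavaRDD<UserSetGet> usgRDD1 = Data.map(new ConverLineToUSerProfile());

        // collect() pulls every record back to the driver.
        List<UserSetGet> t3 = usgRDD1.collect();

        // Iterate with i < size(); i <= size() would read one element past the end.
        for (int i = 0; i < t3.size(); i++) {
            try {
                phpcallone php = new phpcallone();
                php.sendRequest(t3.get(i));
            } catch (Exception e) {
                logger.error("This Has reached ====> " + e);
            }
        }
    }
}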




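import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// JSONException comes from the project's JSON library, which the question does not name.

public class phpcallone {

    private static Logger logger = LoggerFactory.getLogger(phpcallone.class);
    static String pid;

    public void sendRequest(UserSetGet usg) throws JSONException, IOException, InterruptedException {

        UpdateCassandra uc = new UpdateCassandra();
        try {
            uc.UpdateCsrd();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}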

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class UpdateCassandra {

    public void UpdateCsrd() throws ClassNotFoundException {

        Cluster.Builder clusterBuilder = Cluster.builder()
                .addContactPoint("PUBLIC IP ").withPort(9042)
                .withCredentials("username", "password");
        clusterBuilder.getConfiguration().getSocketOptions().setConnectTimeoutMillis(10000);

        try {
            // Builds a new Cluster and opens a session on keyspace "dmp" on every call.
            Session session = clusterBuilder.build().connect("dmp");
            session.execute("USE dmp");
            System.out.println("Connection established");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

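Best answer

Assuming you are using EMR 4.1+, you can pass the Guava jar to spark-submit through its --jars option, then give EMR a configuration file that tells Spark to use the user classpath first.

For example, in a file setup.json: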

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.userClassPathFirst": "true",
      "spark.executor.userClassPathFirst": "true"
    }
  }
]

You can then supply this file through the --configurations file://setup.json option of the create-cluster AWS CLI command.
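
To check whether the user-supplied Guava jar is the one that ends up being used, it can help to print where a Guava class was loaded from, for example at the start of the job. The following is a minimal sketch and not part of the original answer; the class name ClasspathProbe and the helper locate are hypothetical, and com.google.common.base.Optional is used only as a representative Guava class.

```java
import java.security.CodeSource;

final class ClasspathProbe {

    /**
     * Returns the location (typically a jar URL) the named class was loaded from,
     * or a short label when the location cannot be determined.
     */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes loaded by the bootstrap loader report no code source.
                return "(bootstrap or unknown location)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        System.out.println("Guava loaded from: " + locate("com.google.common.base.Optional"));
    }
}
```

If the printed location points at a jar other than the one passed with --jars, the older Guava on the cluster classpath is presumably still taking precedence.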

Regarding "java - Cannot run a Spark job from EMR that connects to Cassandra on EC2", the corresponding question on Stack Overflow is: https://stackoverflow.com/questions/35548027/
