apache-spark - Spark Cassandra Java integration issue

Tags: apache-spark cassandra cassandra-3.0 datastax-java-driver spark-cassandra-connector

I am new to both Spark and Cassandra.

I am trying to implement aggregation over Cassandra data using Spark + Java.

I am not able to fetch the Cassandra data in my code. I have read through several discussions and found that there are some compatibility issues between Spark and the Spark-Cassandra connector. I tried many things to resolve the problem but could not fix it.
Please find my pom.xml below (and please don't mind the extra dependencies; I need to figure out which library is causing the issue) -

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>IBeatCassPOC</groupId>
        <artifactId>ibeatCassPOC</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>

            <!--CASSANDRA START-->
            <dependency>
                <groupId>com.datastax.cassandra</groupId>
                <artifactId>cassandra-driver-core</artifactId>
                <version>3.0.0</version>
            </dependency>

            <dependency>
                <groupId>com.datastax.cassandra</groupId>
                <artifactId>cassandra-driver-mapping</artifactId>
                <version>3.0.0</version>
            </dependency>

            <dependency>
                <groupId>com.datastax.cassandra</groupId>
                <artifactId>cassandra-driver-extras</artifactId>
                <version>3.0.0</version>
            </dependency>
            <dependency>
                <groupId>com.sparkjava</groupId>
                <artifactId>spark-core</artifactId>
                <version>2.5.4</version>
            </dependency>

            <!--https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10-->
            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector_2.10</artifactId>
                <version>2.0.0-M3</version>
            </dependency>
            <!--CASSANDRA END-->

            <!-- Kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>0.8.2.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.8.2.1</version>
            </dependency>

            <dependency>
                <groupId>commons-codec</groupId>
                <artifactId>commons-codec</artifactId>
                <version>1.2</version>
            </dependency>

            <!-- Spark -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>1.4.0</version>
            </dependency>

            <!-- Logging -->
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>

            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>2.1.0</version>
            </dependency>

            <!-- Spark-Kafka -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>1.4.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>1.4.0</version>
            </dependency>

            <!-- Jackson -->
            <dependency>
                <groupId>org.codehaus.jackson</groupId>
                <artifactId>jackson-mapper-asl</artifactId>
                <version>1.9.13</version>
            </dependency>

            <!-- Google Collection Library -->
            <dependency>
                <groupId>com.google.collections</groupId>
                <artifactId>google-collections</artifactId>
                <version>1.0-rc2</version>
            </dependency>

            <!--UA Detector dependency for AgentType in PageTrendLog-->
            <dependency>
                <groupId>net.sf.uadetector</groupId>
                <artifactId>uadetector-core</artifactId>
                <version>0.9.12</version>
            </dependency>
            <dependency>
                <groupId>net.sf.uadetector</groupId>
                <artifactId>uadetector-resources</artifactId>
                <version>2013.12</version>
            </dependency>

            <dependency>
                <groupId>com.esotericsoftware</groupId>
                <artifactId>kryo</artifactId>
                <version>3.0.3</version>
            </dependency>

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.10</artifactId>
                <version>1.3.0</version>
            </dependency>

            <dependency>
                <groupId>org.twitter4j</groupId>
                <artifactId>twitter4j-stream</artifactId>
                <version>4.0.4</version>
            </dependency>

            <!-- MongoDb Java Connector -->
            <!-- <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId>
                <version>2.13.0</version> </dependency> -->

        </dependencies>
    </project>

Java source code used to fetch the data -

    import com.datastax.spark.connector.japi.CassandraJavaUtil;
    import com.datastax.spark.connector.japi.CassandraRow;
    import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;                
    import java.util.ArrayList;

    public class ReadCassData {
        public static void main(String[] args) {
            
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName("Spark-Cassandra Integration");
            sparkConf.setMaster("local[4]");
            sparkConf.set("spark.cassandra.connection.host", "stagingServer22");
            sparkConf.set("spark.cassandra.connection.port", "9042");

            sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
            sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
    
    
            JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
            String keySpaceName = "testKeyspace";
            String tableName = "testTable";
            
            CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext).cassandraTable(keySpaceName, tableName);
            System.out.println("Cassandra Count" + cassandraRDD.cassandraCount());
            final ArrayList<CassandraRow> data = new ArrayList<CassandraRow>();
            
            cassandraRDD.reduce(new Function2<CassandraRow, CassandraRow, CassandraRow>() {
                public CassandraRow call(CassandraRow v1, CassandraRow v2) throws Exception {
                    System.out.println("hello");
                    System.out.println(v1 + " ____ " + v2);
                    data.add(v1);
                    data.add(v2);
                    return null;
                }
            });
            System.out.println( "data Size -" + data.size());
            
        }
    }

The exception encountered is -

    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.lang.NoSuchMethodError: org.apache.spark.TaskContext.getMetricsSources(Ljava/lang/String;)Lscala/collection/Seq;
        at org.apache.spark.metrics.MetricsUpdater$.getSource(MetricsUpdater.scala:20)
        at org.apache.spark.metrics.InputMetricsUpdater$.apply(InputMetricsUpdater.scala:56)
        at com.datastax.spark.connector.rdd.CassandraTableScanRDD.compute(CassandraTableScanRDD.scala:329)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
            
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

My Cassandra cluster is deployed at a remote location, and the Cassandra version in use is 3.9.

Please advise which dependencies are compatible. I cannot change my Cassandra version (currently 3.9). Please suggest which versions of Spark / spark-cassandra-connector to use so that map-reduce jobs run successfully against the database.

Best answer

I have tried connecting to Cassandra from Spark, using the Spark Cassandra connector in Scala, with the following sbt dependencies:

    val Spark = "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
    val SparkCore = "org.apache.spark" %% "spark-sql" % "1.6.1"
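Since the question builds with Maven rather than sbt, a rough Maven equivalent of those coordinates is sketched below (not verified against this project). The point is to keep spark-core, spark-sql and the connector on the same 1.6.x line: the NoSuchMethodError above suggests the 2.0.0-M3 connector was compiled against a newer Spark core than the 1.4.0 declared in the pom.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>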

Below is my working code -

    import com.datastax.driver.dse.graph.GraphResultSet
    import com.spok.util.LoggerUtil
    import com.datastax.spark.connector._
    import org.apache.spark._

    object DseSparkGraphFactory extends App {

      val dseConn = {
        LoggerUtil.info("Connecting with DSE Spark Cluster....")
        val conf = new SparkConf(true)
          .setMaster("local[*]")
          .setAppName("test")
          .set("spark.cassandra.connection.host", "Ip-Address")
        val sc = new SparkContext(conf)
        // Read the Cassandra table as an RDD and print each row on the driver
        val rdd = sc.cassandraTable("spokg_test", "Url_p")
        rdd.collect().map(println)
      }
    }
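For the Java side of the question, the same read can be expressed with the connector's Java API. The snippet below is only a sketch, assuming the aligned 1.6.x artifacts above and reusing the host/keyspace/table names from the question; the class name ReadCassDataJava is just illustrative. Collecting the rows on the driver also avoids mutating a driver-side list from inside reduce(), which cannot work because that closure runs on the executors.

    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    import com.datastax.spark.connector.japi.CassandraRow;
    import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    public class ReadCassDataJava {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("Spark-Cassandra Integration")
                    .setMaster("local[4]")
                    // Contact point of the Cassandra cluster (taken from the question)
                    .set("spark.cassandra.connection.host", "stagingServer22");

            JavaSparkContext sc = new JavaSparkContext(conf);

            // Read the table as an RDD of CassandraRow
            CassandraJavaRDD<CassandraRow> rows =
                    javaFunctions(sc).cassandraTable("testKeyspace", "testTable");

            // Bring the rows back to the driver with collect() instead of adding to a
            // driver-side ArrayList inside reduce(): that closure runs on the executors,
            // so the driver's list would stay empty.
            List<CassandraRow> data = rows.collect();
            System.out.println("Row count: " + data.size());

            sc.stop();
        }
    }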

Regarding this apache-spark - Spark Cassandra Java integration issue, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41953420/
