java - Error connecting to Cassandra with Spark Streaming

Tags: java apache-spark cassandra apache-kafka spark-streaming

I am trying to connect Spark to Cassandra and create a keyspace and a table, but I get an error.

I have the following code:

public static void main(String[] args){

    SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
    // Substitute 127.0.0.1 with the address of a Cassandra node (the contact point).
    // This must be a resolvable host name or IP; "local" is a Spark master value, not a host.
    sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = KafkaProperties.TOPIC.split(",");
    for (String topic: topics) {
        topicMap.put(topic, KafkaProperties.NUM_THREADS);
    }
    /* connection to cassandra */
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    System.out.println("+++++++++++ cassandra connector created ++++++++++++++++++++++++++++");

    /* Receive kafka inputs */
    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, KafkaProperties.ZOOKEEPER, KafkaProperties.GROUP_CONSUMER, topicMap);
    System.out.println("+++++++++++++ streaming-kafka connection done +++++++++++++++++++++++++++");

    System.out.println(" -----  trying to create tables ------ ");

    try (Session session = connector.openSession()) {
        session.execute("DROP KEYSPACE IF EXISTS test");
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE test.users (id TEXT PRIMARY KEY, name TEXT)");
    }

    System.out.println("---- tables created ----");
}

But I get the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.ConnectionException: [/127.0.0.1:9042] Unexpected error during transport initialization (com.datastax.driver.core.TransportException: [/127.0.0.1:9042] Connection has been closed)))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:196)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:182)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

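At first I thought the host was the problem, but when I changed the connection host to "local" I got the next error instead. I don't know what I should set here to avoid it: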

Exception in thread "main" java.lang.IllegalArgumentException: Cannot build a cluster without contact points
at com.datastax.driver.core.Cluster.checkNotEmpty(Cluster.java:108)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:100)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:169)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1031)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:179)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)

I should add that I started Cassandra beforehand by running "bin/cassandra -f" from the command line.

Thanks!

Best answer

The problem was that I had the wrong dependencies.
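One way to investigate a dependency problem like this is to print where the relevant classes are actually loaded from, so that an unexpected or duplicated jar stands out. The following is only a minimal sketch: the class name ClasspathCheck and the helper locate are hypothetical, the first three class names are taken from the code and stack traces in the question, and the Guava entry is an assumption about a library that may be involved in such conflicts.

```java
import java.security.CodeSource;

final class ClasspathCheck {

    /** Returns the jar or directory a class is loaded from, or a marker if that cannot be determined. */
    static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            return source == null ? "(JDK / bootstrap class loader)" : source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND: " + e;
        }
    }

    public static void main(String[] args) {
        String[] classNames = {
            "com.datastax.driver.core.Cluster",                    // Java driver (seen in the stack trace)
            "com.datastax.spark.connector.cql.CassandraConnector", // Spark Cassandra connector
            "org.apache.spark.SparkConf",                          // Spark core
            "com.google.common.collect.ImmutableList"              // Guava (assumption, not named in the post)
        };
        for (String name : classNames) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run with the same classpath as the streaming job, the jar file names in the output show which driver, connector, and Spark versions are really in use.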

For more details, see the solution here.
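As for the second error in the question ("Cannot build a cluster without contact points"), it is consistent with the value "local" not resolving to any address, which would leave the driver with no contact points; this is an interpretation, not something the original answer states. A quick check using only the JDK can separate host problems from dependency problems. In this sketch the class CassandraHostCheck and its methods are hypothetical names; port 9042 is the one shown in the stack trace.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

final class CassandraHostCheck {

    /** True if the host name (or IP literal) can be resolved to an address. */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /** True if a TCP connection to host:port succeeds within the timeout. */
    static boolean portOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = 9042; // native transport port from the stack trace
        System.out.println(host + " resolves: " + resolves(host));
        System.out.println(host + ":" + port + " accepts TCP connections: " + portOpen(host, port, 2000));
    }
}
```

If the host resolves and the port accepts connections but the driver still reports that the connection was closed during transport initialization, the network is probably fine and a driver or connector version mismatch becomes the more likely cause, which matches the answer above.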

Source: a similar question on Stack Overflow about java - Error connecting to Cassandra with Spark Streaming: https://stackoverflow.com/questions/37161286/
