java - Datastax DSE Cassandra、Spark、Shark、独立程序

标签 java scala cassandra apache-spark shark-sql

我使用 Datastax Enterprise 4.5。我希望我的配置是正确的,我按照 datastax 网站上的说明进行了操作。我可以使用 Windows 服务写入 Cassandra DB,这可以工作,但我无法使用 where 函数通过 Spark 进行查询。

我使用“./dse cassandra -k -t”(在/bin 文件夹中)启动 Cassandra 节点(只有一个用于测试目的),因此 hadoop 和 Spark 都在运行。我可以毫无问题地写入 Cassandra。

因此,当“where”不是 RowKey 时,您不能在 Cassandra 查询中使用“where”子句。所以我需要使用 Spark/Shark。我可以通过 shark (./dse shark) 启动和使用我需要的所有查询,但我需要用 Scala 或 Java 编写一个独立程序。

所以我尝试了这个链接:https://github.com/datastax/spark-cassandra-connector

我可以查询一个简单的语句,例如:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "MY_IP")
  .setMaster("spark://MY_IP:7077")
  .setAppName("SparkTest")

// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)

这很有效,但如果我要求更多行或计数:

println(rdd.count)
rdd.toArray.foreach(println)

然后我得到这个异常:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

当我在 Java 中尝试这个时,我遇到了同样的问题。有谁知道这个问题吗?我不知道数据库配置是否正确或者 scala/Java 程序是否正常工作。也许某些端口被阻止,但 7077 和 4040 是开放的。

旁注:如果我在 Cassandra DB 上启动 Spark,我可以执行如下查询:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

但是如果我使用“where”子句,例如:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)

我得到这个异常:

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING

你知道为什么吗?我想我可以在 Spark 中使用 where 子句?

谢谢!

最佳答案

All masters are unresponsive!

表示您尝试连接的 IP 实际上并未被 Spark 绑定(bind)。所以这基本上是一个网络配置错误。扫描以查看哪些接口(interface)正在监听 7077,并确保您连接到正确的接口(interface)。

对于第二个问题,where运算符意味着您要对该子句进行谓词下推。目前您无法使用主键执行此操作。如果你想where在单个主键上,您可以执行 filter来实现这一点,但您不会看到出色的性能,因为这将执行整个表扫描。

关于java - Datastax DSE Cassandra、Spark、Shark、独立程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25609300/

相关文章:

cassandra - 使用 AvroConverter 启动时出现 Kafka Connect 错误

java - 无法转换 10 月 7 日星期二 :00:00 EEST 2014 to Date object

java - Stack 不将项目插入集合中

scala - 在 Scala 中使用反射访问外部类

scala - 更正 List[List[X]] 的 "empty"返回值

java - 在 Java 中使用 apache Spark 查询 cassandra 时速度很慢。

java - ServiceTracker 找到哪些服务

java - Java 中的 "How to format LocalDate with Custom Pattern"

string - Scala 运行时字符串插值/格式化

cassandra - 所有尝试查询的主机均失败