java - 高效的 Spark Cassandra Java join

标签 java cassandra apache-spark

我有两张 table :

  1. my_keyspace.name 包含列:
    • 名称(字符串)- 分区键
    • 时间戳(日期)- 分区键的第二部分
    • id (int) - 分区键的第三部分
  2. my_keyspace.data 包含列:
    • 时间戳(日期)- 分区键
    • id (int) - 分区键的第二部分
    • 数据(字符串)

我正在尝试加入名称表中的时间戳和 ID。我通过获取与给定名称关联的所有时间戳和 ID 并从数据表中检索这些条目的数据来完成此操作。

在 CQL 中执行此操作非常快。我预计 Spark Cassandra 的速度同样快,但它似乎正在进行全表扫描。这可能是由于不知道哪些字段是分区/主键。虽然我似乎无法找到一种方法来告诉它映射。

我怎样才能使这个连接达到应有的效率?这是我的代码示例:

private static void notSoEfficientJoin() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
                                    .setMaster("local[*]")
                                    .set("spark.cassandra.connection.host", "localhost")
                                    .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaPairRDD<DataKey, NameRow> nameIndexRDD = javaFunctions(sc).cassandraTable("my_keyspace", "name", mapRowTo(NameRow.class)).where("name = 'John'")
                                                                       .keyBy(new Function<NameRow, DataKey>() {
                                                                           @Override
                                                                           public DataKey call(NameRow v1) throws Exception {
                                                                               return new DataKey(v1.timestamp, v1.id);
                                                                           }
                                                                       });

    JavaPairRDD<DataKey, DataRow> dataRDD = javaFunctions(sc).cassandraTable("my_keyspace", "data", mapRowTo(DataRow.class))
                                                          .keyBy(new Function<DataRow, DataKey>() {
                                                              @Override
                                                              public DataKey call(DataRow v1) throws Exception {
                                                                  return new DataKey(v1.timestamp, v1.id);
                                                              }
                                                          });

    JavaRDD<String> cassandraRowsRDD = nameIndexRDD.join(dataRDD)
                                                       .map(new Function<Tuple2<DataKey, Tuple2<NameRow, DataRow>>, String>() {
                                                           @Override
                                                           public String call(Tuple2<DataKey, Tuple2<NameRow, DataRow>> v1) throws Exception {
                                                               NameRow nameRow = v1._2()._1();
                                                               DataRow dataRow = v1._2()._2();
                                                               return nameRow + " " + dataRow;
                                                           }
                                                       });

    List<String> collect = cassandraRowsRDD.collect();
}

最佳答案

更有效地执行此连接的方法是实际调用 joinWithCassandraTable,这可以通过使用另一个 javaFunctions 调用包装结果来完成:

private static void moreEfficientJoin() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
                                    .setMaster("local[*]")
                                    .set("spark.cassandra.connection.host", "localhost")
                                    .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<DataKey> nameIndexRDD = sc.parallelize(javaFunctions(sc).cassandraTable("my_keyspace", "name", mapRowTo(DataKey.class))
                                                                    .where("name = 'John'")
                                                                    .collect());

    JavaRDD<Data> dataRDD = javaFunctions(nameIndexRDD).joinWithCassandraTable("my_keyspace", "data", allColumns, someColumns("timestamp", "id"), mapRowTo(Data.class), mapToRow(DataKey.class))
                                                       .map(new Function<Tuple2<DataKey, Data>, Data>() {
                                                           @Override
                                                           public Data call(Tuple2<DataKey, Data> v1) throws Exception {
                                                               return v1._2();
                                                           }
                                                       });
    List<Data> data = dataRDD.collect();
}

重要的是用 javaFunctions 包装 JavaRDD。因此可以不在 nameIndexRDD

上调用 collectsc.parallelize

关于java - 高效的 Spark Cassandra Java join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31457188/

相关文章:

java - JPA 异常 : No Persistence provider for EntityManager named MyJPAApplicationPU

java - JFrame如何做到轻量级?

java - 如何在黑莓 ({m,n}) 上使用量词正则表达式模式

php - 有选择地进入 cassandra 比正常获取更快?

cassandra - 在正在运行的节点中将 cassandra 快照文件复制到 sstable 文件上是否安全?

java - Apache Spark - 数据帧的 datediff?

java - 如何在Java中定义自定义异常类,最简单的方法?

cassandra - spring-data-cassandra 如何处理连接池?

scala - 创建Kafka流的AbstractMethodError

hadoop - HDFS 中的 Avro 架构生成