apache-spark - 使用 apache flink Java API 将数据读写到 cassandra 中

标签 apache-spark cassandra apache-flink cassandra-2.1

我打算使用 apache flink 使用 flink 将数据读/写到 cassandra 中。我希望使用 flink-connector-cassandra ,我没有找到连接器的良好文档/示例。

您能否指出使用 Apache Flink 从 cassandra 读取和写入数据的正确方法。我只看到纯粹用于写入的接收器示例? apache flink 是否也用于从类似于 apache spark 的 cassandra 读取数据?

最佳答案

我有同样的问题,这就是我正在寻找的。我不知道它是否针对您的需要过于简化,但我认为我仍然应该展示它。

ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("urlToUse.com").withPort(9042).build();
        }
    };

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat = new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);

    Tuple2<String, String> testOutputTuple = new Tuple2<>();
    cassandraInputFormat.nextRecord(testOutputTuple);

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

我发现这一点的方法是找到“CassandraInputFormat”类的代码并查看它是如何工作的( http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java )。老实说,我希望它只是一种格式,而不是基于名称的 Cassandra 的完整阅读类(class),我觉得其他人可能会这么想。

关于apache-spark - 使用 apache flink Java API 将数据读写到 cassandra 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42617575/

相关文章:

java - 如何修复格式错误的 POM : Unrecognised tag: 'groupId' ?

sql - Apache Calcite 是否提供添加自定义子句或语句的方法?

scala - AvroTypeException : Not an enum: MOBILE on DataFileWriter

apache-spark - Spark SQL 和使用现有的配置单元 udfs

numpy - 用于触发数据框的大 numpy 数组

scala - Scala Spark 中笛卡尔变换的显式排序

resources - 学习 cassandra 的资源列表

Cassandra修复导致节点超时

json - 如何在 JSON : SPARK Scala 中使用 read.schema 仅指定特定字段

apache-flink - flink-zeppelin-没有响应