scala - 从 Cassandra 读取数据在 Flink 中处理

标签 scala cassandra apache-flink

我必须使用 Flink 作为流引擎来处理来自 Kafka 的数据流。为了对数据进行分析,我需要在 Cassandra 中查询一些表。做这个的最好方式是什么?我一直在 Scala 中寻找此类案例的示例。但是我找不到任何。如何使用Scala作为编程语言在Flink中读取来自Cassandra的数据?
Read & write data into cassandra using apache flink Java API在同一行上有另一个问题。它在答案中提到了多种方法。我想知道在我的情况下最好的方法是什么。此外,大多数可用的示例都是用 Java 编写的。我正在寻找 Scala 示例。

最佳答案

我目前在 flink 1.3 中使用 asyncIO 从 cassandra 读取。这是关于它的文档:

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html (如果有 DatabaseClient,您将使用 com.datastax.drive.core.Cluster 代替)

让我知道您是否需要一个更深入的示例来使用它专门从 cassandra 读取,但不幸的是,我只能在 java 中提供一个示例。

编辑 1

这是我使用 flink 的异步 I/O 从 Cassandra 读取的代码示例。我仍在努力识别和解决一个问题,由于某种原因(没有深入研究)单个查询返回的大量数据,异步数据流的超时被触发,即使它看起来被 Cassandra 很好地返回并且在超时时间之前。但是假设这只是我正在做的其他事情的一个错误而不是因为这段代码,这对你来说应该很好用(并且对我来说也很好用了几个月):

public class GenericCassandraReader extends RichAsyncFunction<CustomInputObject, ResultSet> {

    private final Properties props;
    private Session client;

    public GenericCassandraReader(Properties props) {
        super();
        this.props = props;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        client = Cluster.builder()
                .addContactPoint(props.cassandraUrl)
                .withPort(props.cassandraPort)
                .build()
                .connect(props.cassandraKeyspace);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(final CustomInputObject customInputObject, final AsyncCollector<ResultSet> asyncCollector) throws Exception {

        String queryString = "select * from table where fieldToFilterBy='" + customInputObject.id() + "';";

        ListenableFuture<ResultSet> resultSetFuture = client.executeAsync(queryString);

        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {

            public void onSuccess(ResultSet resultSet) {
                asyncCollector.collect(Collections.singleton(resultSet));
            }

            public void onFailure(Throwable t) {
                asyncCollector.collect(t);
            }
        });
    }
}

再次抱歉耽搁了。我希望能解决这个错误,所以我可以确定,但此时我认为只有一些引用比没有好。

编辑 2

所以我们最终确定问题不在于代码,而在于网络吞吐量。大量字节试图通过一个不足以处理它的管道,东西开始备份,一些开始涓涓细流,但是(感谢 datastax cassandra 驱动程序的 QueryLogger 我们可以看到这一点)接收结果所花费的时间每个查询开始攀升到 4 秒,然后是 6 秒,然后是 8 秒,依此类推。

TL;DR,代码没问题,请注意,如果您遇到 Flink 的 asyncWaitOperator 的 timeoutExceptions,则可能是网络问题。

编辑 2.5

还意识到提及由于网络延迟问题可能是有益的,我们最终转向使用 RichMapFunction 来保存我们从 cassandra 中读取的状态数据。因此,该作业只跟踪通过它的所有记录,而不必在每次有新记录通过时从表中读取以获取其中的所有记录。

关于scala - 从 Cassandra 读取数据在 Flink 中处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43067681/

相关文章:

java - 使用 Java 中的 TableSource、Table 或 DataSet 对象将 String 转换为 Double

eclipse - Scala Eclipse 文件>new 有<无适用项目>

java - 在运行时远程探测 Scala/Java 应用程序

java - 为 Java 代码定义测试数据 - 简单 - 也许使用 Scala?

用于 Windows 客户端的 Cassandra GUI

hadoop - Cassandra pig 插入异常

maven - Flink 错误 - org.apache.hadoop.ipc.RemoteException : Server IPC version 9 cannot communicate with client version 4

scala - 在Scala中,如何折叠列表并返回中间结果?

cassandra - 查询以使用cassandra使用max选择列的最大值

java - Flink DataStream API 中 TimeWindow.getStart() 的问题