java - 获得 Cassandra Writes 背压的最佳方法是什么?

标签 java cassandra datastax-java-driver backpressure

我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用 maxRequestsPerConnectionmaxConnectionsPerHost 设置了我的 Cassandra 集群。但是,在测试中我发现,当我达到 maxConnectionsPerHostmaxRequestsPerConnection 时,对 session.executeAsync 的调用不会阻塞。

我现在正在做的是使用 new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection) 并在每个异步请求之前递增它,并在 executeAsync 返回的 future 完成时递减它.这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。

有没有人想出更好的解决方案来解决这个问题?

一个警告:我希望一个请求在完成之前被视为未完成。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性超时)是我想要背压并停止使用队列消息的主要情况。

问题:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

当前解决方案:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    concurrentRequestsSemaphore.acquireUninterruptibly();
    future.whenComplete((result, exception) -> concurrentRequests.release());
}

另外,有人能看出这个解决方案有什么明显的问题吗?

最佳答案

一个不杀死集群的可能想法是“限制”你对 executeAsync 的调用,例如在一批 100 个(或任何最适合您的集群和工作负载的数字)之后,您将在客户端代码中 hibernate 并对所有 100 个 future 进行阻塞调用(或使用 Guava 库转换 future 列表进入列表的 future )

这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有异步查询都成功,然后再继续。如果您在调用 future.get() 时捕获到任何异常,您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已尝试重试。

关于来自服务器的背压信号,从CQL二进制协议(protocol)V3开始,有一个错误代码通知客户端协调器过载:https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

从客户端,您可以通过两种方式获取此重载信息:

关于java - 获得 Cassandra Writes 背压的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35323856/

相关文章:

Java 使用 hibernate

java - 如何从 BufferedImage 列表创建视频?

java - 使用 CQL 3.0 在集合上创建自定义索引

java - 未找到请求的操作的编解码器 : [timestamp <-> java. util.UUID]

java - Spark 使用编码器创建数据集,其中 row 是数组类型

java - 如何在 Web 应用程序(Spring/Vaadin)中为每个用户创建和维护一个 jdbc 连接?

cassandra - 无法从我的图形实例中看到我的顶点和边,但能够从其他图形实例中看到

cassandra - Cassandra 服务器上出现错误 : Unable to gossip with any seeds

node.js - 如何在所有路由文件中使用app.js变量?

java - Cassandra:使用 DataStax Java 驱动程序选择一系列 TimeUUID