我有一项服务以我控制的速率从队列中消耗消息。我做了一些处理,然后尝试通过 Datastax Java 客户端写入 Cassandra 集群。我已经使用 maxRequestsPerConnection
和 maxConnectionsPerHost
设置了我的 Cassandra 集群。但是,在测试中我发现,当我达到 maxConnectionsPerHost
和 maxRequestsPerConnection
时,对 session.executeAsync
的调用不会阻塞。
我现在正在做的是使用 new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection)
并在每个异步请求之前递增它,并在 executeAsync
返回的 future 完成时递减它.这工作得很好,但它似乎是多余的,因为驱动程序已经在内部跟踪请求和连接。
有没有人想出更好的解决方案来解决这个问题?
一个警告:我希望一个请求在完成之前被视为未完成。这包括重试!我从集群中获得可重试失败的情况(例如等待一致性超时)是我想要背压并停止使用队列消息的主要情况。
问题:
// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
// this appears to return immediately even if I have exhausted connections/requests
session.executeAsync(preparedStatement.bind(...));
}
当前解决方案:
constructor() {
this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}
processMessage(message) {
ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
concurrentRequestsSemaphore.acquireUninterruptibly();
future.whenComplete((result, exception) -> concurrentRequests.release());
}
另外,有人能看出这个解决方案有什么明显的问题吗?
最佳答案
一个不杀死集群的可能想法是“限制”你对 executeAsync
的调用,例如在一批 100 个(或任何最适合您的集群和工作负载的数字)之后,您将在客户端代码中 hibernate 并对所有 100 个 future 进行阻塞调用(或使用 Guava 库转换 future 列表进入列表的 future )
这样,在发出 100 个异步查询后,您将强制客户端应用程序等待所有异步查询都成功,然后再继续。如果您在调用 future.get()
时捕获到任何异常,您可以安排重试。通常,Java 驱动程序的默认 RetryStrategy 已尝试重试。
关于来自服务器的背压信号,从CQL二进制协议(protocol)V3开始,有一个错误代码通知客户端协调器过载:https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951
从客户端,您可以通过两种方式获取此重载信息:
- Java 驱动程序 3.0.0:引入了新的 OverloadedException 类:http://www.datastax.com/dev/blog/datastax-java-driver-3-0-0-released#misc
- 3.0.0 之前的 Java 驱动程序:抛出 DriverException("host overloaded")
关于java - 获得 Cassandra Writes 背压的最佳方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35323856/