java - 为什么我的 Cassandra 准备语句摄取数据如此慢?

标签 java cassandra datastax-enterprise

我有一个包含 100,000 个名称的 Java 列表,我希望将其提取到运行 Datastax Enterprise 5.1 和 Cassandra 3.10.0 的 3 节点 Cassandra 集群中

我的代码可以摄取,但需要很长时间。我对集群进行了压力测试,每秒能够执行超过 25,000 次写入。通过我的摄取代码,我得到了大约 200/秒的糟糕性能。

我的 Java 列表中有 100,000 个名称,名为 myList。我使用以下准备好的语句和 session 执行来摄取数据。

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

         int id = 0;

         for(int i = 0; i < myList.size(); i++) {
             id += 1;
             session.execute(prepared.bind(id, myList.get(i)));
        }

我在代码中添加了一个集群监视器来查看发生了什么。这是我的监控代码。

    /// Monitoring Status of Cluster
    final LoadBalancingPolicy loadBalancingPolicy =
    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
    ScheduledExecutorService scheduled =
    Executors.newScheduledThreadPool(1);
        scheduled.scheduleAtFixedRate(() -> {
            Session.State state = session.getState();
            state.getConnectedHosts().forEach((host) -> {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                System.out.printf("%s connections=%d, current load=%d, maxload=%d%n",
                        host, connections, inFlightQueries,
                        connections *
                                poolingOptions.getMaxRequestsPerConnection(distance));
            });
    }, 5, 5, TimeUnit.SECONDS); 

监控 5 秒输出显示 3 次迭代的以下内容:

/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=1, maxload=32768
/192.168.20.26:9042 connections=1, current load=0, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768
/192.168.20.25:9042 connections=1, current load=0, maxload=32768
/192.168.20.26:9042 connections=1, current load=1, maxload=32768
/192.168.20.34:9042 connections=1, current load=0, maxload=32768

我似乎没有非常有效地利用我的集群。我不确定我做错了什么,非常感谢任何提示。

谢谢!

最佳答案

使用executeAsync。

Executes the provided query asynchronously. This method does not block. It returns as soon as the query has been passed to the underlying network stack. In particular, returning from this method does not guarantee that the query is valid or has even been submitted to a live node. Any exception pertaining to the failure of the query will be thrown when accessing the ResultSetFuture.

您正在插入大量数据。如果您使用executeAsync并且您的集群无法处理如此多的数据,它可能会抛出异常。您可以使用信号量来限制executeAsync。

示例:

PreparedStatement prepared = session.prepare("insert into names (id, name) values (?, ?)");

int numberOfConcurrentQueries = 100;
final Semaphore semaphore = new Semaphore(numberOfConcurrentQueries);

int id = 0;    

for(int i = 0; i < myList.size(); i++) {
    try {
        id += 1;
        semaphore.acquire();
        ResultSetFuture future = session.executeAsync(prepared.bind(id, myList.get(i)));
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                semaphore.release();
            }
        });
    } catch (Exception e) {
        semaphore.release();
        e.printStackTrace();
    }
}

来源:
https://stackoverflow.com/a/30526719/2320144 https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement-

关于java - 为什么我的 Cassandra 准备语句摄取数据如此慢?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43559589/

相关文章:

java - Gwt - 当用户触摸多个小部件或链接时显示弹出面板

java - 合并两个排序数组时,使用数组还是队列更好?

cassandra - DSE/Cassandra CQL now() 不适用于时间戳类型

cassandra - Cassandra错误-无法限制“聚类”列(之前的列受非EQ关系限制)

cassandra - 运行 DataStax Enterprise 4.7 后缺少 JAR list

java - 从字符串中获取最后几个字符

java - 如何在新的 Java 类中使用 servlet 中声明的变量值?

java - Cassandra:如何设置客户端到节点的加密?

csv - 如何将 .tsv 文件加载到 cassandra 中

node.js - cassandra 在执行时继续运行