java - 高效地向cassandra插入大量数据

标签 java cassandra hector

我想向 cassandra 中插入大约 5000 万行(每行约 30 列),目前只有 1 个节点。

我正在从另一个数据源查询我的数据并将其存储在一个表对象中。我逐个遍历解析每一行,然后将其添加到增变器中。目前我一次插入 100 行,100 万行需要 40 分钟!我如何加快这个过程? (我也尝试过 client.batch_mutate() 但它似乎在插入几千个 block 大小 2 后出现重置连接错误)。

通过四处搜索,我发现多线程可以提供帮助。但是我找不到任何例子,有人可以联系我吗?谢谢!!

我当前的代码:

        List<String> colNames = new ArrayList<String>();
        List<String> colValues = new ArrayList<String>();
        SomeTable result = Query(...); // this contains my result set of 1M rows initially

        for (Iterator itr = result.getRecordIterator(); itr.hasNext();) {
                String colName =.....
                String colValue = .....

            int colCount = colNames.size(); // 100 * 30

            for (int i = 0; i < colCount; i++) {
                //add row keys and columns to mutator 
                mutator.addInsertion(String.valueOf(rowCounter), "data", HFactory.createStringColumn(colNames.get(i), colValues.get(i)));
            }
            rowCounter++;

            //insert rows of block size 100
            if (rowCounter % 100==0) { 

                mutator.execute();
                //clear data
                colNames = new ArrayList<String>();
                colValues = new ArrayList<String>();
                mutator = HFactory.createMutator(keyspace, stringSerializer);
            }

        }

最佳答案

是的,多线程会有很大帮助。目前,您在 Cassandra 中使用一个连接,这意味着您在 Cassandra 中仅使用一个线程。您需要使用多个连接,这需要在您的客户端中使用多个线程。

一种方法是使用 Java ThreadPoolExecutor 并将您的 mutator.execute() 包装在可运行对象中并在线程池上执行它。注意处理异常。您还应该使用 BlockingQueue 来限制排队突变的数量,以防您读取源代码的速度快于 Cassandra 插入的速度。

有了这个,将 Hector 中的连接池大小设置为 10 左右,您的插入速度应该会明显加快。

请注意,Cassandra 不是为单节点操作而设计的,以防您不知道。我假设您打算扩展和添加复制。如果不是,那么您可能会找到一个更高效、更简单的替代解决方案来满足您的需求。在使用多个节点时,多个连接和线程变得尤为重要,这样您的插入率就可以扩展。

关于java - 高效地向cassandra插入大量数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15801102/

相关文章:

java - 当条件引用嵌入属性时无法解析属性

java - Scanner.equals() 在任何一种情况下都返回 false

python - 懒惰地从 PostgreSQL/Cassandra 创建 Dask DataFrame

mongodb - 列族存储与文档存储

cassandra - Cassandra 连接的健康检查(使用 hector)?

cassandra - Cassandra 的可用性

java - 如何添加数组中的数字以便输出高分

java - 从 ASN1 加密的 pem 证书中获取公钥和私钥

node.js - Cassandra批处理语句插入后删除不插入相同的主键

java - 如何使用 Cassandra 数据库对用户进行身份验证