Cassandra 因节点增多而变慢

标签 cassandra throughput bigdata database nosql

我在 AWS 上设置了 Cassandra 集群。我想要得到的是随着更多节点的添加(如宣传的那样),I/O 吞吐量(每秒读/写次数)会增加。然而,我得到的却恰恰相反。随着新节点的添加,性能会降低。

您知道阻止其扩展的任何典型问题吗?

以下是一些详细信息:

我正在向列族添加一个文本文件 (15MB)。每一行都是一个记录。有150000条记录。当有1个节点时,写入大约需要90秒。但是当有2个节点时,需要120秒。我可以看到数据分布到 2 个节点。但是,吞吐量没有增加。

源代码如下:

public class WordGenCAS {
static final String KEYSPACE = "text_ks";
static final String COLUMN_FAMILY = "text_table";
static final String COLUMN_NAME = "text_col";

public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
        System.exit(-1);
    }

    String[] contactPts = args[1].split(",");

    Cluster cluster = Cluster.builder()
            .addContactPoints(contactPts)
            .build();
    Session session = cluster.connect(KEYSPACE);

    InputStream fis = new FileInputStream(args[0]);
    InputStreamReader in = new InputStreamReader(fis, "UTF-8");
    BufferedReader br = new BufferedReader(in);

    String line;
    int lineCount = 0;
    while ( (line = br.readLine()) != null) {
        line = line.replaceAll("'", " ");
        line = line.trim();
        if (line.isEmpty())
            continue;
        System.out.println("[" + line + "]");
        String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                COLUMN_FAMILY,
                COLUMN_NAME,
                lineCount,
                line);
        session.execute(cqlStatement2);
        lineCount++;
    }

    System.out.println("Total lines written: " + lineCount);
}

}

数据库架构如下:

CREATE KEYSPACE text_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };

USE text_ks;

CREATE TABLE text_table (
    id int,
    text_col text,
    primary key (id)
) WITH COMPACT STORAGE;

谢谢!

最佳答案

即使这是一篇旧帖子,我认为也值得为这些(常见)问题发布解决方案。

正如您已经发现的,使用串行过程加载数据速度很慢。您所建议的做法是正确的。

但是,在不施加某种背压的情况下发出大量查询可能会带来麻烦,并且您会由于服务器(以及在某种程度上驱动程序)过度过载而丢失数据。

此解决方案将通过异步调用加载数据,并尝试对客户端施加一些反压以避免数据丢失。

public class WordGenCAS {
    static final String KEYSPACE = "text_ks";
    static final String COLUMN_FAMILY = "text_table";
    static final String COLUMN_NAME = "text_col";

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordGenCAS <input file> <host1,host2,...>");
            System.exit(-1);
        }

        String[] contactPts = args[1].split(",");

        Cluster cluster = Cluster.builder()
                .addContactPoints(contactPts)
                .build();
        Session session = cluster.connect(KEYSPACE);

        InputStream fis = new FileInputStream(args[0]);
        InputStreamReader in = new InputStreamReader(fis, "UTF-8");
        BufferedReader br = new BufferedReader(in);

        String line;
        int lineCount = 0;

        // This is the futures list of our queries
        List<Future<ResultSet>> futures = new ArrayList<>();

        // Loop
        while ( (line = br.readLine()) != null) {
            line = line.replaceAll("'", " ");
            line = line.trim();
            if (line.isEmpty())
                continue;
            System.out.println("[" + line + "]");
            String cqlStatement2 = String.format("insert into %s (id, %s) values (%d, '%s');",
                    COLUMN_FAMILY,
                    COLUMN_NAME,
                    lineCount,
                    line);
            lineCount++;

            // Add the "future" returned by async method the to the list
            futures.add(session.executeAsync(cqlStatement2));

            // Apply some backpressure if we issued more than X query.
            // Change X to another value suitable for your cluster
            while (futures.size() > 1000) {
                Future<ResultSet> future = futures.remove(0);
                try {
                    future.get();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        System.out.println("Total lines written: " + lineCount);
        System.out.println("Waiting for writes to complete...");

        // Wait until all writes are done.
        while (futures.size() > 0) {
            Future<ResultSet> future = futures.remove(0);
            try {
                future.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        System.out.println("Done!");
    }
}

关于Cassandra 因节点增多而变慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23977104/

相关文章:

Cassandra 数据完整性

java - Spark 和 Cassandra Java 应用程序 : Exception in thread "main" java. lang.NoClassDefFoundError: org/apache/spark/sql/Dataset

io - Erlang消费者队列

apache - 在哪里可以找到有关hadoop jobtracker如何停止的任何线索?

node.js - 无法使用 timeuuid 作为主键执行时间序列查询

cassandra - 如何配置cassandra进行远程连接

messaging - 使用 HornetQ Core Bridge 的吞吐量非常低

performance - 如何在多核机器上扩展 Go 例程的数量以获得最大吞吐量

hadoop - 一个集群应该有多少个节点才能分析 3 TB 的数据?应该如何设计硬件架构

java - 计算庞大数据集中的唯一 URL(超过 150 亿个)