java - 与 MongoDB 相比,使用 Java 驱动程序的 Cassandra Bulk-Write 性能非常糟糕

标签 java cassandra datastax-java-driver

我为 MongoDB 和 Cassandra 构建了一个导入器。基本上导入器的所有操作都是相同的,除了最后一部分形成数据以匹配所需的 cassandra 表模式和想要的 mongodb 文档结构。与 MongoDB 相比,Cassandra 的写入性能非常差,我认为我做错了什么。

基本上,我的抽象导入器类加载数据,读出所有数据并将其传递给扩展的 MongoDBImporter 或 CassandraImporter 类以将数据发送到数据库。一次以一个数据库为目标——不能同时对 C* 和 MongoDB 进行“双重”插入。导入器在同一台机器上针对相同数量的节点 (6) 运行。

问题:

MongoDB 导入在 57 分钟后完成。我摄取了 10.000.000 个文档,我预计 Cassandra 的行数大致相同。我的 Cassandra 导入器现在已经运行了 2.5 小时,并且只插入了 5.000.000 行。我将等待进口商完成并在此处编辑实际完成时间。

我如何使用 Cassandra 导入:

我在摄取数据之前准备了两个语句。这两个语句都是 UPDATE 查询,因为有时我必须将数据附加到现有列表中。在开始导入之前,我的表已完全清除。准备好的语句被一遍又一遍地使用。

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);

对于 行,我创建一个 BoundStatement 并将该语句传递给我的“自定义”批处理方法:
    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);

使用 MongoDB,我可以一次插入 1000 个文档(这是最大值)而不会出现问题。对于 Cassandra,导入器崩溃并显示 com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large在某个时候,我的陈述中只有 10 条。我正在使用此代码来构建批处理。顺便说一句,我之前从 1000、500、300、200、100、50、20 批量开始,但显然它们也不起作用。然后我将其设置为 10,它再次抛出异常。现在我不知道为什么它会破裂。
private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == null) {
        currentBatch = new BatchStatement(Type.UNLOGGED);
    }

    currentBatch.add(statement);
    if (currentBatch.size() == MAX_BATCH_SIZE) {
        ResultSet result = session.execute(currentBatch);
        currentBatch = new BatchStatement(Type.UNLOGGED);
        return result;
    }

    return null;
}

我的 C* 架构看起来像这样
CREATE TYPE stream.event (
    data_dbl frozen<map<text, double>>,
    data_str frozen<map<text, text>>,
    data_bool frozen<map<text, boolean>>,
);

CREATE TABLE stream.data (
    log_creator text,
    date text, //date of the timestamp
    ts timestamp,
    log_id text, //some id
    hour int, //just the hour of the timestmap
    x double,
    y double,
    events list<frozen<event>>,
    PRIMARY KEY ((log_creator, date, hour), ts, log_id)
) WITH CLUSTERING ORDER BY (ts ASC, log_id ASC)

我有时需要向现有行添加更多新事件。这就是为什么我需要一个 UDT 列表。我的 UDT 包含三个映射,因为事件创建者产生不同的数据(字符串/ double /boolean 类型的键/值对)。我知道 UDT 已卡住,我无法触摸已摄取事件的映射。这对我来说很好,我有时只需要添加具有相同时间戳的新事件。我对日志的创建者(一些传感器名称)以及记录的日期(即“22-09-2016”)和时间戳的时间进行分区(以更多地分发数据,同时将相关数据保持在一起一个分区)。

我在我的 pom.xml 中使用 Cassandra 3.0.8 和 Datastax Java 驱动程序,版本 3.1.0。
根据What is the batch limit in Cassandra? , 我不应该通过调整 batch_size_fail_threshold_in_kb 来增加批量大小在我的cassandra.yaml .那么......我的导入有什么问题或有什么问题?

更新
所以我调整了我的代码以运行异步查询并将当前运行的插入存储在一个列表中。每当异步插入完成时,它将从列表中删除。当列表大小超过阈值并且之前的插入发生错误时,该方法将等待 500 毫秒,直到插入低于阈值。当没有插入失败时,我的代码现在会自动增加阈值。

但是在流式传输 3.300.000 行之后,有 280.000 个插入正在处理,但没有发生错误。 这似乎当前处理的插入数量看起来太高了。 6 个 cassandra 节点在已有 2 年历史的商用硬件上运行。

这是并发插入的高数量(6 个节点为 280.000)有问题吗?我应该添加一个像 MAX_CONCURRENT_INSERT_LIMIT 这样的变量吗? ?
private List<ResultSetFuture> runningInsertList;
private static int concurrentInsertLimit = 1000;
private static int concurrentInsertSleepTime = 500;
...

@Override
public void executeBatch(Statement statement) throws InterruptedException {
    if (this.runningInsertList == null) {
        this.runningInsertList = new ArrayList<>();
    }

    //Sleep while the currently processing number of inserts is too high
    while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        Thread.sleep(concurrentInsertSleepTime);
    }

    ResultSetFuture future = this.executeAsync(statement);
    this.runningInsertList.add(future);

    Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
            runningInsertList.remove(future);
        }

        @Override
        public void onFailure(Throwable t) {
            concurrentInsertErrorOccured = true;
        }
    }, MoreExecutors.sameThreadExecutor());

    if (!concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit) {
        concurrentInsertLimit += 2000;
        LOGGER.info(String.format("New concurrent insert limit is %d", concurrentInsertLimit));
    }

    return;
}

最佳答案

在使用 C* 一段时间后,我确信您应该真正使用批处理来保持多个表同步。如果您不需要该功能,则根本不要使用批处理,因为您 招致绩效处罚。

将数据加载到 C* 中的正确方法是使用异步写入,如果您的集群无法跟上摄取速率,则可以使用可选的背压。您应该将“自定义”批处理方法替换为:

  • 执行异步写入
  • 控制你有多少飞行中写入
  • 写入超时时执行一些重试。

  • 要执行异步写入,请使用 .executeAsync方法,这将返回一个 ResultSetFuture目的。

    为了控制飞行中查询的数量,只需收集 ResultSetFuture.executeAsync 检索到的对象列表中的方法,如果列表获取(此处的大概值)说 1k 个元素,则等待所有元素完成,然后再发出更多写入。或者您可以等待第一个完成后再发出另一个写入,以保持列表完整。

    最后,您可以在等待操作完成时检查写入失败。在这种情况下,您可以:
  • 使用相同的超时值再次写入
  • 再次写入,增加超时值
  • 等待一段时间,然后使用相同的超时值再次写入
  • 等待一段时间,然后使用增加的超时值再次写入

  • 从 1 到 4,您的背压强度增加了。选择最适合您的情况的一种。

    问题更新后编辑

    你的插入逻辑对我来说似乎有点坏了:
  • 我没有看到任何重试逻辑
  • 如果失败,您不会删除列表中的项目
  • 您的 while (concurrentInsertErrorOccured && runningInsertList.size() > concurrentInsertLimit)是错误的,因为只有在发出的查询数大于 concurrentInsertLimit 时才会 hibernate ,并且因为 2. 你的线程只会停在那里。
  • 你永远不会设置为 false concurrentInsertErrorOccured

  • 我通常会保留一个(失败的)查询列表,以便以后重试。这给了我对查询的强大控制,当失败的查询开始累积时,我会睡一会儿,然后继续重试它们(最多 X 次,然后硬失败......)。

    该列表应该是非常动态的,例如,当查询失败时您可以在其中添加项目,并在您执行重试时删除项目。现在您可以了解集群的限制,并调整您的 concurrentInsertLimit基于例如最后一秒内失败查询的平均数,或者坚持使用更简单的方法“如果我们在重试列表中有项目则暂停”等...

    评论后编辑 2

    由于您不想要任何重试逻辑,因此我会以这种方式更改您的代码:
    private List<ResultSetFuture> runningInsertList;
    private static int concurrentInsertLimit = 1000;
    private static int concurrentInsertSleepTime = 500;
    ...
    
    @Override
    public void executeBatch(Statement statement) throws InterruptedException {
        if (this.runningInsertList == null) {
            this.runningInsertList = new ArrayList<>();
        }
    
        ResultSetFuture future = this.executeAsync(statement);
        this.runningInsertList.add(future);
    
        Futures.addCallback(future, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(ResultSet result) {
                runningInsertList.remove(future);
            }
    
            @Override
            public void onFailure(Throwable t) {
                runningInsertList.remove(future);
                concurrentInsertErrorOccured = true;
            }
        }, MoreExecutors.sameThreadExecutor());
    
        //Sleep while the currently processing number of inserts is too high
        while (runningInsertList.size() >= concurrentInsertLimit) {
            Thread.sleep(concurrentInsertSleepTime);
        }
    
        if (!concurrentInsertErrorOccured) {
            // Increase your ingestion rate if no query failed so far
            concurrentInsertLimit += 10;
        } else {
            // Decrease your ingestion rate because at least one query failed
            concurrentInsertErrorOccured = false;
            concurrentInsertLimit = Max(1, concurrentInsertLimit - 50);
            while (runningInsertList.size() >= concurrentInsertLimit) {
                Thread.sleep(concurrentInsertSleepTime);
            }
        }
    
        return;
    }
    

    您还可以通过替换 List<ResultSetFuture> 来优化程序。带柜台。

    希望有帮助。

    关于java - 与 MongoDB 相比,使用 Java 驱动程序的 Cassandra Bulk-Write 性能非常糟糕,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39642303/

    相关文章:

    java - `InvalidQueryException: cannot parse ' [B@50908fa9 ' as hex bytes` 使用 Datastax Java 驱动程序

    java - 创建对象矩阵

    java - 超过数组长度后仍存储用户输入

    spring - 在 Spock 测试中加载 Spring 上下文*之前*如何启动嵌入式 Cassandra 服务器?

    Cassandra 版本升级

    cassandra - 修改cassandra.yaml中batch_size_fail_threshold_in_kb值的影响

    java - 默认PreparedStatement不可序列化

    Java - 64 位系统的类路径问题

    java - 替换功能不符合预期结果

    java - Cassandra 错误: Column timestamp is required