python - 使用 cqlengine 在 cassandra 中插入和更新大量行的最快和最有效的方法

标签 python csv cassandra cql

我有一个包含 15000000 条记录的 csv 文件,我正在尝试将其处理成一个 cassandra 表。以下是列标题和数据的示例:

enter image description here

为了更好地理解它,这是我在 python 中的模型:

class DIDSummary(Model):
    __keyspace__ = 'processor_api'

    did = columns.Text(required=True, primary_key=True, partition_key=True)
    month = columns.DateTime(required=True, primary_key=True, partition_key=True)
    direction = columns.Text(required=True, primary_key=True)
    duration = columns.Counter(required=True)
    cost = columns.Counter(required=True)

现在我正在尝试处理 csv 文件每一行中的数据,并将它们分批插入 500、1000、10000、250 等,但结果相同(每 1000 约 0.33 秒,这意味着完成所有这些需要 90 分钟)。我还尝试采用多处理池和 apply_async()'ing 每个 batch.execute() 调用,但没有更好的结果。有没有一种方法可以在 python 中使用 SSTableWriter,或者做一些其他的事情来更好地将它们插入到 cassandra 中?作为引用,这是我的 process_sheet_row() 方法:

def process_sheet_row(self, row, batch):
    report_datetime = '{0}{1:02d}'.format(self.report.report_year, self.report.report_month)
    duration = int(float(row[self.columns['DURATION']]) * 10)
    cost = int(float(row[self.columns['COST']]) * 100000)

    anisummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.ani_country_code, row[self.columns['ANI']]),
                                                direction='from',
                                                month=datetime.datetime.strptime(report_datetime, '%Y%m'))
    anisummary.duration += duration
    anisummary.cost += cost
    anisummary.batch(batch).save()

    destsummary = DIDSummary.batch(batch).create(did='{}{}'.format(self.report.dest_country_code, row[self.columns['DEST']]),
                                                 direction='to',
                                                 month=datetime.datetime.strptime(report_datetime, '%Y%m'))
    destsummary.duration += duration
    destsummary.cost += cost
    destsummary.batch(batch).save()

如有任何帮助,我们将不胜感激。谢谢!

编辑:这是我用于遍历文件并处理它的代码:

with open(self.path) as csvfile:
    reader = csv.DictReader(csvfile)
    if arr[0] == 'inventory':
            self.parse_inventory(reader)
    b = BatchQuery(batch_type=BatchType.Unlogged)
    i = 1
    for row in reader:
        self.parse_sheet_row(row, b)
        if not i % 1000:
            connection.check_connection() # This just makes sure we're still connected to cassandra. Check code below
            self.pool.apply_async(b.execute())
            b = BatchQuery(batch_type=BatchType.Unlogged)
        i += 1
print "Done processing: {}".format(self.path)
print "Time to Execute: {}".format(datetime.datetime.now() - start)
print "Batches: {}".format(i / 1000)
print "Records processed: {}".format(i - 1)

因为这可能有点帮助,这里是 connection.check_connection() 方法(和周围的方法):

def setup_defaults():
    connection.setup(['127.0.0.1'], 'processor_api', lazy_connect=True)

def check_connection():
    from cdr.models import DIDSummary
    try:
        DIDSummary.objects.all().count()
    except CQLEngineException:
        setup_defaults()

最佳答案

一般而言,批处理并不是执行插入的最快方法。在包含各种分区的未记录批处理中尤其如此。分批阅读here

如果您可以脱离 cqlengine 进行插入,您应该尝试 async callback chaining在以下 Python 驱动程序中实现:cassandra.execute_concurrent .

在误用各种大小的批处理后,我在每秒插入数方面有了重大改进,但 YMMV。

关于python - 使用 cqlengine 在 cassandra 中插入和更新大量行的最快和最有效的方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31479932/

相关文章:

python - 在 Python 中跳过多行

python - SQLAlchemy 查询,加入关系并按计数排序

python - 如何使用 Python 计算 csv 文件中某个值出现的次数?

insert - cassandra hive插入空异常

java - 错误 : Could not find or load main class, Cassandra

nosql - 大规模数据处理 Hbase vs Cassandra

匹配 char 后跟/前跟相同 char 但大写/小写的 Python RegEx

python - Pygame 当您按下键时将图像移动到位置

node.js - 不知道如何使用 csv-writer 在 API 调用后逐行写入 csv 文件

java - 同时读取 .csv 文件