python cassandra 驱动程序与副本具有相同的插入性能

标签 python cassandra multiprocessing

我正在尝试将 Python async 与 Cassandra 结合使用,看看是否可以比 CQL COPY 命令更快地将记录写入 Cassandra。

我的 python 代码如下所示:

from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

with open('dataImport.txt') as f:
    for line in f:
        query = SimpleStatement (
            "INSERT INTO tstTable (id, accts, info) VALUES (%s) " %(line),
            consistency_level=ConsistencyLevel.ONE)
        session.execute_async (query)

但它为我提供了与 COPY 命令相同的性能...大约 2,700 行/秒...使用异步会更快吗?

我需要在 python 中使用多线程吗?只是阅读它,但不确定它如何适合这个...

编辑:

所以我在网上找到了一些我正在尝试修改但无法正常工作的东西...到目前为止我有这个..我还将文件分成 3 个文件到/Data/toImport/目录中:

import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement


cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

def mp_worker(inputArg):
        with open(inputArg[0]) as f:
            for line in f:
                query = SimpleStatement (
                    "INSERT INTO CustInfo (cust_id, accts, offers) values (%s)" %(line),
                    consistency_level=ConsistencyLevel.ONE)
                session.execute_async (query)


def mp_handler(inputData, nThreads = 8):
    p = multiprocessing.Pool(nThreads)
    p.map(mp_worker, inputData, chunksize=1)
    p.close()
    p.join()

if __name__ == '__main__':
    temp_in_data = file_list
    start = time.time()
    in_dir = '/Data/toImport/'
    N_Proc = 8
    file_data = [(in_dir) for i in temp_in_data]

    print '----------------------------------Start Working!!!!-----------------------------'
    print 'Number of Processes using: %d' %N_Proc
    mp_handler(file_data, N_Proc)
    end = time.time()
    time_elapsed = end - start
    print '----------------------------------All Done!!!!-----------------------------'
    print "Time elapsed: {} seconds".format(time_elapsed)

但是得到这个错误:

Traceback (most recent call last):
  File "multiCass.py", line 27, in <module>
    temp_in_data = file_list
NameError: name 'file_list' is not defined

最佳答案

此帖A Multiprocessing Example for Improved Bulk Data Throughput提供提高批量数据摄取性能所需的所有详细信息。基本上有 3 种机制,可以根据您的用例和硬件进行额外的调整:

  1. 单一进程(在你的例子中就是这种情况)
  2. 多处理单个查询
  3. 多处理并发查询

批处理的大小和并发性是您必须自己考虑的变量。

关于python cassandra 驱动程序与副本具有相同的插入性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33153518/

相关文章:

Python Threading.Timer正在调用命令而无需等待

amazon-s3 - 规划可扩展的 Web 应用程序的开发

cassandra - 连接到远程 JMX 代理时出错!在启动 Nodetool 时

python - 结合输出多处理python

python - 如何正确使用threading.local()来传递值?

python 卡夫卡: Is there a way to block a consumer on a kafka topic till a new message is posted?

python - 如何使用 pywinauto 控制 IE 浏览器

python - 获取给定年份范围内的年计数

.net - 是否有用于 Cassandra NoSQL 数据库的 .Net 包装器或驱动程序?

c - 有什么解决方案可以在原子上下文中在用户和内核之间共享数据吗?