python - 使用 Python 逐行处理非常大的 900M 行 MySQL 表

标签 python mysql sqlalchemy mysql-python bigdata

我经常需要使用 Python 逐行处理 MySQL 表的数亿行。我想要一个健壮且不需要监视的脚本。

我在下面粘贴了一个脚本,用于对我的行中消息字段的语言进行分类。它利用了 sqlalchemy 和 MySQLdb.cursors.SSCursor 模块。不幸的是,这个脚本在我远程运行 4840 行和在本地运行 42000 行后始终抛出一个“查询期间与 MySQL 服务器失去连接” 错误。

此外,我已经根据这个 stackoverflow 问题的答案检查了 max_allowed_pa​​cket = 32M 我的 MySQL 服务器的/etc/mysql/my.cnf 文件 Lost connection to MySQL server during query

任何有关修复此错误或使用其他方法以稳健方式使用 Python 处理非常大的 MySQL 文件的建议,我们将不胜感激!

import sqlalchemy
import MySQLdb.cursors
import langid

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = "mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf".format(schema)
db_eng = sqlalchemy.create_engine(engine_url, connect_args={'cursorclass': MySQLdb.cursors.SSCursor} )
langid.set_languages(['fr', 'de'])

print "Executing input query..."
data_iter = db_eng.execute("SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT 10000".format(table))

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter):
    count = 0;
    for item in update_iter:
        count += 1;
        if count%10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        db_eng.execute("UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}".format(table, lang, conf, message_id))

data_iter_upd = process(data_iter)

print "Begin processing..."
update_table(data_iter_upd)

最佳答案

According to MySQLdb developer Andy Dustman ,

[When using SSCursor,] no new queries can be issued on the connection until the entire result set has been fetched.

那篇文章说,如果您发出另一个查询,您将收到“命令顺序错误”错误,这不是您看到的错误。所以我不确定以下内容是否一定能解决您的问题。尽管如此,尝试从您的代码中删除 SSCursor 并使用更简单的默认 Cursor 可能是值得的,只是为了测试这是否是问题的根源。

例如,您可以在 SELECT 语句中使用 LIMIT chunksize OFFSET n 以 block 的形式循环遍历数据集:

import sqlalchemy
import MySQLdb.cursors
import langid
import itertools as IT
chunksize = 1000

def process(inp_iter):
    for item in inp_iter:
        item = dict(item)
        (item['langid_lang'], item['langid_conf']) = langid.classify(item['message'])
        yield item

def update_table(update_iter, engine):
    for count, item in enumerate(update_iter):
        if count%10 == 0:
            print "{} rows processed".format(count)
        lang = item['langid_lang']
        conf = item['langid_conf']
        message_id = item['message_id']
        engine.execute(
            "UPDATE {} SET langid_lang = '{}', langid_conf = {} WHERE message_id = {}"
            .format(table, lang, conf, message_id))

schema = "twitterstuff"
table = "messages_en" #900M row table
engine_url = ("mysql://myserver/{}?charset=utf8mb4&read_default_file=~/.my.cnf"
              .format(schema))

db_eng = sqlalchemy.create_engine(engine_url)
langid.set_languages(['fr', 'de'])

for offset in IT.count(start=0, step=chunksize):
    print "Executing input query..."
    result = db_eng.execute(
        "SELECT message_id, message FROM {} WHERE langid_lang IS NULL LIMIT {} OFFSET {}"
        .format(table, chunksize, offset))
    result = list(result)
    if not result: break
    data_iter_upd = process(result)

    print "Begin processing..."
    update_table(data_iter_upd, db_eng)

关于python - 使用 Python 逐行处理非常大的 900M 行 MySQL 表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34817603/

相关文章:

python - 修复使用 Ajax 从 Django Form 发送数据时出现 "missing 1 required positional argument: ' id'"错误

mysql - 忘记打字; mysql语句结束后,如何返回?

python - 在多个 Python worker 之间共享 Postgres 表中指定的工作

python - 从字典创建类实例?

php - 使用 PHP 从 MariaDB 数据创建嵌套 JSON

python - 使用 SQLAlchemy 渲染 INSERT INTO SELECT RETURNING

python - 高斯混合模型交叉验证

python - 比较两个 python 列表的复杂性顺序是什么?

python - Gspread - 读取时也获取无值?

MySQL加载数据: This command is not supported in the prepared statement protocol yet