情况
我正在使用Python 3.7.2及其内置的sqlite3模块。 (sqlite3.版本==2.6.0)
我有一个如下所示的 sqlite 数据库:
| user_id | action | timestamp |
| ------- | ------ | ---------- |
| Alice | 0 | 1551683796 |
| Alice | 23 | 1551683797 |
| James | 1 | 1551683798 |
| ....... | ...... | .......... |
其中 user_id
是 TEXT
,action
是任意 INTEGER
,timestamp
> 是代表 UNIX 时间的 INTEGER
。
数据库有 2 亿行,并且有 70K 个不同的 user_id
。
目标
我需要制作一个如下所示的 Python 字典:
{
"Alice":[(0, 1551683796), (23, 1551683797)],
"James":[(1, 1551683798)],
...
}
以 user_id
作为键,以相应的事件日志作为值,它们是元组列表(操作,时间戳)
。希望每个列表都按时间戳
按升序排序,但即使不是,我认为通过在创建字典后对每个列表进行排序也可以轻松实现。
努力
我有以下代码来查询数据库。它首先查询用户列表(使用 user_list_cursor
),然后查询属于该用户的所有行。
import sqlite3
connection = sqlite3.connect("database.db")
user_list_cursor = connection.cursor()
user_list_cursor.execute("SELECT DISTINCT user_id FROM EVENT_LOG")
user_id = user_list_cursor.fetchone()
classified_log = {}
log_cursor = connection.cursor()
while user_id:
user_id = user_id[0] # cursor.fetchone() returns a tuple
query = (
"SELECT action, timestamp"
" FROM TABLE"
" WHERE user_id = ?"
" ORDER BY timestamp ASC"
)
parameters = (user_id,)
local_cursor.execute(query, parameters) # Here is the bottleneck
classified_log[user_id] = list()
for row in local_cursor.fetchall():
classified_log[user_id].append(row)
user_id = user_list_cursor.fetchone()
问题
每个用户的查询执行速度太慢。对于每个 user_id
,该单行代码(被注释为瓶颈)大约需要 10 秒。我认为我对查询采取了错误的方法。实现目标的正确方法是什么?
我尝试使用关键字“按列分类数据库”、“按列分类sql”、“sql日志到字典python”进行搜索,但似乎没有任何内容符合我的情况。我认为这并不是一个罕见的需求,所以也许我缺少正确的搜索关键字。
再现性
如果有人愿意用200M行的sqlite数据库重现这种情况,下面的代码将创建一个5GB的数据库文件。
但我希望有人熟悉这种情况并且知道如何编写正确的查询。
import sqlite3
import random
connection = sqlite3.connect("tmp.db")
cursor = connection.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS EVENT_LOG (user_id TEXT, action INTEGER, timestamp INTEGER)"
)
query = "INSERT INTO EVENT_LOG VALUES (?, ?, ?)"
parameters = []
for timestamp in range(200_000_000):
user_id = f"user{random.randint(0, 70000)}"
action = random.randint(0, 1_000_000)
parameters.append((user_id, action, timestamp))
cursor.executemany(query, parameters)
connection.commit()
cursor.close()
connection.close()
最佳答案
非常感谢@Strawberry 和@Solarflare 在评论中提供的帮助。
以下解决方案实现了超过 70 倍的性能提升,因此为了完整起见,我将我所做的作为答案。
按照他们的建议,我使用了索引并查询了整个表。
import sqlite3
from operators import attrgetter
connection = sqlite3.connect("database.db")
# Creating index, thanks to @Solarflare
cursor = connection.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_user_id ON EVENT_LOG (user_id)")
cursor.commit()
# Reading the whole table, then make lists by user_id. Thanks to @Strawberry
cursor.execute("SELECT user_id, action, timestamp FROM EVENT_LOG ORDER BY user_id ASC")
previous_user_id = None
log_per_user = list()
classified_log = dict()
for row in cursor:
user_id, action, timestamp = row
if user_id != previous_user_id:
if previous_user_id:
log_per_user.sort(key=itemgetter(1))
classified_log[previous_user_id] = log_per_user[:]
log_per_user = list()
log_per_user.append((action, timestamp))
previous_user_id = user_id
所以要点是
- 按
user_id
建立索引,以使ORDER BY user_id ASC
在可接受的时间内执行。 - 读取整个表格,然后按
user_id
进行分类,而不是针对每个user_id
进行单独查询。 - 迭代
cursor
以逐行读取,而不是cursor.fetchall()
。
关于python - 按列对事件日志数据库进行有效分类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54978924/