我有一个 csv 文件,其中有 2 亿行。
加载此文件的最佳方法是使用 csv 读取器逐行加载(因为我有很多这样的文件,因此稍后并行化代码不需要加载大量数据集和重载 RAM)。
我正在尝试计算某个列中值出现的次数,并将它们的值和频率记录在字典中。例如,计算列中唯一 ID 的数量以及这些 ID 出现的次数。
以下是我将如何执行此操作的示例:
import csv
from tqdm import tqdm
field_names = ['A','B','ID','C','D']
filename = '/PATH/file'
ID_dict = {}
with open(data_path+filename) as f:
reader = csv.DictReader(f,field_names,delimiter=',')
for row in tqdm(reader):
label = row['ID']
if label not in ID_dict.keys():
ID_dict[label] = 0
ID_dict[label] += 1
所以标有“ID”的列是我感兴趣的,但想象一下它有大约 2 亿个条目。
遍历所有这些行并填充字典的速度很慢(在我的机器上大约需要 10 个小时)。
或者,将值附加到新数组,然后使用 Counter 查找每个唯一元素的出现次数也花费太长时间。 (参见How do I count unique values inside a list)
有没有一种我缺少的更快的方法?也许有一种更快的 Pandas 方式? 提前致谢
最佳答案
不要使用DictReader()
。 DictReader()
完成了将行转换为字典的大量工作,并可配置对缺失列和额外列的处理,但您实际上并不需要这些。只需使用常规阅读器并访问每行的第三列即可。
您可以通过使用 Counter()
对象来进一步加快速度(它会自动为您处理 0
情况)。通过使用 newline=''
打开文件,您可能能够获得非常小的速度提升; CSV 模块建议您无论如何都这样做,因为它希望确保它知道行结尾与列中可能嵌入的换行符。
如果您使用 map()
对象和 operator.itemgetter()
,您可以进一步避免计算循环开销,并将 ids 直接传递给计数器:
import csv
import os.path
from collections import Counter
from operator import itemgetter
filename = '/PATH/file'
with open(os.path(data_path, filename), newline='') as f:
reader = csv.reader(f)
id_counts = Counter(map(itemgetter(2), reader))
尽管如此,2 亿行的处理工作量仍然很大。我使用 Faker 生成了 100 万行半真实数据,将这些行复制 200 次到一个新文件中,我的 2017 年款配备 SSD 的 Macbook Pro 在包含 tqdm
的情况下仅用了 6 分钟多一点的时间就处理了生成的 12GB 数据,而在不包含 tqdm 的情况下则需要 5 分 14 秒。 tqdm
声称它每次迭代仅增加 60 纳秒(超过 2 亿行 12 秒),但在我的测试中,它似乎很容易是该数字的 3 或 4 倍。
Pandas 读取数据的速度大致相同,因为 Pandas 的 read_csv()
是建立在 csv.reader()
,上面的速度与 Python 读取 2 亿行文件的速度一样快。然而,它随后将为这 2 亿行构建一个数据帧,这将需要大量内存来处理。你必须process your data in chunks并汇总结果,使其完全可行。
让我们做一些速度测试,比较您的版本(一个有 tqdm
减速带)、Pandas 和上述方法。我们将使用包含 10k 行和大约 100 个唯一 id 的测试集来均匀地比较事物,而不使用 I/O。这仅测试每种方法的计数能力。因此,设置测试数据并进行测试; name=name
关键字分配有助于避免重复测试的全局名称查找:
>>> import csv, pandas
>>> from timeit import Timer
>>> from collections import Counter
>>> from contextlib import redirect_stderr
>>> from io import StringIO
>>> from operator import itemgetter
>>> from random import randrange
>>> from tqdm import tqdm
>>> row = lambda: f",,{randrange(100)},,\r\n" # 5 columns, only care about middle column
>>> test_data = ''.join([row() for _ in range(10 ** 4)]) # CSV of 10.000 rows
>>> field_names = ['A', 'B', 'ID', 'C', 'D']
>>> filename = '/PATH/file'
>>> tests = []
>>> def as_test(f):
... tests.append((f.__name__, f))
...
>>> @as_test
... def in_question(f, csv=csv, tqdm=tqdm, field_names=field_names):
... ID_dict = {}
... reader = csv.DictReader(f, field_names, delimiter=',')
... for row in tqdm(reader):
... label = row['ID']
... if label not in ID_dict.keys():
... ID_dict[label] = 0
... ID_dict[label] += 1
...
>>> @as_test
... def in_question_no_tqdm(f, csv=csv, tqdm=tqdm, field_names=field_names):
... ID_dict = {}
... reader = csv.DictReader(f, field_names, delimiter=',')
... for row in reader:
... label = row['ID']
... if label not in ID_dict.keys():
... ID_dict[label] = 0
... ID_dict[label] += 1
...
>>> @as_test
... def pandas_groupby_count(f, pandas=pandas, field_names=field_names):
... df = pandas.read_csv(f, names=field_names)
... grouped_counts = df.groupby('ID').count()
...
>>> @as_test
... def pandas_value_counts(f, pandas=pandas, field_names=field_names):
... df = pandas.read_csv(f, names=field_names)
... counts = df['ID'].value_counts()
...
>>> @as_test
... def counter_over_map(f, csv=csv, Counter=Counter, ig2=itemgetter(2)):
... reader = csv.reader(f)
... id_counts = Counter(map(ig2, reader))
...
以及运行定时测试:
>>> for testname, testfunc in tests:
... timer = Timer(lambda s=StringIO, t=test_data: testfunc(s(t)))
... with redirect_stderr(StringIO()): # silence tqdm
... count, totaltime = timer.autorange()
... print(f"{testname:>25}: {totaltime / count * 1000:6.3f} microseconds ({count:>2d} runs)")
...
in_question: 33.303 microseconds (10 runs)
in_question_no_tqdm: 30.467 microseconds (10 runs)
pandas_groupby_count: 5.298 microseconds (50 runs)
pandas_value_counts: 5.975 microseconds (50 runs)
counter_over_map: 4.047 microseconds (50 runs)
DictReader() 和 Python for
循环的组合才是真正导致您的版本慢 6 到 7 倍的原因。在抑制 stderr
的情况下,tqdm
的开销已降至 0.3 纳秒;删除 with redirect_stderr()
上下文管理器会使输出更加详细,并将时间增加到 50 微秒,因此每次迭代大约需要 2 纳秒:
>>> timer = Timer(lambda s=StringIO, t=test_data: tests[0][1](s(t)))
>>> count, totaltime = timer.autorange()
10000it [00:00, 263935.46it/s]
10000it [00:00, 240672.96it/s]
10000it [00:00, 215298.98it/s]
10000it [00:00, 226025.18it/s]
10000it [00:00, 201787.96it/s]
10000it [00:00, 202984.24it/s]
10000it [00:00, 192296.06it/s]
10000it [00:00, 195963.46it/s]
>>> print(f"{totaltime / count * 1000:6.3f} microseconds ({count:>2d} runs)")
50.193 microseconds ( 5 runs)
不过,Pandas 在这里表现得很好!但是,如果不分块,将所有 2 亿行数据读入内存(使用实际数据集,而不是我在这里生成的空列)所需的 GB 内存将会慢很多,并且可能不是您的机器实际可以承载的。使用 Counter()
不需要千兆字节的内存。
如果您需要对 CSV 数据集进行更多处理,那么使用 SQLite 也是一个好主意。那时我什至不会使用Python;只需使用 SQLite command line tool to import the CSV data directly :
$ csvanalysis.db
SQLite version 3.19.3 2017-06-27 16:48:08
Enter ".help" for usage hints.
sqlite> CREATE TABLE csvdata (A, B, ID, C, D);
sqlite> CREATE INDEX csvdata_id on csvdata (ID);
sqlite> .import /path/to/file.csv csvdata
sqlite> SELECT ID, COUNT(*) FROM csvdata GROUP BY ID;
等等
关于python - 在 Python 中动态计算 csv 列的出现次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53538888/