python - 在 Python 中动态计算 csv 列的出现次数

标签 python csv dictionary unique

我有一个 csv 文件,其中有 2 亿行。

加载此文件的最佳方法是使用 csv 读取器逐行加载(因为我有很多这样的文件,因此稍后并行化代码不需要加载大量数据集和重载 RAM)。

我正在尝试计算某个列中值出现的次数,并将它们的值和频率记录在字典中。例如,计算列中唯一 ID 的数量以及这些 ID 出现的次数。

以下是我将如何执行此操作的示例:

import csv
from tqdm import tqdm

field_names = ['A','B','ID','C','D']
filename = '/PATH/file'

ID_dict = {}
with open(data_path+filename) as f:
    reader = csv.DictReader(f,field_names,delimiter=',')
    for row in tqdm(reader):
        label = row['ID']
        if label not in ID_dict.keys():
                ID_dict[label] = 0
        ID_dict[label] += 1

所以标有“ID”的列是我感兴趣的,但想象一下它有大约 2 亿个条目。

遍历所有这些行并填充字典的速度很慢(在我的机器上大约需要 10 个小时)。

或者,将值附加到新数组,然后使用 Counter 查找每个唯一元素的出现次数也花费太长时间。 (参见How do I count unique values inside a list)

有没有一种我缺少的更快的方法?也许有一种更快的 Pandas 方式? 提前致谢

最佳答案

不要使用DictReader()DictReader() 完成了将行转换为字典的大量工作,并可配置对缺失列和额外列的处理,但您实际上并不需要这些。只需使用常规阅读器并访问每行的第三列即可。

您可以通过使用 Counter() 对象来进一步加快速度(它会自动为您处理 0 情况)。通过使用 newline='' 打开文件,您可能能够获得非常小的速度提升; CSV 模块建议您无论如何都这样做,因为它希望确保它知道行结尾与列中可能嵌入的换行符。

如果您使用 map() 对象和 operator.itemgetter(),您可以进一步避免计算循环开销,并将 ids 直接传递给计数器:

import csv
import os.path
from collections import Counter
from operator import itemgetter

filename = '/PATH/file'

with open(os.path(data_path, filename), newline='') as f:
    reader = csv.reader(f)
    id_counts = Counter(map(itemgetter(2), reader))

尽管如此,2 亿行的处理工作量仍然很大。我使用 Faker 生成了 100 万行半真实数据,将这些行复制 200 次到一个新文件中,我的 2017 年款配备 SSD 的 Macbook Pro 在包含 tqdm 的情况下仅用了 6 分钟多一点的时间就处理了生成的 12GB 数据,而在不包含 tqdm 的情况下则需要 5 分 14 秒。 tqdm 声称它每次迭代仅增加 60 纳秒(超过 2 亿行 12 秒),但在我的测试中,它似乎很容易是该数字的 3 或 4 倍。

Pandas 读取数据的速度大致相同,因为 Pandas 的 read_csv() 是建立在 csv.reader(),上面的速度与 Python 读取 2 亿行文件的速度一样快。然而,它随后将为这 2 亿行构建一个数据帧,这将需要大量内存来处理。你必须process your data in chunks并汇总结果,使其完全可行。

让我们做一些速度测试,比较您的版本(一个有 tqdm 减速带)、Pandas 和上述方法。我们将使用包含 10k 行和大约 100 个唯一 id 的测试集来均匀地比较事物,而不使用 I/O。这仅测试每种方法的计数能力。因此,设置测试数据并进行测试; name=name 关键字分配有助于避免重复测试的全局名称查找:

>>> import csv, pandas
>>> from timeit import Timer
>>> from collections import Counter
>>> from contextlib import redirect_stderr
>>> from io import StringIO
>>> from operator import itemgetter
>>> from random import randrange
>>> from tqdm import tqdm
>>> row = lambda: f",,{randrange(100)},,\r\n"  # 5 columns, only care about middle column
>>> test_data = ''.join([row() for _ in range(10 ** 4)])  # CSV of 10.000 rows
>>> field_names = ['A', 'B', 'ID', 'C', 'D']
>>> filename = '/PATH/file'
>>> tests = []
>>> def as_test(f):
...     tests.append((f.__name__, f))
...
>>> @as_test
... def in_question(f, csv=csv, tqdm=tqdm, field_names=field_names):
...     ID_dict = {}
...     reader = csv.DictReader(f, field_names, delimiter=',')
...     for row in tqdm(reader):
...         label = row['ID']
...         if label not in ID_dict.keys():
...                 ID_dict[label] = 0
...         ID_dict[label] += 1
...
>>> @as_test
... def in_question_no_tqdm(f, csv=csv, tqdm=tqdm, field_names=field_names):
...     ID_dict = {}
...     reader = csv.DictReader(f, field_names, delimiter=',')
...     for row in reader:
...         label = row['ID']
...         if label not in ID_dict.keys():
...                 ID_dict[label] = 0
...         ID_dict[label] += 1
...
>>> @as_test
... def pandas_groupby_count(f, pandas=pandas, field_names=field_names):
...     df = pandas.read_csv(f, names=field_names)
...     grouped_counts = df.groupby('ID').count()
...
>>> @as_test
... def pandas_value_counts(f, pandas=pandas, field_names=field_names):
...     df = pandas.read_csv(f, names=field_names)
...     counts = df['ID'].value_counts()
...
>>> @as_test
... def counter_over_map(f, csv=csv, Counter=Counter, ig2=itemgetter(2)):
...     reader = csv.reader(f)
...     id_counts = Counter(map(ig2, reader))
...

以及运行定时测试:

>>> for testname, testfunc in tests:
...     timer = Timer(lambda s=StringIO, t=test_data: testfunc(s(t)))
...     with redirect_stderr(StringIO()):  # silence tqdm
...         count, totaltime = timer.autorange()
...         print(f"{testname:>25}: {totaltime / count * 1000:6.3f} microseconds ({count:>2d} runs)")
...
              in_question: 33.303 microseconds (10 runs)
      in_question_no_tqdm: 30.467 microseconds (10 runs)
     pandas_groupby_count:  5.298 microseconds (50 runs)
      pandas_value_counts:  5.975 microseconds (50 runs)
         counter_over_map:  4.047 microseconds (50 runs)

DictReader() 和 Python for 循环的组合才是真正导致您的版本慢 6 到 7 倍的原因。在抑制 stderr 的情况下,tqdm 的开销已降至 0.3 纳秒;删除 with redirect_stderr() 上下文管理器会使输出更加详细,并将时间增加到 50 微秒,因此每次迭代大约需要 2 纳秒:

>>> timer = Timer(lambda s=StringIO, t=test_data: tests[0][1](s(t)))
>>> count, totaltime = timer.autorange()
10000it [00:00, 263935.46it/s]
10000it [00:00, 240672.96it/s]
10000it [00:00, 215298.98it/s]
10000it [00:00, 226025.18it/s]
10000it [00:00, 201787.96it/s]
10000it [00:00, 202984.24it/s]
10000it [00:00, 192296.06it/s]
10000it [00:00, 195963.46it/s]
>>> print(f"{totaltime / count * 1000:6.3f} microseconds ({count:>2d} runs)")
50.193 microseconds ( 5 runs)

不过,Pandas 在这里表现得很好!但是,如果不分块,将所有 2 亿行数据读入内存(使用实际数据集,而不是我在这里生成的空列)所需的 GB 内存将会慢很多,并且可能不是您的机器实际可以承载的。使用 Counter() 不需要千兆字节的内存。

如果您需要对 CSV 数据集进行更多处理,那么使用 SQLite 也是一个好主意。那时我什至不会使用Python;只需使用 SQLite command line tool to import the CSV data directly :

$  csvanalysis.db
SQLite version 3.19.3 2017-06-27 16:48:08
Enter ".help" for usage hints.
sqlite> CREATE TABLE csvdata (A, B, ID, C, D);
sqlite> CREATE INDEX csvdata_id on csvdata (ID);
sqlite> .import /path/to/file.csv csvdata
sqlite> SELECT ID, COUNT(*) FROM csvdata GROUP BY ID;

等等

关于python - 在 Python 中动态计算 csv 列的出现次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53538888/

相关文章:

python - 如何在 Pandas DataFrame 中对经纬度配对进行分组?

python - 具有 4 个方程的绘图图由 f(x,y, xy) 组成

javascript - 如何在 JavaScript 代码中使用 JMeter 变量?

windows - 如何通过在 shell 脚本中读取 csv 文件来计算 2 列的总和,将其添加到新列中

python - 如何将外部文件中的分数添加到字典中并对它们进行从 1 到 10 的排名

c# - 传递字典以在 HTML/Razor 中完全查看和使用它

Python 检查列表项是否在文本文件中

python - Python 中一长串 Try-Except 语句的简洁替换?

firebase - 如何将 Firebase 实时数据库导出为 CSV?

C# 字典 TryGetValue 与 int 值,如何避免双重查找