python - 如何在不使用 pandas merge 将所有数据帧读取到内存的情况下对多个文件执行逐行合并

标签 python pandas dask

我希望在与该文件比较后根据 2 列匹配将多个文件与单个 (f1.txt) 文件合并。我可以在 pandas 中做到这一点,但它会将所有内容读取到内存中,这会变得非常快。我认为逐行阅读不会将所有内容加载到内存中。现在 Pandas 也不是一个选择。如何在与 f1.txt 不匹配的单元格中填写 null 时执行操作?

在这里,我使用了一本字典,我不确定它是否会保存在内存中,而且我也找不到在其他文件与 f1.txt 不匹配的情况下添加 null 的方法。其他文件可能有多达 1000 个不同的文件。只要我不把所有内容都记下来,时间并不重要

文件(制表符分隔)

f1.txt
A B  num  val scol
1 a1 1000 2 3
2 a2 456 7 2
3 a3 23 2 7
4 a4 800 7 3
5 a5 10 8 7

a1.txt
A B num val scol fcol dcol
1 a1 1000 2 3 0.2 0.77
2 a2 456 7 2 0.3 0.4
3 a3 23 2 7 0.5 0.6
4 a4 800 7 3 0.003 0.088

a2.txt
A B num val scol fcol2 dcol1
2 a2 456 7 2 0.7 0.8
4 a4 800 7 3 0.9 0.01
5 a5 10 8 7 0.03 0.07

当前代码

import os
import csv
m1 = os.getcwd() + '/f1.txt'
files_to_compare = [i for i in os.listdir('dir')]
dictionary = dict()
dictionary1 = dict()
with open(m1, 'rt') as a:
    reader1 = csv.reader(a, delimiter='\t')
    for x in files_to_compare:
        with open(os.getcwd() + '/dir/' + x, 'rt') as b:
            reader2 = csv.reader(b, delimiter='\t')
            for row1 in list(reader1):              
                dictionary[row1[0]] = list()
                dictionary1[row1[0]] = list(row1)
            for row2 in list(reader2):
                try:
                    dictionary[row2[0]].append(row2[5:])
                except KeyError:
                    pass
print(dictionary)
print(dictionary1)

我想要实现的目标类似于使用: df.merge(df1, on=['A','B'], how='left').fillna('null')

current result
{'A': [['fcol1', 'dcol1'], ['fcol', 'dcol']], '1': [['0.2', '0.77']], '2': [['0.7', '0.8'], ['0.3', '0.4']], '3': [['0.5', '0.6']], '4': [['0.9', '0.01'], ['0.003', '0.088']], '5': [['0.03', '0.07']]}

{'A': ['A', 'B', 'num', 'val', 'scol'], '1': ['1', 'a1', '1000', '2', '3'], '2': ['2', 'a2', '456', '7', '2'], '3': ['3', 'a3', '23', '2', '7'], '4': ['4', 'a4', '800', '7', '3'], '5': ['5', 'a5', '10', '8', '7']}
Desired result
{'A': [['fcol1', 'dcol1'], ['fcol', 'dcol']], '1': [['0.2', '0.77'],['null', 'null']], '2': [['0.7', '0.8'], ['0.3', '0.4']], '3': [['0.5', '0.6'],['null', 'null']], '4': [['0.9', '0.01'], ['0.003', '0.088']], '5': [['null', 'null'],['0.03', '0.07']]}

{'A': ['A', 'B', 'num', 'val', 'scol'], '1': ['1', 'a1', '1000', '2', '3'], '2': ['2', 'a2', '456', '7', '2'], '3': ['3', 'a3', '23', '2', '7'], '4': ['4', 'a4', '800', '7', '3'], '5': ['5', 'a5', '10', '8', '7']}

我的最终目的是将字典写入文本文件。我不知道将使用多少内存,也不知道它是否适合内存。如果有更好的方法而不使用 pandas,那会很好,否则我如何使字典工作?

DASK 尝试:

import dask.dataframe as dd    
directory = 'input_dir/'
first_file = dd.read_csv('f1.txt', sep='\t')
df = dd.read_csv(directory + '*.txt', sep='\t')
df2 = dd.merge(first_file, df, on=[A, B])

I kept getting ValueError: Metadata mismatch found in 'from_delayed' 
+-----------+--------------------+
| column    |  Found  | Expected |
+--------------------------------+
| fcol      |  int64  | float64  |
+-----------+--------------------+

我用谷歌搜索,发现了类似的投诉,但无法解决。这就是为什么我决定尝试这个。检查我的文件,所有数据类型似乎都是一致的。我的 dask 版本是 2.9.1

最佳答案

如果您想要手工制作解决方案,您可以查看 heapq.mergeitertools.groupby 。这假设您的文件按前两列(键)排序

我做了一个简单的例子,将文件合并和分组并生成两个文件,而不是字典(因此(几乎)没有任何内容存储在内存中,所有内容都在从磁盘读取/写入):

from heapq import merge
from itertools import groupby

first_file_name = 'f1.txt'
other_files = ['a1.txt', 'a2.txt']

def get_lines(filename):
    with open(filename, 'r') as f_in:
        for line in f_in:
            yield [filename, *line.strip().split()]

def get_values(lines):
    for line in lines:
        yield line
    while True:
        yield ['null']

opened_files = [get_lines(f) for f in [first_file_name] + other_files]

# save headers
headers = [next(f) for f in opened_files]

with open('out1.txt', 'w') as out1, open('out2.txt', 'w') as out2:
    # print headers to files
    print(*headers[0][1:6], sep='\t', file=out1)

    new_header = []
    for h in headers[1:]:
        new_header.extend(h[6:])

    print(*(['ID'] + new_header), sep='\t', file=out2)

    for v, g in groupby(merge(*opened_files, key=lambda k: (k[1], k[2])), lambda k: (k[1], k[2])):
        lines = [*g]

        print(*lines[0][1:6], sep='\t', file=out1)

        out_line = [lines[0][1]]
        iter_lines = get_values(lines[1:])
        current_line = next(iter_lines)
        for current_file in other_files:
            if current_line[0] == current_file:
                out_line.extend(current_line[6:])
                current_line = next(iter_lines)
            else:
                out_line.extend(['null', 'null'])
        print(*out_line, sep='\t', file=out2)

生成两个文件:

out1.txt:

A   B   num val scol
1   a1  1000    2   3
2   a2  456 7   2
3   a3  23  2   7
4   a4  800 7   3
5   a5  10  8   7

out2.txt:

ID  fcol    dcol    fcol2   dcol1
1   0.2 0.77    null    null
2   0.3 0.4 0.7 0.8
3   0.5 0.6 null    null
4   0.003   0.088   0.9 0.01
5   null    null    0.03    0.07

关于python - 如何在不使用 pandas merge 将所有数据帧读取到内存的情况下对多个文件执行逐行合并,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59563051/

相关文章:

python - 使用 Python 从 HTML 中提取纯文本

python - 如何在 tkinter 中用箭头连接两个状态圆圈?

python - 从 Pandas 数据框创建字典

python - 从 pyspark 中的数据框构建 StructType

python - 数据清理(标记)死传感器

python - 根据值计数列删除排序行

python - 如何在 Dask 中重置连接数据帧的索引

python - Dask add_done_callback 与其他参数?

python - 在python中按列连接两个大文件

Python pandas 将可变数量的重复时间戳更改为唯一