python - Fastest way to read a huge CSV file, process it, then write the processed CSV in Python

Tags: python csv bigdata dask

I have a number of huge CSV files (around 20GB each) that I need to read, process, and then write back out as new CSVs.

The head of the CSV files was shown in a screenshot (not reproduced here); based on the code below, the columns are Date, Time, Bid_Price, Ask_Price, Last_Price and Volume.

The goal of the task is to read the CSV and, for each row, compare its time against the start and end times held in a dictionary. If the time falls outside those hours, skip the row; if it falls inside, write the row to the new file.

It sounds simple, but given the scale, efficiency is critical, and I could use some advice.

I have tried several approaches, including reading the whole file into pandas, which either takes a very long time or crashes with memory errors. I have also tried opening the file and reading it line by line before processing, but that also seems to take a long time. My current line of attack is dask, but before I press on I would like to hear any tips for speeding up:

  1. Reading
  2. Processing - this seems to take a very long time because I am using dask's apply function to run the processing on every row (see the sketch after this list). When I tried this, one file took about 3 hours to process.
  3. Writing - writing a 20GB CSV back out also seems to take a very long time. Having dask output each partition and then merging the partitions into one file did seem to speed things up a little.
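
For reference, here is roughly the dask approach I have in mind: a vectorised filter on the Time column instead of a row-wise apply. The file names, column names and time window below are placeholders, not my real values:

import dask.dataframe as dd

cols = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

# blocksize sets the partition size; 64MB is only a starting point to tune
ddf = dd.read_csv('ticks.csv', header=None, names=cols,
                  dtype=str, blocksize='64MB')

# vectorised boolean mask instead of ddf.apply(...) on every row
keep = (ddf['Time'] >= '07:00:00') & (ddf['Time'] <= '21:00:00')
ddf[keep].to_csv('ticks_filtered.csv', single_file=True, index=False)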

Given that I have 16GB of RAM, what plan of attack for each of the stages above will produce the fastest result?

Here is the code I came up with after reading your replies. I included some code to show the percentage complete. The percentage seems to slow down a lot as the file is processed, as if there were a memory issue or something similar. Processing a 20GB file takes over 4 hours, which seems like a long time. I watched system resources during a run: CPU sits at around 16% and memory at around 10GB of the 16GB. Can someone tell me whether this is the speed I should expect, or whether there is a problem with the code?


from datetime import datetime, timedelta, timezone
from symbol_specification import darwinex_quote_hours, darwinex_symbol_specs
import os
import csv
import pandas as pd
import io
import tailer

# Software Variables and Information
__author__ = 'TBC'
__copyright__ = 'TBC'
__credits__ = ['TBC']
__license__ = 'GPL'
__version__ = '1.0.4dev'
__maintainer__ = 'TBC'
__email__ = 'TBC'
__status__ = 'Production'
software_name = 'TBC'


def get_digits(str_val):
    # number of decimal places implied by a price string, e.g. '1.2345' -> 4
    return len(str_val.split('.')[-1])


def get_in_filepath(dir_name, filename):
    return os.path.join(dir_name, filename)


def get_out_filepath(dir_name, filename):
    return os.path.join(dir_name, filename.split('.csv')[0] + '_OOH_removed.csv')


def get_input_variables():

    dir_name = r"X:"

    # dictionary containing darwinex symbol id and csv filename
    run_details = {'CADCHF': 'CADCHF_mt5_ticks.csv',
                   'CADJPY': 'CADJPY_mt5_ticks.csv',
                   'CHFJPY': 'CHFJPY_mt5_ticks.csv',
                   'EURAUD': 'EURAUD_mt5_ticks.csv',
                   'EURCAD': 'EURCAD_mt5_ticks.csv',
                   'EURCHF': 'EURCHF_mt5_ticks.csv',
                   'EURGBP': 'EURGBP_mt5_ticks.csv',
                   'EURJPY': 'EURJPY_mt5_ticks.csv',
                   'EURNZD': 'EURNZD_mt5_ticks.csv'}

    # remove out of hours ticks?
    remove_out_of_hours_ticks = True

    # round OHLC values?
    round_ohlc_required = False

    # remove trailing zeros - ** saves space in the output file
    remove_trailing_zeros = True

    # get quote hours as shown in MT5 symbol specification
    quote_hours = darwinex_quote_hours

    # specify the broker timezone in hours
    # ****************************IMPORTANT NOTE******************************
    # The quote hours specified above are in the broker's time zone, which may
    # differ from the time zone the tickstory data was downloaded in.
    # Following my data download guidelines, the tickstory data should be
    # downloaded in the (UTC+00:00) Dublin, Edinburgh, Lisbon, London time
    # zone WITH DST. The quote hours in the MT5 specification are in broker
    # server time, which could be different. For example, Darwinex is UTC+2,
    # so the broker time offset is +2 and the code shifts each tick time by
    # +2 hours into broker time before comparing it with the quote hours.
    # ************************************************************************
    broker_time_offset = 2

    # create input dictionary
    input_vars = {
        'dir_name': dir_name,
        'run_details': run_details,
        'remove_out_of_hours_ticks': remove_out_of_hours_ticks,
        'remove_trailing_zeros': remove_trailing_zeros,
        'quote_hours': quote_hours,
        'quote_broker_time_offset': broker_time_offset,
        'round_ohlc_required': round_ohlc_required,
    }
    return input_vars


def round_ohlc(line, digits, input_vars):
    # assign vals
    date = line[0]
    time = line[1]
    bid = line[2]
    ask = line[3]
    last = line[4]
    vol = line[5]

    if digits != 0:
        bid = round(float(bid), digits)
        ask = round(float(ask), digits)
        last = round(float(last), digits)
    else:
        bid = int(round(float(bid), digits))
        ask = int(round(float(ask), digits))
        last = int(round(float(last), digits))

    # assemble line
    if input_vars['remove_trailing_zeros']:
        line = [date, time, f'{bid:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{ask:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{last:.{digits}f}'.rstrip('0').rstrip('.'), vol]
    else:
        line = [date, time, f'{bid:.{digits}f}', f'{ask:.{digits}f}',
                f'{last:.{digits}f}', vol]

    return line
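
# Example usage of round_ohlc (hypothetical tick values):
#   round_ohlc(['20230210', '12:00:00', '1.23450', '1.23460', '0.0', '1'], 5,
#              {'remove_trailing_zeros': True})
#   returns ['20230210', '12:00:00', '1.2345', '1.2346', '0', '1']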


def get_weekday_string(day_digit):
    weekdays = {0: 'Mon',
                1: 'Tues',
                2: 'Wed',
                3: 'Thurs',
                4: 'Fri',
                5: 'Sat',
                6: 'Sun'}

    return weekdays[day_digit]


def remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
    # get quote offset
    quote_offset = input_vars['quote_broker_time_offset']

    # adjust tick_datetime for offset
    tick_datetime_adj = tick_datetime + timedelta(hours=quote_offset)

    # get quote hours
    day_string = get_weekday_string(tick_datetime_adj.weekday())
    quote_hours = symbol_quote_hours[day_string]

    # initialise remove_tick to True (remove by default until a match is found)
    remove_tick = True

    # iterate through all quote start/end pairs and check to see if tick is in hours
    for idx in range(len(quote_hours['start'])):
        tick_time = tick_datetime_adj

        # get date of tick
        tick_date = tick_time.date()

        # form quote hours start time
        start_time = datetime.strptime(quote_hours['start'][idx], '%H:%M').time()

        # combine the date and quote start time to form datetime
        start = datetime.combine(tick_date, start_time)

        if quote_hours['end'][idx] == '24:00':
            # special case. 24:00 means to the end of the day but it's not
            # recognised by python as a valid datetime. To get around this the
            # day is incremented by 1 and a time of 00:00 used which is equivalent.

            # form quote hours end time
            end_time = datetime.strptime('00:00', '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date + timedelta(days=1), end_time)
        else:
            # form quote hours end time
            end_time = datetime.strptime(quote_hours['end'][idx], '%H:%M').time()

            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date, end_time)

        # check to see if tick is within quote hours
        if start <= tick_time <= end:
            remove_tick = False

    return remove_tick
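
# Example (hypothetical quote hours; 2023-02-06 was a Monday):
#   qh = {'Mon': {'start': ['01:00'], 'end': ['24:00']}}
#   remove_out_of_hours_tick(datetime(2023, 2, 6, 12, 0), qh,
#                            {'quote_broker_time_offset': 2})
#   returns False (tick kept): 12:00 + 2h offset = 14:00 broker time,
#   which falls inside the Mon 01:00-24:00 window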


def write_conversion_log(input_filename, output_filename,
                         mod_string, conversion_time, input_vars):
    # conversion log file name
    working_dir = input_vars['dir_name']
    filename = os.path.join(working_dir, 'tickstory_tick_pre_processor_conversions.log')

    # determine if file exists or not and assign appropriate opening flag
    if os.path.exists(filename):
        append_write = 'a'  # append if already exists
    else:
        append_write = 'w'  # make a new file if not

    with open(filename, append_write) as outfile:
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
        outfile.write('Conversion Details\n')
        outfile.write(f'Software Name: {software_name}\n')
        outfile.write(f'Software Version: {__version__}\n')
        outfile.write(f'Date/Time: {datetime.now(timezone.utc)}\n')
        outfile.write(f'Input file = {input_filename}\n')
        outfile.write(f'{mod_string}\n')
        outfile.write(f'Output file = {output_filename}\n')
        outfile.write(f'Conversion Duration: {conversion_time}\n')
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')


def get_start_end_date(filename):
    # Get the first 3 rows of the file
    df_start = pd.read_csv(filepath_or_buffer=filename,
                           encoding='utf8',
                           header=None,
                           nrows=3)

    # Add column names
    df_start.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_start['Datetime'] = df_start['Date'].astype(str) + ' ' + df_start['Time'].astype(str)
    df_start['Datetime'] = pd.to_datetime(df_start['Datetime'], format='%Y%m%d %H:%M:%S')

    # Get the last 3 rows of the file
    with open(filename, 'r', encoding='utf8') as file:
        last_lines = tailer.tail(file, 3)

    # strip any trailing empty lines (line feeds / carriage returns) from the tail
    while last_lines[-1] == '':
        last_lines = last_lines[:-1]

    df_end = pd.read_csv(io.StringIO('\n'.join(last_lines[1:])), header=None)

    # Add column names
    df_end.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']

    # create Datetime column
    df_end['Datetime'] = df_end['Date'].astype(str) + ' ' + df_end['Time'].astype(str)
    df_end['Datetime'] = pd.to_datetime(df_end['Datetime'], format='%Y%m%d %H:%M:%S')

    # extract the start and end datetimes
    start_date = df_start['Datetime'][0]
    end_date = df_end['Datetime'][0]

    return start_date, end_date


def get_percentage_complete(start, end, tick_datetime):
    total_period = end - start
    period_complete = tick_datetime - start
    pct_complete = (period_complete / total_period) * 100

    return pct_complete


def main():
    # get input variables
    input_vars = get_input_variables()

    for darwinex_id, filename in input_vars['run_details'].items():

        # get filenames
        input_filename = get_in_filepath(input_vars['dir_name'], filename)
        output_filename = get_out_filepath(input_vars['dir_name'], filename)

        # get the start and end dates of the data so % complete can be determined
        start_date, end_date = get_start_end_date(input_filename)

        # get symbol quote hours
        symbol_quote_hours = darwinex_quote_hours[darwinex_id]

        # record processing start time
        before_process = datetime.now()

        # initialise counters and mod string
        ticks_removed_count = 0
        mod_string = ''
        file_converted = False
        percentage_complete = 0

        # if processing is required, open the input and output files and process as required
        if input_vars['remove_out_of_hours_ticks'] or input_vars['round_ohlc_required']:
            file_converted = True
            with open(input_filename, 'r', newline='') as f_in:
                with open(output_filename, 'w', newline='') as f_out:
                    # set up csv reader and writer
                    reader = csv.reader(f_in)
                    writer = csv.writer(f_out)
                    # for each line, check whether its datetime is within quote hours; keep it if so, otherwise skip it
                    for idx, line in enumerate(reader):
                        tick_datetime = datetime.strptime(line[0] + ' ' + line[1], '%Y%m%d %H:%M:%S')
                        if not remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
                            # keep line
                            # convert OHLC values if required
                            if input_vars['round_ohlc_required']:
                                digits_round = darwinex_symbol_specs[darwinex_id]['digits']
                                line = round_ohlc(line, digits_round, input_vars)
                            # write line to new file
                            writer.writerow(line)
                        else:
                            ticks_removed_count += 1
                        # every 1000 lines, refresh the % complete display if it has changed
                        if idx % 1000 == 0:
                            percentage_complete_new = get_percentage_complete(start_date, end_date, tick_datetime)
                            if int(percentage_complete_new) != int(percentage_complete):
                                percentage_complete = int(percentage_complete_new)
                                print(f'{input_filename} % Complete: {percentage_complete:.0f}%', end='\r')

        # calculate conversion time
        after_process = datetime.now()
        conversion_time = after_process - before_process

        # mod_string is still empty at this point, so no leading newline is needed
        newline = ''

        # update tick removal modification string
        if input_vars['remove_out_of_hours_ticks']:
            mod_string = mod_string + (f'{newline}***Tick Removal***\n'
                                       f'{ticks_removed_count} ticks have been removed')
        else:
            mod_string = mod_string + f'{newline}***Tick removal NOT requested***'

        if mod_string != '':
            newline = '\n'
        else:
            newline = ''

        # update rounding modification string
        if input_vars['round_ohlc_required']:
            mod_string = mod_string + (f'{newline}***OHLC values converted***\n'
                                       f'OHLC values rounded to {digits_round} digits')
            if input_vars['remove_trailing_zeros']:
                mod_string = mod_string + (f'\nOHLC values trailing zeros removed')
        else:
            mod_string = mod_string + (f'{newline}***OHLC value conversion NOT requested***')

        # update case when no conversion is specified
        if not input_vars['remove_out_of_hours_ticks'] and not input_vars['round_ohlc_required']:
            mod_string = f'No Conversion performed as not requested in input parameters'

        # write conversion details to log file
        if file_converted:
            write_conversion_log(input_filename, output_filename,
                                 mod_string, conversion_time, input_vars)

            # print conversion details to the terminal
            print('----------------------------------------------------------------')
            print(f'Software Name: {software_name}')
            print(f'Software Version: {__version__}')
            print(f'Date/Time: {datetime.now(timezone.utc)}')
            print('Conversion Details')
            print(f'input file = {input_filename}')
            print(mod_string)
            print(f'output file = {output_filename}')
            print(f'Conversion Duration: {conversion_time}')
            print('----------------------------------------------------------------')
        else:
            print(f'input file = {input_filename} - {mod_string}')


if __name__ == '__main__':
    main()

Best Answer

The basic pattern for filtering a CSV looks like this:

import csv

f_in = open("input.csv", newline="")
f_out = open("output.csv", "w", newline="")

reader = csv.reader(f_in)
writer = csv.writer(f_out)

writer.writerow(next(reader))  # transfer header, if you have one

for row in reader:
    if meets_my_condition(row):
        writer.writerow(row)

f_in.close()
f_out.close()

For simple row filtering this is about as fast and memory-efficient as it gets in Python: the reader iterates one row at a time, so no more than one row is in memory at once; file reads and writes are buffered, so the IO bottleneck is as low as the system and Python allow. Any other framework or library (Dask, etc.) will likely add some performance overhead on top of this; and never use Pandas to iterate rows.

From my understanding of your screenshot and description, meets_my_condition needs to check whether the value in column 2 falls within a time range; it might look like this (the lexicographic comparison works because the times are zero-padded, fixed-width strings):

def meets_my_condition(row: list[str]) -> bool:
    return "00:00:00" <= row[1] <= "12:00:00"

Given this input CSV:

Col1,Col2
a,01:00
b,10:00
c,13:00
d,07:00
e,23:00
f,00:00

I get this output CSV:

Col1,Col2
a,01:00
b,10:00
d,07:00
f,00:00
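
As a side note, the same pattern can be written with with-blocks, writer.writerows, and explicit buffer sizes; the 1 MiB buffer below is just an assumption worth benchmarking (the defaults are often fine on a local SSD):

import csv

BUF = 1 << 20  # 1 MiB; an assumed value to benchmark, not a guaranteed win

with open("input.csv", newline="", buffering=BUF) as f_in, \
     open("output.csv", "w", newline="", buffering=BUF) as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    writer.writerow(next(reader))  # transfer header, if you have one
    writer.writerows(row for row in reader if meets_my_condition(row))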

I have profiled similar operations before, and memory barely rises above Python's minimal footprint of 7-8 MB on my laptop, so with 16 GB you will have plenty of headroom.
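
On your reported 4+ hour runtime: the dominant cost in your loop is likely datetime.strptime on every row. Tick data usually contains many ticks per second, so memoizing the parse turns most of those calls into dictionary lookups. A sketch of this idea (the cache size is a guess, and parse_tick_datetime is a name I made up):

from datetime import datetime
from functools import lru_cache

@lru_cache(maxsize=200_000)
def parse_tick_datetime(date_str, time_str):
    # repeated timestamps hit the cache instead of being re-parsed
    return datetime.strptime(date_str + ' ' + time_str, '%Y%m%d %H:%M:%S')

# in your loop, replace the strptime call with:
# tick_datetime = parse_tick_datetime(line[0], line[1])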

Regarding python - fastest way to read a huge CSV file, process it, then write the processed CSV in Python, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/75413294/
