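I have many huge CSV files (around 20 GB each) that I need to read, process, and then write back out as new CSV files.
The goal is to read each CSV file and compare the time on every row to see whether it falls within the start and end times held in a dictionary. If it does not, the row is skipped; if it does, the row is written to the new file.
It sounds simple, but at this scale efficiency is critical, and I need some advice.
I have tried several approaches, including reading the whole file with pandas, which either takes a very long time or crashes because of memory problems. I have also tried opening the file and reading it line by line, then processing each line, but that also seems to take a long time. My current line of attack is to use dask, but before going further I wanted to see whether anyone can give me tips for improving the speed of:
- Reading
- Processing - this seems to take a long time because I use the apply function in dask to apply the processing function to each row. When I tried this, it took about 3 hours to process one file.
- Writing - writing a single 20 GB CSV file seems to take a long time. I tried having dask output each partition and then merging the partitions into one file, which did seem to improve the speed a little.
Given that I have 16 GB of RAM, what would be the plan of attack for each of the stages above to get the fastest result?
This is the code I came up with after reading your replies. I have included some code to show the percentage complete. As the file is processed, the percentage complete seems to slow down a lot, as if there were a memory problem or something similar. For a 20 GB file the processing takes more than 4 hours, which seems like a very long time. I looked at the system resources during processing: CPU usage is around 16% and memory is about 10 GB of the 16 GB. Can someone tell me whether this is the speed I should expect, or whether there is something wrong with the code?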
from datetime import datetime, timedelta, timezone
from symbol_specification import darwinex_quote_hours, darwinex_symbol_specs
import os
import csv
import pandas as pd
import io
import tailer
# Software Variables and Information
__author__ = 'TBC'
__copyright__ = 'TBC'
__credits__ = ['TBC']
__license__ = 'GPL'
__version__ = '1.0.4dev'
__maintainer__ = 'TBC'
__email__ = 'TBC'
__status__ = 'Production'
software_name = 'TBC'
def get_digits(str_val):
    return len(str_val.split('.')[-1])
def get_in_filepath(dir_name, filename):
    return dir_name + r"\\" + filename
def get_out_filepath(dir_name, filename):
    return dir_name + r"\\" + filename.split('.csv')[0] + '_OOH_removed.csv'
def get_input_variables():
    dir_name = r"X:"
    # dictionary containing darwinex symbol id and csv filename
    run_details = {'CADCHF': 'CADCHF_mt5_ticks.csv',
                   'CADJPY': 'CADJPY_mt5_ticks.csv',
                   'CHFJPY': 'CHFJPY_mt5_ticks.csv',
                   'EURAUD': 'EURAUD_mt5_ticks.csv',
                   'EURCAD': 'EURCAD_mt5_ticks.csv',
                   'EURCHF': 'EURCHF_mt5_ticks.csv',
                   'EURGBP': 'EURGBP_mt5_ticks.csv',
                   'EURJPY': 'EURJPY_mt5_ticks.csv',
                   'EURNZD': 'EURNZD_mt5_ticks.csv'}
    # remove out of hours ticks?
    remove_out_of_hours_ticks = True
    # round OHLC values?
    round_ohlc_required = False
    # remove trailing zeros - ** Saves space in the output file
    remove_trailing_zeros = True
    # get quote hours as shown in MT5 symbol specification
    quote_hours = darwinex_quote_hours
    # specify the broker timezone in hours
    # ****************************IMPORTANT NOTE******************************
    # The quote hours specified above relate to the broker time zone, which may be
    # different from the timezone that the tickstory data was downloaded in.
    # Following my data download guidelines the tickstory data should be downloaded in
    # (UTC+00:00) Dublin, Edinburgh, Lisbon, London timezone WITH DST. The quote
    # hours specified in the MT5 specification are in the broker server time
    # which could be different. For example Darwinex is UTC +2.
    # Therefore the broker time offset is +2. The code will then subtract 2
    # hours from any of the quote times before using them.
    # ************************************************************************
    broker_time_offset = 2
    # create input dictionary
    input_vars = {
        'dir_name': dir_name,
        'run_details': run_details,
        'remove_out_of_hours_ticks': remove_out_of_hours_ticks,
        'remove_trailing_zeros': remove_trailing_zeros,
        'quote_hours': quote_hours,
        'quote_broker_time_offset': broker_time_offset,
        'round_ohlc_required': round_ohlc_required,
    }
    return input_vars
def round_ohlc(line, digits, input_vars):
    # assign vals
    date = line[0]
    time = line[1]
    bid = line[2]
    ask = line[3]
    last = line[4]
    vol = line[5]
    if digits != 0:
        bid = round(float(bid), digits)
        ask = round(float(ask), digits)
        last = round(float(last), digits)
    else:
        bid = int(round(float(bid), digits))
        ask = int(round(float(ask), digits))
        last = int(round(float(last), digits))
    # assemble line
    if input_vars['remove_trailing_zeros']:
        line = [date, time, f'{bid:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{ask:.{digits}f}'.rstrip('0').rstrip('.'),
                f'{last:.{digits}f}'.rstrip('0').rstrip('.'), vol]
    else:
        line = [date, time, f'{bid:.{digits}f}', f'{ask:.{digits}f}',
                f'{last:.{digits}f}', vol]
    return line
def get_weekday_string(day_digit):
    weekdays = {0: 'Mon',
                1: 'Tues',
                2: 'Wed',
                3: 'Thurs',
                4: 'Fri',
                5: 'Sat',
                6: 'Sun'}
    return weekdays[day_digit]
def remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
    # get quote offset
    quote_offset = input_vars['quote_broker_time_offset']
    # adjust tick_datetime for offset
    tick_datetime_adj = tick_datetime + timedelta(hours=quote_offset)
    # get quote hours
    day_string = get_weekday_string(tick_datetime_adj.weekday())
    quote_hours = symbol_quote_hours[day_string]
    # initialise remove_tick to True (remove the tick unless it is in hours)
    remove_tick = True
    # iterate through all quote start/end pairs and check to see if tick is in hours
    for idx in range(len(quote_hours['start'])):
        tick_time = tick_datetime_adj
        # get date of tick
        tick_date = tick_time.date()
        # form quote hours start time
        start_time = datetime.strptime(quote_hours['start'][idx], '%H:%M').time()
        # combine the date and quote start time to form datetime
        start = datetime.combine(tick_date, start_time)
        if quote_hours['end'][idx] == '24:00':
            # special case. 24:00 means to the end of the day but it's not
            # recognised by python as a valid datetime. To get around this the
            # day is incremented by 1 and a time of 00:00 used which is equivalent.
            # form quote hours end time
            end_time = datetime.strptime('00:00', '%H:%M').time()
            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date + timedelta(days=1), end_time)
        else:
            # form quote hours end time
            end_time = datetime.strptime(quote_hours['end'][idx], '%H:%M').time()
            # combine the date and quote end time to form datetime
            end = datetime.combine(tick_date, end_time)
        # check to see if tick is within quote hours
        if start <= tick_time <= end:
            remove_tick = False
    return remove_tick
def write_conversion_log(input_filename, output_filename,
                         mod_string, conversion_time, input_vars):
    # conversion log file name
    working_dir = input_vars['dir_name']
    filename = working_dir + '//tickstory_tick_pre_processor_conversions.log'
    # determine if file exists or not and assign appropriate opening flag
    if os.path.exists(filename):
        append_write = 'a'  # append if already exists
    else:
        append_write = 'w'  # make a new file if not
    with open(filename, append_write) as outfile:
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
        outfile.write('Conversion Details\n')
        outfile.write(f'Software Name: {software_name}\n')
        outfile.write(f'Software Version: {__version__}\n')
        outfile.write(f'Date/Time: {datetime.now(timezone.utc)}\n')
        outfile.write(f'Input file = {input_filename}\n')
        outfile.write(f'{mod_string}\n')
        outfile.write(f'Output file = {output_filename}\n')
        # end the last two lines with a newline so log entries do not run together
        outfile.write(f'Conversion Duration: {conversion_time}\n')
        outfile.write('--------------------------------------------------------'
                      '-------------------------\n')
def get_start_end_date(filename):
    # Get the first 3 rows of the file
    df_start = pd.read_csv(filepath_or_buffer=filename,
                           encoding='utf8',
                           nrows=3)
    # Add column names
    df_start.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']
    # create Datetime column
    df_start['Datetime'] = df_start['Date'].astype(str) + ' ' + df_start['Time'].astype(str)
    df_start['Datetime'] = pd.to_datetime(df_start['Datetime'], format='%Y%m%d %H:%M:%S')
    # Get the last 3 rows of the file
    with open(filename, 'r', encoding='utf8') as file:
        last_lines = tailer.tail(file, 3)
    # clean up last line for line feed carriage returns etc by checking if line has a string
    while last_lines[-1] == '':
        last_lines = last_lines[:-1]
    df_end = pd.read_csv(io.StringIO('\n'.join(last_lines[1:])), header=None)
    # Add column names
    df_end.columns = ['Date', 'Time', 'Bid_Price', 'Ask_Price', 'Last_Price', 'Volume']
    # create Datetime column
    df_end['Datetime'] = df_end['Date'].astype(str) + ' ' + df_end['Time'].astype(str)
    df_end['Datetime'] = pd.to_datetime(df_end['Datetime'], format='%Y%m%d %H:%M:%S')
    # Add Start and End time to Symbol Info dictionary
    start_date = df_start['Datetime'][0]
    end_date = df_end['Datetime'][0]
    return start_date, end_date
def get_percentage_complete(start, end, tick_datetime):
    total_period = end - start
    period_complete = tick_datetime - start
    pct_complete = (period_complete / total_period) * 100
    return pct_complete
def main():
    # get input variables
    input_vars = get_input_variables()
    for darwinex_id, filename in input_vars['run_details'].items():
        # get filenames
        input_filename = get_in_filepath(input_vars['dir_name'], filename)
        output_filename = get_out_filepath(input_vars['dir_name'], filename)
        # get the start and end dates of the data so % complete can be determined
        start_date, end_date = get_start_end_date(input_filename)
        # get symbol quote hours
        symbol_quote_hours = darwinex_quote_hours[darwinex_id]
        # initialise list
        temp_list = []
        # read csv
        before_process = datetime.now()
        # initialise counters and mod string
        ticks_removed_count = 0
        mod_string = ''
        file_converted = False
        percentage_complete = 0
        # if processing required open input and output files and process as required
        if input_vars['remove_out_of_hours_ticks'] or input_vars['round_ohlc_required']:
            file_converted = True
            with open(input_filename, 'r', newline='') as f_in:
                with open(output_filename, 'w', newline='') as f_out:
                    # set up reader and writer buffers
                    reader = csv.reader(f_in)
                    writer = csv.writer(f_out)
                    # for each line check whether datetime is within hours. If it is keep line, if not skip
                    for idx, line in enumerate(reader):
                        tick_datetime = datetime.strptime(line[0] + ' ' + line[1], '%Y%m%d %H:%M:%S')
                        if not remove_out_of_hours_tick(tick_datetime, symbol_quote_hours, input_vars):
                            # keep line
                            # convert OHLC values if required
                            if input_vars['round_ohlc_required']:
                                digits_round = darwinex_symbol_specs[darwinex_id]['digits']
                                line = round_ohlc(line, digits_round, input_vars)
                            # write line to new file
                            writer.writerow(line)
                        else:
                            ticks_removed_count += 1
                        # determine and output the % complete after every 1000 lines
                        if idx % 1000 == 0:
                            percentage_complete_new = get_percentage_complete(start_date, end_date, tick_datetime)
                            if int(percentage_complete_new) - int(percentage_complete):
                                percentage_complete = int(percentage_complete_new)
                                print(f'{input_filename} % Complete: {percentage_complete:.0f}%', end='\r')
        # calculate conversion time
        after_process = datetime.now()
        conversion_time = after_process - before_process
        if mod_string != '':
            newline = '\n'
        else:
            newline = ''
        # update tick removal modification string
        if input_vars['remove_out_of_hours_ticks']:
            mod_string = mod_string + (f'{newline}***Tick Removal***\n'
                                       f'{ticks_removed_count} ticks have been removed')
        else:
            mod_string = mod_string + f'{newline}***Tick removal NOT requested***'
        if mod_string != '':
            newline = '\n'
        else:
            newline = ''
        # update rounding modification string
        if input_vars['round_ohlc_required']:
            mod_string = mod_string + (f'{newline}***OHLC values converted***\n'
                                       f'OHLC values rounded to {digits_round} digits')
            if input_vars['remove_trailing_zeros']:
                mod_string = mod_string + (f'\nOHLC values trailing zeros removed')
        else:
            mod_string = mod_string + (f'{newline}***OHLC value conversion NOT requested***')
        if mod_string != '':
            newline = '\n'
        else:
            newline = ''
        # update case when no conversion is specified
        if not input_vars['remove_out_of_hours_ticks'] and not input_vars['round_ohlc_required']:
            mod_string = f'No Conversion performed as not requested in input parameters'
        # write conversion details to log file
        if file_converted:
            write_conversion_log(input_filename, output_filename,
                                 mod_string, conversion_time, input_vars)
            # print conversion details to the terminal
            print('----------------------------------------------------------------')
            print(f'Software Name: {software_name}')
            print(f'Software Version: {__version__}')
            print(f'Date/Time: {datetime.now(timezone.utc)}')
            print('Conversion Details')
            print(f'input file = {input_filename}')
            print(mod_string)
            print(f'output file = {output_filename}')
            print(f'Conversion Duration: {conversion_time}')
            print('----------------------------------------------------------------')
        else:
            print(f'input file = {input_filename} - {mod_string}')
if __name__ == '__main__':
    main()
Best answer
The basic pattern for filtering a CSV looks like this:
import csv
# the with statement closes both files even if an error occurs part-way through
with open("input.csv", newline="") as f_in, open("output.csv", "w", newline="") as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    writer.writerow(next(reader))  # transfer header, if you have one
    for row in reader:
        if meets_my_condition(row):
            writer.writerow(row)
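For simple row filtering, this is about as fast and memory-efficient as it gets in Python: the reader iterates one row at a time, so no more than one row is in memory at any moment, and file reads and writes are buffered, so the IO bottleneck is as low as your system and Python allow. Any other framework or library (Dask, etc.) will probably impose some performance overhead by comparison, and you should never use Pandas to iterate rows.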
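As a hedged variation on the loop above, the same filter can be written as a generator handed to `writer.writerows`, which keeps the one-row-at-a-time streaming behaviour. The sketch below runs against in-memory text so that it is self-contained; `keep_row` and `filter_csv` are hypothetical names for illustration, not part of the question's code, and whether this form is measurably faster than the explicit loop is something to check on your own data.

```python
import csv
import io


def keep_row(row):
    # Hypothetical predicate for illustration only: keep rows whose
    # second column is not empty.
    return row[1] != ""


def filter_csv(f_in, f_out, predicate):
    """Stream rows from f_in to f_out, keeping those that satisfy predicate."""
    reader = csv.reader(f_in)
    writer = csv.writer(f_out, lineterminator="\n")
    writer.writerow(next(reader))  # transfer the header row
    # The generator is consumed lazily, so rows are never collected in a list.
    writer.writerows(row for row in reader if predicate(row))


source = io.StringIO("Col1,Col2\na,1\nb,\nc,3\n")
target = io.StringIO()
filter_csv(source, target, keep_row)
result = target.getvalue()
print(result)
```

With real files, `source` and `target` would be the two file objects opened with `newline=""` as in the pattern above.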
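From what I understand of your screenshot and description, meets_my_condition needs to check whether the value in column 2 falls within some time range; it might look like: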
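def meets_my_condition(row: list[str]) -> bool:
    # Zero-padded "HH:MM" strings compare correctly as plain text. The bounds use
    # the same "HH:MM" format as the sample data so that "00:00" is included.
    return "00:00" <= row[1] <= "12:00"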
Given this input CSV:
Col1,Col2
a,01:00
b,10:00
c,13:00
d,07:00
e,23:00
f,00:00
I get this output CSV:
Col1,Col2
a,01:00
b,10:00
d,07:00
f,00:00
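The same predicate idea could be extended to the per-weekday quote hours in the question. The sketch below is one possible approach, not the asker's method: it parses every start/end pair once up front instead of calling `datetime.strptime` on them for every row, and it builds the tick datetime by slicing the strings. It assumes the dictionary shape implied by `remove_out_of_hours_tick` (a `'start'` and an `'end'` list of `'HH:MM'` strings per weekday label, with `'24:00'` meaning end of day), that all seven weekday labels are present, and that rows begin with `YYYYMMDD` and `HH:MM:SS` columns. `WEEKDAYS`, `to_seconds`, `build_windows`, `make_predicate` and `example_hours` are hypothetical names, and the quote hours shown are made up.

```python
from datetime import datetime, timedelta

# Weekday labels as returned by get_weekday_string in the question.
WEEKDAYS = ["Mon", "Tues", "Wed", "Thurs", "Fri", "Sat", "Sun"]


def to_seconds(hhmm):
    """Convert 'HH:MM' (including the special '24:00') to seconds of the day."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 3600 + int(minutes) * 60


def build_windows(symbol_quote_hours):
    """Parse all start/end pairs once; the result is indexed by weekday()."""
    windows = []
    for day in WEEKDAYS:
        hours = symbol_quote_hours[day]
        windows.append([(to_seconds(start), to_seconds(end))
                        for start, end in zip(hours["start"], hours["end"])])
    return windows


def make_predicate(symbol_quote_hours, offset_hours):
    """Return a row predicate that is True when the tick is inside quote hours."""
    windows = build_windows(symbol_quote_hours)
    offset = timedelta(hours=offset_hours)

    def meets_my_condition(row):
        d, t = row[0], row[1]  # assumed formats: 'YYYYMMDD' and 'HH:MM:SS'
        tick = datetime(int(d[:4]), int(d[4:6]), int(d[6:8]),
                        int(t[:2]), int(t[3:5]), int(t[6:8])) + offset
        second_of_day = tick.hour * 3600 + tick.minute * 60 + tick.second
        return any(start <= second_of_day <= end
                   for start, end in windows[tick.weekday()])

    return meets_my_condition


# Made-up quote hours purely for demonstration.
example_hours = {day: {"start": ["00:00"], "end": ["24:00"]} for day in WEEKDAYS}
example_hours["Fri"] = {"start": ["00:00"], "end": ["22:00"]}
example_hours["Sat"] = {"start": [], "end": []}
keep = make_predicate(example_hours, offset_hours=2)

print(keep(["20230210", "19:59:59"]))  # Friday, shifted to 21:59:59
print(keep(["20230210", "20:00:01"]))  # Friday, shifted to 22:00:01
```

The predicate returned by `make_predicate` can be passed wherever `meets_my_condition` is used in the basic pattern. Any speed-up over the per-row `strptime` calls would need to be measured on the real files.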
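I have profiled similar operations before, and memory barely exceeds Python's minimum footprint of 7-8 MB on my laptop, so with 16 GB you will have more than enough headroom.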
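One way to check the memory claim on your own machine is the standard-library `tracemalloc` module. The following is a minimal sketch under stated assumptions: the input file is synthetic (made-up rows, a few MB on disk), `filter_file` is a hypothetical helper, and `tracemalloc` reports only memory allocated through Python's allocator, so the figure is not the total process footprint.

```python
import csv
import os
import tempfile
import tracemalloc


def filter_file(in_path, out_path, predicate):
    """Stream-filter a CSV file one row at a time."""
    with open(in_path, newline="") as f_in, \
         open(out_path, "w", newline="") as f_out:
        reader = csv.reader(f_in)
        writer = csv.writer(f_out)
        for row in reader:
            if predicate(row):
                writer.writerow(row)


with tempfile.TemporaryDirectory() as tmp_dir:
    in_path = os.path.join(tmp_dir, "input.csv")
    out_path = os.path.join(tmp_dir, "output.csv")

    # Build a synthetic input file before tracing starts.
    with open(in_path, "w", newline="") as f:
        synthetic_writer = csv.writer(f)
        for i in range(100_000):
            synthetic_writer.writerow(
                ["20230101", f"{i % 24:02d}:00:00", "1.23456", "1.23466", "0", "0"])
    input_size = os.path.getsize(in_path)

    # Trace only the streaming filter itself.
    tracemalloc.start()
    filter_file(in_path, out_path, lambda row: row[1] <= "12:00:00")
    _, peak_bytes = tracemalloc.get_traced_memory()
    tracemalloc.stop()

print(f"input size: {input_size / 1e6:.1f} MB")
print(f"peak traced allocation: {peak_bytes / 1e6:.3f} MB")
```

If the filter really streams, the peak traced allocation should stay far below the input size no matter how large the file grows.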
Regarding python - Fastest way to read a huge csv file, process it, and then write the processed csv in Python, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/75413294/