python - MemoryError when reading a large CSV file into a dictionary

Tags: python python-3.x large-files finance bigdata

I am a PhD student researching market microstructure. I need to process very large datasets (hundreds of GB of millisecond data). I have been using SAS, which handles large data in data-frame form very well, but it is expensive, so I would like to use Python for my studies/research. I have some Python skills, but they are not advanced. I have heard of Pandas, which is very efficient with data frames, but it is limited by RAM, which is not ideal for my purposes.

What I have tried: I iterate over the data row by row, process each row and store the results in a dictionary, but this runs into memory limits. I get a MemoryError and can see Python exhausting all of my RAM (I have 32 GB). This dataset is still small (500 MB) compared with the ones I will need to process later (50-100 GB). In addition, some things are hard to do row by row, such as regressions, charts, etc. So my question is: how should I process and store this data?

The input data looks like this:

#RIC    Date[L]     Time[L] Type    Price   Volume  Bid Price Ask Price
TPI.AX  20140820    00:11.7 Quote                             0.91
TPI.AX  20140820    00:11.7 Trade   0.91    10000       
TPI.AX  20140820    00:21.5 Quote                             0.91
TPI.AX  20140820    00:22.1 Quote                   0.905   
TPI.AX  20140820    00:42.2 Quote                   0.905   
TPI.AX  20140820    00:42.6 Trade   0.9075  117     
TPI.AX  20140820    00:43.1 Trade   0.9075  495     
TPI.AX  20140820    00:49.6 Quote                   0.905   
TPI.AX  20140820    00:57.6 Quote                   0.905   
TPI.AX  20140820    00:57.6 Quote                   0.905   
TPI.AX  20140820    00:58.3 Quote                   0.905   
TPI.AX  20140820    01:02.6 Quote                             0.91
TPI.AX  20140820    01:02.6 Quote                             0.91
TPI.AX  20140820    01:02.6 Quote                   0.905   
TPI.AX  20140820    01:02.6 Trade   0.91    9365        
TPI.AX  20140820    01:02.6 Trade   0.91    9041        

Here is my code:

import csv
from collections import defaultdict
from math import ceil


def spread_calculation(input_file_list, output_file):
    """Calculate spreads for the securities in input_file_list.
    Input: trade and quote data from TRTH.
    Parameters: 1. list of file names, 2. output file name.
    Output: CSV file containing the spreads."""
    # Set variables:
    date = None
    exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float)))))
    # Integer division so range() receives an int on Python 3
    time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000) * 1000 // 100000)]
    for file in input_file_list:
        file_to_open = '%s.csv' % file
        # csv.DictReader expects a text-mode file object on Python 3
        reader = csv.DictReader(open(file_to_open, 'r', newline=''))
        for i in reader:
            if not bool(date):
                date = i['Date[L]'][0:4] + "-" + i['Date[L]'][4:6] + "-" + i['Date[L]'][6:8]
            if i['Type'] == 'Quote' and (time_to_milli(i['Time[L]']) <= (16*60*60*1000)*1000):
                security = i['#RIC'].split('.')[0]
                exchange = i['#RIC'].split('.')[1]
                timestamp = float(time_to_milli(i['Time[L]']))
                bucket = ceil(float(time_to_milli(i['Time[L]'])) / 100000.0) * 100000.0
                if i['Bid Price'] == "":
                    bid = 0.0
                else:
                    bid = float(i['Bid Price'])
                if i['Ask Price'] == "":
                    ask = 0.0
                else:
                    ask = float(i['Ask Price'])
                if bid < ask < 199999.99:
                    if not bool(exchange_bbo[security][exchange][date][bucket]['ask']):
                        exchange_bbo[security][exchange][date][bucket]['ask'] = ask
                        exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
                    elif exchange_bbo[security][exchange][date][bucket]['diff_ask'] > bucket - timestamp:
                        exchange_bbo[security][exchange][date][bucket]['ask'] = ask
                        exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp
                    if not bool(exchange_bbo[security][exchange][date][bucket]['bid']):
                        exchange_bbo[security][exchange][date][bucket]['bid'] = bid
                        exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
                    elif exchange_bbo[security][exchange][date][bucket]['diff_bid'] > bucket - timestamp:
                        exchange_bbo[security][exchange][date][bucket]['bid'] = bid
                        exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp
            if i['Type'] == 'Trade' and i['Price'] != "" and i['Price'] != 0.0:
                timestamp = float(time_to_milli(i['Time[L]']))
                bucket = ceil(float(time_to_milli(i['Time[L]'])) / 100000.0) * 100000.0
                security = i['#RIC'].split('.')[0]
                exchange = i['#RIC'].split('.')[1]
                price = float(i['Price'])
                volume= float(i['Volume'])
                if not bool(exchange_bbo[security][exchange][date][bucket]['price']):
                    exchange_bbo[security][exchange][date][bucket]['price'] = price
                    exchange_bbo[security][exchange][date][bucket]['volume'] = volume
                    exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp
                elif exchange_bbo[security][exchange][date][bucket]['time_diff'] > bucket - timestamp and price != 0.0:
                    exchange_bbo[security][exchange][date][bucket]['price'] = price
                    exchange_bbo[security][exchange][date][bucket]['volume'] = volume
                    exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp

        # Fill the empty buckets - exchange level
        for security in exchange_bbo:
            for exchange in exchange_bbo[security]:
                for date in exchange_bbo[security][exchange]:
                    for bucket in time_bucket:
                        previous = bucket - 100000.0
                        # best offer
                        bo_t = exchange_bbo[security][exchange][date][bucket]['ask']
                        bo_t1 = exchange_bbo[security][exchange][date][previous]['ask']
                        if bo_t == 0.0 and bo_t1 != 0.0:
                            exchange_bbo[security][exchange][date][bucket]['ask'] = bo_t1
                        # best bid
                        bb_t = exchange_bbo[security][exchange][date][bucket]['bid']
                        bb_t1 = exchange_bbo[security][exchange][date][previous]['bid']
                        if bb_t == 0.0 and bb_t1 != 0.0:
                            exchange_bbo[security][exchange][date][bucket]['bid'] = bb_t1

        for security in exchange_bbo:
            for exchange in exchange_bbo[security]:
                for date in exchange_bbo[security][exchange]:
                    for bucket in exchange_bbo[security][exchange][date]:
                        if not bool(exchange_bbo[security][exchange][date][bucket]['price']):
                            nbo = exchange_bbo[security][exchange][date][bucket]['ask']
                            nbb = exchange_bbo[security][exchange][date][bucket]['bid']
                            midpoint = (nbo + nbb) / 2.0
                            price = exchange_bbo[security][exchange][date][bucket]['price']
                            volume= exchange_bbo[security][exchange][date][bucket]['volume']
                            # print security, exchange, bucket, price, midpoint
                            if price > 0.0 and midpoint != 0.0:
                                effective_spread[security][exchange][date][bucket]['espread_bps'] = 2.0 * abs(price - midpoint)/midpoint
                                effective_spread[security][exchange][date][bucket]['volume']=volume
                                effective_spread[security][exchange][date]['count'] += 1.0

        data_writer = csv.DictWriter(open(output_file, 'wb'),
                                     fieldnames=['security', 'exchange', 'date', 'bucket' 'espread_bps', 'volume', 'count'])

        data_writer.writeheader()

        for security in effective_spread:
            for exchange in effective_spread[security]:
                for date in effective_spread[security][exchange]:
                    for bucket in effective_spread[security][exchange][date]:
                        espread_bps = effective_spread[security][exchange][date][bucket]['espread_bps']
                        volume = effective_spread[security][exchange][date][bucket]['volume']
                        count = effective_spread[security][exchange][date][bucket]['count']
                        data_writer.writerow({'security': security, 'exchange': exchange, 'date': date, 'bucket': bucket,
                                              'espread_bps': espread_bps, 'volume': volume, 'count': count})

input_files = ['ScandinavianTAQ']

Thank you very much.

Best answer

100 GB is not that much data. A SQL database and Pandas should be all you need. You will have to learn how to write SQL queries, and I suggest getting a copy of Wes McKinney's book. I have not looked through your code, but it seems to me that the biggest problem is that you do everything row by row instead of grouping the operations.
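
As a rough illustration, here is a minimal sketch of the chunked, grouped approach with pandas. The file name, the chunk size, and the assumption that the export is a plain comma-separated file with the header row shown above are all placeholders:

import pandas as pd

results = []
# chunksize bounds memory use; 1_000_000 rows per chunk is a placeholder value
for chunk in pd.read_csv('ScandinavianTAQ.csv', chunksize=1_000_000):
    # keep quotes that have both sides, then compute the quoted spread as a vector
    quotes = chunk[chunk['Type'] == 'Quote'].dropna(subset=['Bid Price', 'Ask Price'])
    spread = quotes['Ask Price'] - quotes['Bid Price']
    # reduce each chunk to a small per-security summary before moving on
    results.append(spread.groupby(quotes['#RIC']).agg(['sum', 'count']))

# combine the per-chunk summaries into one result that easily fits in RAM
summary = pd.concat(results).groupby(level=0).sum()
summary['mean_spread'] = summary['sum'] / summary['count']
print(summary)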
Also, take a look at Dask.
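
A corresponding sketch with Dask, which runs the same kind of grouped computation out of core across many files (the file pattern is again an assumption):

import dask.dataframe as dd

# lazily read all matching daily files; nothing is loaded until .compute()
ddf = dd.read_csv('ScandinavianTAQ*.csv')

# filter to two-sided quotes and add a quoted-spread column
quotes = ddf[ddf['Type'] == 'Quote'].dropna(subset=['Bid Price', 'Ask Price'])
quotes = quotes.assign(spread=quotes['Ask Price'] - quotes['Bid Price'])

# the group-by aggregation runs chunk by chunk, never holding the full data in RAM
mean_spread = quotes.groupby('#RIC')['spread'].mean().compute()
print(mean_spread)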

Regarding "python - MemoryError when reading a large CSV file into a dictionary", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/40683252/
