python - 导出时使用大JSON会导致内存问题

标签 python json mongodb api elasticsearch

问题:

我有一个API,可以从多个 flex 索引中获取数据,并将其组合为一个JSON记录,并在调用该API时返回。从API提取的结果通常也很大。

因此,iam有一个包装器脚本,可以每天从API获取所有数据。但是在我的代码中,iam有一个名为results的数组,当当天的数据较少时,Iam不会出现问题。但是,当当天获取的数据量巨大时,整个阵列都位于RAM中,从而导致系统速度变慢。

创建此数组的主要目的是在一个mongo中导出该mongo,该mongo可在另一个网络中直接从我的网络复制。

代码段:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import division, print_function, absolute_import
import argparse
import sys
import logging
import MySQLdb
import requests
import json
import time



_logger = logging.getLogger(__name__)


def get_samples(date,end):
    """
    Get Samples hashes form Database

    :param date: date of sample arrival
    :return list_of_hashes
    """
    try:
        results = []
        cur_time = time.time()
        with open('config.json','r') as c:
            config = json.load(c)
        _logger.info('Entering into database {}'.format(date))
        connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
        cursor = connection.cursor()
        cursor.execute("SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date,end))
        hashes = cursor.fetchall()
        for hash in hashes:
            _logger.info('Hash {}'.format(hash[0]))
            try:
                response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                _logger.info('Result from API {}'.format(response))
                if response.status_code == 200:
                    results.append(json.loads(response.text))
                else:
                    _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
            except Exception as e:
                _logger.error('Error in querying database {} - {}'.format(hash,e))
        connection.close()
        with open('{}_{}.json'.format(date,end),'w') as f:
            f.write(json.dumps(results))
    except KeyboardInterrupt:
        print('Bye')
    except Exception as e:
        _logger.error('Error in querying database final {}'.format(e))
    return '{} completed'.format(date)


def parse_args(args):
    """
    Parse command line parameters

    :param args: command line parameters as list of strings
    :return: command line parameters as :obj:`airgparse.Namespace`
    """
    parser = argparse.ArgumentParser(
        description="Enter date to Export")
    parser.add_argument(
        dest="date",
        help="Date of Sample Arrival in format 2018-08-16",
        )
    parser.add_argument(
        dest="end",
        help="Date of Sample Arrival end",
        )
    return parser.parse_args(args)


def main(args):
    args = parse_args(args)
    print("{} Samples are quiered -- {}".format(args.date, get_samples(args.date,args.end)))
    _logger.info("Script ends here")


def run():
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    main(sys.argv[1:])


if __name__ == "__main__":
    run()

为什么我要这样做?
我想从API导出一整天的记录,然后使用mongoimport将JSON文件传输到mongo。

我需要什么?
防止整个阵列位于RAM中并导致系统变慢的替代解决方案。其他解决方案使解决方案更加有效。

最佳答案

从我收集的数据来看,您无法直接连接到Mongo数据库,对吗?您可以在本地启动MongoDB吗?这样,您可以使用Mongo Python库在获取结果时保存结果,使用 mongoexport 将其提取为JSON文件,然后将其导入最终数据库中?

现在回到您的问题,这里有一些建议:

  • 一旦获得所需的信息,就立即关闭与MySQL connection.close()的连接,因此在hashes = cursor.fetchall()后面紧接
  • json.loads(response.text)在更好的API下也被称为response.json()
  • 您可以直接写入文件
  • ,而不是附加到内存中的results列表。

    将所有内容放在一起,无需键盘中断处理,仅更改get_samples函数:
    def get_samples(date, end):
        with open('{}_{}.json'.format(date, end), 'w') as out_file:
            out_file.write('[\n')
            with open('config.json','r') as c:
                config = json.load(c)
            _logger.info('Entering into database {}'.format(date))
            connection = MySQLdb.connect(config['malware_mysql'],"root","root","meta")
            cursor = connection.cursor()
            cursor.execute(
                "SELECT MD5 from some where `Last_Seen` BETWEEN '{} 00:00:00' AND '{} 23:59:59'".format(date, end)
            )
            hashes = cursor.fetchall()
            connection.close()
            for hash in hashes:
                _logger.info('Hash {}'.format(hash[0]))
                try:
                    response = requests.get('http://{}:{}/some/{}'.format(config['a'],config['b'],hash[0]))
                    _logger.info('Result from API {}'.format(response))
                    if response.status_code == 200:
                        out_file.write(response.json() + ',\n')
                    else:
                        _logger.error('Error in Querying API {} for hash {}'.format(response.status_code,hash))
                except Exception as e:
                    _logger.error('Error in querying database {} - {}'.format(hash,e))
            out_file.write(']\n')
    

    我没有尝试过此代码,因此某处可能存在语法错误。希望这应该使您足够接近。

    如果内存使用率仍然很高,请注意,请求库具有a streaming mode,这可能会进一步帮助您。

    关于python - 导出时使用大JSON会导致内存问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52918612/

    相关文章:

    node.js - Mongoose 如何在 update/findOneAndUpdate 时验证类型和值

    python - 对于 Matplotlib 图,如何隐藏刻度线但保留刻度标签?

    python - 在 python 中获取 keyerror

    javascript - 使用 Jquery 获取 HTML 表格数据

    java - 如何在 Java 中访问 JSON 数据的嵌套元素?

    node.js - 服务器位于 10.70.152.26 :27017 reports maximum wire version 4, 但此版本的 Node.js 驱动程序至少需要 6 个(MongoDB 3.6)

    python - Pandas 无法读取用 h5py 创建的 hdf5 文件

    Python,为什么这个 lambda 函数不正确?

    javascript - 在 CakePHP 中保存数据,并同时通过 AJAX 发布 JSON?

    c# - Mongodb C# 驱动程序 : view MQL bson query generated from linq