python - 如何有效地将数百万条记录从大型压缩 csv 文件插入到 mongo DB 中?

标签 python mongodb csv zip

我正在尝试向 Mongo 中插入大约 800 万条记录,它似乎以每秒 1000 条记录的速度插入它们,这非常慢。

代码是用python写的,所以可能是python的问题,但我对此表示怀疑。这是代码:

def str2datetime(str):
  return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
  return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
  return None if (not str or str == r'\N') else int(str)
def str2float(str):
  return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
  return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
  return unicode(str, 'latin-1')

_ = lambda x: x

converters_map = {
  'test_id': str2int,
  'android_device_id': str2int,
  'android_fingerprint': _,
  'test_date': str2datetime,
  'client_ip_address': _,
  'download_kbps': str2int,
  'upload_kbps': str2int,
  'latency': str2int,
  'server_name': _,
  'server_country': _,
  'server_country_code': _,
  'server_latitude': str2float,
  'server_longitude': str2float,
  'client_country': _,
  'client_country_code': _,
  'client_region_name': str2latin1,
  'client_region_code': _,
  'client_city': str2latin1,
  'client_latitude': str2float,
  'client_longitude': str2float,
  'miles_between': str2float2int,
  'connection_type': str2int,
  'isp_name': _,
  'is_isp': str2bool,
  'network_operator_name': _,
  'network_operator': _,
  'brand': _,
  'device': _,
  'hardware': _,
  'build_id': _,
  'manufacturer': _,
  'model': str2latin1,
  'product': _,
  'cdma_cell_id': str2int,
  'gsm_cell_id': str2int,
  'client_ip_id': str2int,
  'user_agent': _,
  'client_net_speed': str2int,
  'iphone_device_id': str2int,
  'carrier_name': _,
  'iso_country_code': _,
  'mobile_country_code': str2int,
  'mobile_network_code': str2int,
  'model': str2latin1,
  'version': _,
  'server_sponsor_name': _,
}

def read_csv_zip(path):
  with ZipFile(path) as z:
    with z.open(z.namelist()[0]) as input:
      r = csv.reader(input)
      header = r.next()
      converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
      for row in r:
        row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
        yield row

argv = [x for x in argv if not x == '']
if len(argv) == 1:
  print("Usage: " + argv[0] + " zip-file")
  exit(1)

zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]

print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
  db = connection.db
  collection = db.__getattr__(collection_name)
  i = 0;
  try:
    start = time()
    for item in read_csv_zip(zip_file):
      i += 1
      if (i % 1000) == 0:
        stdout.write("\r%d " % i)
        stdout.flush()
      try:
        collection.insert(item)
      except Exception as exc:
        print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
        print exc
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
    raw_input("Press ENTER to exit")
  except Exception as exc:
    print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
    print exc
    exit(1)

插入 262796 条记录(一个 csv 文件)需要 350 秒。

mongo 服务器运行在同一台机器上,但没有人使用它。因此,如果有办法的话,我可以直接写入数据库文件。

我对分片不感兴趣,因为 800 万条记录不应该需要分片,不是吗?

我的问题是我做错了什么?也许我选择的DB是错误的?典型的流程是每月刷新一次记录,然后仅对数据库进行查询。

谢谢。

编辑

事实证明,瓶颈不是mongo,而是读取zip文件。我更改了代码,以 1000 行为一组读取 zip 文件,然后通过一次调用 Collection.insert 将它们提供给 mongo。它是 zip 文件,需要很长时间。下面是修改后的代码:

def insert_documents(collection, source, i, batch_size):
  count = 0;
  while True:
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break;
    old_i = i
    count += len(items)
    i += len(items)
    if (old_i / 1000) != (i / 1000):
      sys.stdout.write("\r%d " % i)
      sys.stdout.flush()
    try:
      collection.insert(items)
    except Exception as exc:
      print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i,items[0]['_id'],i,items[-1]['_id']))
      print exc
  return count

def main():
  argv = [x for x in sys.argv if not x == '']
  if len(argv) == 1:
    print("Usage: " + argv[0] + " zip-file")
    exit(1)

  zip_file = argv[1]
  collection_name = zip_file[:zip_file.index('_')]

  print("Populating " + collection_name + " with the data from " + zip_file)
  with Connection() as connection:
    ookla = connection.ookla
    collection = ookla.__getattr__(collection_name)
    i = 0;
    start = time()
    count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
    i += count
    print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
    raw_input("Press ENTER to exit")

if __name__ == "__main__":
  main()

事实证明,大部分时间都花在 items = list(itertools.islice(source, batch_size)) 上。

关于如何改进它有什么想法吗?

最佳答案

尽管您在评论中指出您不能使用 mongoimport,但您可以而且应该使用。日期以及 str2latin 转换都可以完美导入。只需将您的 csv 预处理为与 mongoimport 兼容即可。

将日期转换为 {myDate:{$date: msSinceEpoch}} 并且 mongoimport 会理解它。因此,通过一个预处理步骤,您可以使用 mongoimport 并考虑到您的用例,我不明白为什么这会成为问题。

也就是说 mongoimport 不应该比批量插入快一个数量级,尽管 1000/秒并不慢,但它肯定不符合我在简单的开发机器上获得的性能类型。如果我使用批量插入而不是单声道插入,我可以轻松达到 30k/秒,甚至可能更高,特别是使用 safe=false 写入(在这种情况下应该没问题,因为您可以在导入后作为第二步进行验证)。什么资源是你的瓶颈? (检查 mongostat 和 top)

关于python - 如何有效地将数百万条记录从大型压缩 csv 文件插入到 mongo DB 中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9760242/

相关文章:

python - 在 Alembic 迁移中使用 SQLAlchemy ORM : how do I?

node.js - Mongoose upsert 不创建默认模式属性

java - 如何在 Java/Groovy 中测试 MongoDB 过滤器 (BSON) 的相等性?

node.js - 在 mongoose 的计算中使用项目字段

python - Django:如何禁用模型中的排序

python - 为什么 "getcontext().prec = 2"没有实际设置它,以便 Decimal() 的使用达到两位小数?

java - 进口声明的历史是什么?

python - 根据另一个表中的多个列在一个表中创建一列 [python]

csv - 如何处理自动重复用户删除

arrays - 如何在 Ruby 中将数组转换为另一个数组