python - Why isn't my consumer consuming?

Tags: python, multiprocessing, python-multithreading

EDIT: I have tracked the problem down to the part of the program that downloads the zip file and parses it. If I comment that out and replace it with a default line, it parses 10,000 times without any issue.

I'm not sure how much of this question I should edit to reflect that finding.

I wrote a Python program that downloads a zip file containing a log file of roughly 10,000 lines, then parses that file line by line and puts the data into a database.

Eventually my script will run against 200 servers/zip files and process roughly 100,000 lines. (Not every server has the required file.)

However, right now when I run the script with 1 consumer, I only get about 13 lines into the database. With 2 consumers I get 24; with 10 consumers, 100; and with 20 consumers, 240.

Sometimes the script ends with "Consumer Finished" and a count of entries in the database (far below the 10K-30K I expect), but at other times I get this error message:

> Traceback (most recent call last):
>   File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
>     send(obj)
> IOError: [Errno 232] The pipe is being closed

What is causing this? Below is a modified version of my code with sensitive data removed:

import sys
import os
import os.path
import re
import io
import random
import pprint
import traceback
import urllib, urlparse
import urllib2, cStringIO, zipfile
import httplib
import requests
import psycopg2
import psycopg2.extras
from datetime import date, datetime, time, timedelta
from calculon import Calculon

def daterange(start_date, end_date):
  for n in range(int((end_date - start_date).days)):
    yield start_date + timedelta(n)
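# Aside (not in the original post): the range above is half-open, so the end
# date itself is excluded, e.g.
#
#   for d in daterange(datetime(2015, 1, 19), datetime(2015, 1, 21)):
#       print d.date().isoformat()   # prints 2015-01-19, then 2015-01-20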



def producer(args):
  print "Producing!"
  logdt_start = args["logdt_start"]
  logdt_end = args["logdt_end"]
  for single_date in daterange(logdt_start, logdt_end):
    logdt = single_date + timedelta(days=1)
    print "Reading log file..."
    for x in range(1, 2):
      servername = "server-{0}".format("%02d" % (x,))
      filename = "zipped_log.log{0}".format(logdt.date().isoformat())
      url = "http://url.to.zip.file/{0}/{1}".format(servername, filename)
      zip_path = 'path/to/file/within/zip/{0}/{1}'.format(servername, filename)
      if httpExists(url):
        try:
          request = urllib2.urlopen(url)
          zipinmemory = cStringIO.StringIO(request.read())
          # read the zip entirely into memory, then stream its lines to the queue
          with zipfile.ZipFile(zipinmemory) as archive:
            with archive.open(zip_path) as log:
              print "File Found! Reading %s..." % filename
              for line in log:
                args["_queue"].put(line)
          print "Queue has approximately {0} items".format(args["_queue"].qsize())
        except:
          print "exception could not load %s" % url
          traceback.print_exc()
    return True


def httpExists(url):
    host, path = urlparse.urlsplit(url)[1:3]
    found = 0
    try:
        connection = httplib.HTTPConnection(host)  ## Make HTTPConnection Object
        connection.request("HEAD", path)
        responseOb = connection.getresponse()      ## Grab HTTPResponse Object

        if responseOb.status == 200:
            found = 1
        #else:
            #print "Status %d %s : %s" % (responseOb.status, responseOb.reason, url)
    except Exception, e:
        print e.__class__,  e, url
    return found
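# Aside (not from the original post): the `requests` module imported above
# could express the same HEAD check more compactly. A hypothetical variant:
#
# def http_exists_requests(url):
#     try:
#         return requests.head(url).status_code == 200
#     except requests.RequestException:
#         return False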


def parse_log(line):
    if len(line) < 10 or line[0] != '[':
        return {}
    mod_line = line
    mod_line = mod_line.replace('  ', ' ')  # what's this for?
    query_dict = {}
    match = re.search('([\d:\/\s]+)\sUTC', mod_line)
    s = match.start()
    e = match.end() - 5
    query_dict['date_ts'] = datetime.strptime(mod_line[s:e], '%d/%m/%Y %H:%M:%S:%f')
    e = e+2
    mod_line = mod_line[e:]
    match = re.search('(\w+)\sLogger:\s', mod_line)
    e = match.end()
    query_dict['status'] = match.group(1)
    mod_line = mod_line[e:]
    for key_value in re.split(',', mod_line):
        keypair = re.search('(\w+)=(\w+)', key_value)
        key = keypair.group(1)
        value = keypair.group(2)
        query_dict[key] = value
    return query_dict

def consumer(args):
  global consumed
  consumed += 1
  print "Consumed : {0}".format(consumed)
  try:
    db = args["db"]
    cname = args["cname"]
    arg_value = args["_value"]

    cur = db.cursor()
    error_count = 0


    if arg_value is None:
        print "Consumer Finished!"
        return False
    line = arg_value
    qdict = parse_log(line)

    if len(qdict) == 0:
        print "No data to consumer %s" % cname
        return None

    query = """
    INSERT INTO my_db(date_ts,
        status, cmd, creativeString, environment_id, client_type_id, platform_id, sn_type_id, user_id,
        device_id, invoker_sn_id, invoker_type, poster_sn_id, origin, event_type, creative_id, ch,
         src, afp, cmp, p1, p2,p3)
    VALUES (%(date_ts)s,%(status)s,%(cmd)s,%(creativeString)s,%(environment_id)s,%(client_type_id)s,%(platform_id)s,
    %(sn_type_id)s,%(user_id)s,%(device_id)s,%(invoker_sn_id)s,%(invoker_type)s,%(poster_sn_id)s,%(origin)s,
    %(event_type)s,%(creative_id)s,%(ch)s, %(src)s, %(afp)s, %(cmp)s,
    %(p1)s, %(p2)s, %(p3)s);
    """

    try:
      cur.execute(cur.mogrify(query, qdict))
      db.commit()
      global processed
      processed += 1
      print "processed : {0}".format(processed)
    except:
      error_count = error_count + 1
      print "ERROR in insert {0}".format(error_count)
      traceback.print_exc()
      print qdict
      sys.exit(2)
  except:
    print "Error in parsing: " + repr(args.get("_value"))
    traceback.print_exc()
    sys.exit(12)

def main():
  log_start = datetime(2015, 1, 19)
  log_end = datetime(2015, 1, 20)
  consumer_args_list = []

  noOfConsumers = 1
  for x in range(0, noOfConsumers):
    print "Creating Consumer {0}".format(x)
    print "Connecting to logs db..."
    db_name = 'mydb'
    connString = "dbname={0} host={1} port={2} user={3} password={4}".format(
        db_name, 'localhost', 5433, 'postgres', 'pword')
    db = psycopg2.connect(connString)
    consumer_args = {"cname": "CONSUMER_{0}".format(x), "db": db}
    consumer_args_list.append(consumer_args)

  calculon = Calculon(producer,
                      [{"logdt_start": log_start,
                        "logdt_end": log_end}],
                      True,
                      consumer,
                      consumer_args_list,
                      True)
  result = calculon.start()

consumed = 0
processed = 0
if __name__ == "__main__":
  main()

The output looks like this:

Creating Consumer 0
Connecting to logs db...
Producing!
Reading log file...
File Found! Reading log2015-01-20...
Queue has approximately 9549 items
Consumed : 1
processed : 1
Consumed : 2
processed : 2
Consumed : 3
processed : 3
Consumed : 4
processed : 4
Consumed : 5
processed : 5
Consumed : 6
processed : 6
Consumed : 7
processed : 7
Consumed : 8
processed : 8
Consumed : 9
processed : 9
Consumed : 10
processed : 10
Consumed : 11
processed : 11
Consumed : 12
processed : 12
Consumed : 13
Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\queues.py", line 262, in _feed
    send(obj)
IOError: [Errno 232] The pipe is being closed

Best Answer

The error turned out to be a malformed line in the input file that broke the regular expression. (The failed parse raised an exception in the consumer, which called sys.exit; with the consumer process gone, the queue's feeder thread lost its pipe, which is presumably where the IOError above came from.)

For example, one of the values in the comma-separated list was: foobar=2, foo=Something here, is ,a really, poor value, bar=2
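To see why such a line kills the parser: `parse_log` splits the line on commas and assumes every fragment matches `(\w+)=(\w+)`. A fragment like ` is ` contains no key=value pair, so `re.search` returns None and the subsequent `.group(1)` call raises. A minimal reproduction (not from the original post):

    import re

    fragment = " is "                             # one piece of the bad line, after splitting on commas
    keypair = re.search('(\w+)=(\w+)', fragment)
    print keypair                                 # None -- no key=value pair in this fragment
    keypair.group(1)                              # AttributeError: 'NoneType' object has no attribute 'group'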

I was able to work around the problem by adding the following code to the consumer method:

    try:
      qdict = parse_log(line)
    except:
      qdict = {}
      print "BAD LINE {0}".format(line)

    if len(qdict) == 0:
      print "No data to consumer %s" % cname
      return None
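An alternative (a sketch, not what the original answer did) would be to make the key/value loop inside `parse_log` skip fragments that don't look like key=value pairs, instead of discarding the whole line:

    for key_value in re.split(',', mod_line):
        keypair = re.search('(\w+)=(\w+)', key_value)
        if keypair is None:
            continue  # skip fragments such as " is " that have no key=value pair
        query_dict[keypair.group(1)] = keypair.group(2)

The trade-off: a line salvaged this way may still be missing keys the INSERT statement expects, so the existing error handling around the insert still matters.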

Original question on Stack Overflow: https://stackoverflow.com/questions/28172170/
