python - 使用多处理模块结束守护进程

标签 python sqlalchemy multiprocessing

我在下面包含了多处理的示例用法。这是一个进程池模型。它并不像它可能的那么简单,但在结构上与我实际使用的代码比较接近。它还使用 sqlalchemy,抱歉。

我的问题是 - 我目前的情况是我有一个运行时间相对较长的 Python 脚本,它正在执行许多函数,每个函数看起来都像下面的代码,因此父进程在所有情况下都是相同的。换句话说,多个池是由一个 python 脚本创建的。 (我想我不必这样做,但替代方案是使用类似 os.system 和 subprocess 的东西。)问题是这些进程徘徊并保留内存。文档说这些守护进程应该一直存在直到父进程退出,但是如果父进程继续生成另一个池或进程并且不会立即退出怎么办。

调用 terminate() 有效,但这似乎不太礼貌。有没有一种好方法可以让进程很好地终止? IE。自己清理一下,现在就走吧,我需要启动下一个游泳池?

我还尝试在进程上调用 join()。根据文档,这意味着等待进程终止。如果他们不打算终止怎么办?实际发生的是进程挂起。

提前致谢。

问候,法希姆。

import multiprocessing, time

class Worker(multiprocessing.Process):
    """Process executing tasks from a given tasks queue"""
    def __init__(self, queue, num):
        multiprocessing.Process.__init__(self)
        self.num = num
        self.queue = queue
        self.daemon = True

    def run(self):
        import traceback
        while True:
            func, args, kargs = self.queue.get()
            try:
                print "trying %s with args %s"%(func.__name__, args)
                func(*args, **kargs)
            except:
                traceback.print_exc()
            self.queue.task_done()

class ProcessPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.queue = multiprocessing.JoinableQueue()
        self.workerlist = []
        self.num = num_threads
        for i in range(num_threads):
            self.workerlist.append(Worker(self.queue, i))

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.queue.put((func, args, kargs))

    def start(self):
        for w in self.workerlist:
            w.start()

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.queue.join()
        for worker in self.workerlist:
            print worker.__dict__
            #worker.terminate()        <--- terminate used here  
            worker.join()              <--- join used here

start = time.time()

from sqlalchemy import *
from sqlalchemy.orm import *

dbuser = ''
password = ''
dbname = ''
dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
db = create_engine(dbstring, echo=True)
m = MetaData(db)

def make_foo(i):
    t1 = Table('foo%s'%i, m, Column('a', Integer, primary_key=True))

conn = db.connect()
for i in range(10):
    conn.execute("DROP TABLE IF EXISTS foo%s"%i)
conn.close()

for i in range(10):
    make_foo(i)

m.create_all()

def do(i, dbstring):
    dbstring = "postgres://%s:%s@localhost:5432/%s"%(dbuser, password, dbname)
    db = create_engine(dbstring, echo=True)
    Session = scoped_session(sessionmaker())
    Session.configure(bind=db)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = false );"%i)
    Session.execute("ALTER TABLE foo%s SET ( autovacuum_enabled = true );"%i)
    Session.commit()

pool = ProcessPool(5)
for i in range(10):
    pool.add_task(do, i, dbstring)
pool.start()
pool.wait_completion()

最佳答案

我的处理方式是:

import multiprocessing

for prc in multiprocessing.active_children():
    prc.terminate()

我更喜欢这个,所以我不必用一些 if 子句污染辅助函数。

关于python - 使用多处理模块结束守护进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4792473/

相关文章:

mysqldb 上的 Python 多处理

python - 在 mypy 中使用 python multiprocessing.Lock 作为参数类型

python - Python中的二分搜索(二分法)

python - python中这个递归问题的解释

python - 将 N by N Dataframe 转换为 3 Column Dataframe

python - 在 Pyramid 中存储和验证用于登录的加密密码

Python 多处理 : Crash in subprocess?

python 从二进制文件中读取 16 个字节长的 double

python - 如何使用 Flask-Cache 和 Redis 缓存 SQL Alchemy 调用?

python - 在 SQLAlchemy 中使用正确的文件结构以及如何将数据添加到 db