python - 使用 celery 提交任务的最快方法?

标签 python celery

我尝试使用以下代码向 celery 提交大约 1.5 亿个作业:

from celery import chain

from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'

if __name__ == '__main__':
    fh = open(urls,'r')
    alldat = fh.readlines()
    fh.close()
    for line in alldat:
        try:
            result = chain(get_url.s(line[:-1]),do_work.s(line[:-1])).apply_async()
        except:
            print ("failed to submit job")
        print('task submitted ' + str(line[:-1]))

将文件分割成 block 并运行此代码的多个实例会更快吗?或者我能做什么?我使用 memcached 作为后端,rabbitmq 作为代理。

最佳答案

import multiprocessing

from celery import chain

from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
num_workers = 200

def worker(urls,id):
    """worker function"""
    for url in urls:
        print ("%s - %s" % (id,url))
        result = chain(get_url.s(url),do_work.s(url)).apply_async() 
    return

if __name__ == '__main__':
    fh = open(urls,'r')
    alldat = fh.readlines()
    fh.close()
    jobs = []
    stack = []
    id = 0
    for i in alldat:
        if (len(stack) < len(alldat) / num_workers):
           stack.append(i[:-1])
           continue
        else:
            id = id + 1
            p = multiprocessing.Process(target=worker, args=(stack,id,))
            jobs.append(p)
            p.start()
            stack = []

    for j in jobs:
        j.join()

关于python - 使用 celery 提交任务的最快方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57596667/

相关文章:

python - 关于 MySQLdb conn.autocommit(True)

python - 如何抑制 IPython 启动消息?

python - Celery - 有顺序任务而不是并发?

celery 有效,但与花无效

docker - 如何为 docker-compose celery 命令配置 pycharm 调试?

apache-kafka - 是否可以将 celery 与 Kafka 集成

django - 使用 django celery 任务处理大文件

python - 将 Google Picasa API 与 Python 结合使用

python - 我怎样才能找到字典中值之间的所有差异?

python - 如何包装 numpy 数组类型?