我尝试使用以下代码向 celery 提交大约 1.5 亿个作业:
from celery import chain
from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
if __name__ == '__main__':
fh = open(urls,'r')
alldat = fh.readlines()
fh.close()
for line in alldat:
try:
result = chain(get_url.s(line[:-1]),do_work.s(line[:-1])).apply_async()
except:
print ("failed to submit job")
print('task submitted ' + str(line[:-1]))
将文件分割成 block 并运行此代码的多个实例会更快吗?或者我能做什么?我使用 memcached 作为后端,rabbitmq 作为代理。
最佳答案
import multiprocessing
from celery import chain
from .task_receiver import do_work,handle_results,get_url
urls = '/home/ubuntu/celery_main/urls'
num_workers = 200
def worker(urls,id):
"""worker function"""
for url in urls:
print ("%s - %s" % (id,url))
result = chain(get_url.s(url),do_work.s(url)).apply_async()
return
if __name__ == '__main__':
fh = open(urls,'r')
alldat = fh.readlines()
fh.close()
jobs = []
stack = []
id = 0
for i in alldat:
if (len(stack) < len(alldat) / num_workers):
stack.append(i[:-1])
continue
else:
id = id + 1
p = multiprocessing.Process(target=worker, args=(stack,id,))
jobs.append(p)
p.start()
stack = []
for j in jobs:
j.join()
关于python - 使用 celery 提交任务的最快方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57596667/