python - Celery worker using TensorFlow is not responding

Tags: python tensorflow celery

I am trying to use TensorFlow inside a Celery worker. Instead of getting a response from the worker, I get a timeout.

I used the following code:

tasks.py

from celery import Celery
from celery.signals import worker_init

import tensorflow as tf

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')

class TFModel():
  def __init__(self):
    self.sess = tf.Session()
  def run(self):
    return self.sess.run(tf.constant('hello'))

tf_model = None

@worker_init.connect
def on_worker_init(**_):
  global tf_model
  tf_model = TFModel()
  print(tf_model.run())
  return

@app.task(time_limit=10)
def run():
  return tf_model.run()

test.py

import time
from tasks import run

r = run.delay()
while not r.ready():
  time.sleep(2)

print(r.get())
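
(As an aside, the polling loop is not strictly required: AsyncResult.get() blocks until the result is available and accepts a client-side timeout of its own, so an equivalent minimal client is the sketch below; the 30-second timeout is an arbitrary choice, not something from the original post.)

from tasks import run

r = run.delay()
# get() blocks until the worker returns a result (or raises the
# task's exception); no ready()/sleep() loop is needed.
print(r.get(timeout=30))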

I started a worker with this command:

$ celery -A tasks worker -l info -c 1

When I start the worker, hello is printed by the print(tf_model.run()) call in on_worker_init(). This means TensorFlow itself works.

Then I ran:

$ python test.py

Then I got:

celery.backends.base.TimeLimitExceeded: TimeLimitExceeded(10,)

What is wrong? How can I investigate what is happening?

My environment:

python 3.5.1
tensorflow 0.11.0
celery 4.0.2

Thanks.

Best Answer

Try this. Two changes matter here: the model is created in worker_process_init, which fires in each forked child process (worker_init fires in the parent process before forking, and a TensorFlow session created before the fork hangs in the child), and asynpool.PROC_ALIVE_TIMEOUT is raised so the pool gives each child process enough time to finish loading TensorFlow before declaring it dead:

import tensorflow as tf
from celery import Celery
from celery.utils.log import get_task_logger
from celery.signals import worker_init, worker_process_init
from models import Network, Extractor
from celery.concurrency import asynpool
asynpool.PROC_ALIVE_TIMEOUT = 100.0  # set this long enough

logger = get_task_logger(__name__)

CELERY_BROKER_URL = 'redis://localhost:6379/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'

# Celery: Distributed Task Queue
app = Celery('tasks', backend=CELERY_RESULT_BACKEND, broker=CELERY_BROKER_URL)
app.conf.task_serializer   = 'json'
app.conf.result_serializer = 'json'

tf_model = None

@worker_process_init.connect()
def on_worker_init(**_):
    global tf_model
    # Create server with model
    logger.info('model for worker: started init')
    print("model for dsa")
    session = tf.Session()
    model = Network(session, True)
    #model.load_model('./models/test_2')
    extractor = Extractor(model)
    tf_model = extractor
    logger.info('model for worker: initialized')


@app.task(name='process_single')
def process_single(image):
    logger.info('process_single: started')
    descriptor = tf_model.process_single(image)
    logger.info('process_single: completed')

    return descriptor

I think it works:

[2017-01-21 09:41:18,892: INFO/Worker-1] ???[???]: model for worker: started init
[2017-01-21 09:41:18,893: WARNING/Worker-1] model for dsa
[2017-01-21 09:41:18,902: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-01-21 09:41:18,915: INFO/MainProcess] mingle: searching for neighbors
[2017-01-21 09:41:19,920: INFO/MainProcess] mingle: all alone
[2017-01-21 09:41:19,949: WARNING/MainProcess] celery@cospelpc ready.
[2017-01-21 09:41:20,930: INFO/Worker-1] ???[???]: model for worker: initialized
[2017-01-21 09:41:31,648: INFO/MainProcess] Received task: process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]
[2017-01-21 09:41:31,658: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: started
[2017-01-21 09:41:33,125: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: completed
[2017-01-21 09:41:33,128: INFO/MainProcess] Task process_single[024068ba-9ea2-4405-8aab-d3504a06aa55] succeeded in 1.470330449s: [153608.4375, 0.0, 0.0, 243285.75, 0.0, 155679.671875, 346120.625, 70663.265625, 0.0, 29445.03125, 0.0, 518396.25, 0.0,...
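
For comparison, here is a minimal sketch of the same fix applied to the question's original tasks.py: only the signal and the pool timeout change, and the answerer's Network/Extractor classes are not needed. Everything else is kept close to the question's file, with one extra .decode() added so the bytes result survives Celery's default json result serializer.

from celery import Celery
from celery.signals import worker_process_init
from celery.concurrency import asynpool

import tensorflow as tf

# Give each forked child process enough time to import and
# initialize TensorFlow before the pool declares it dead.
asynpool.PROC_ALIVE_TIMEOUT = 100.0

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')

class TFModel():
  def __init__(self):
    self.sess = tf.Session()
  def run(self):
    # sess.run returns bytes for a string tensor; decode so the
    # default json result serializer can handle the value
    return self.sess.run(tf.constant('hello')).decode('utf-8')

tf_model = None

# worker_process_init fires in each child process after the fork,
# so the session is created in the process that will actually use it.
@worker_process_init.connect
def on_worker_init(**_):
  global tf_model
  tf_model = TFModel()

@app.task(time_limit=10)
def run():
  return tf_model.run()

With this version the worker is started the same way ($ celery -A tasks worker -l info -c 1), and test.py should get a result back instead of hitting TimeLimitExceeded.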

Regarding "python - Celery worker using TensorFlow is not responding", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/41734275/
