flask - Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists

Tags: flask redis docker-compose celerybeat kombu

I am trying to spawn some background celery beat processes with docker-compose, but they have stopped working. My configuration:

docker-compose-dev.yml

worker-periodic:
    image: dev3_web
    restart: always
    volumes:
      - ./services/web:/usr/src/app
      - ./services/web/celery_logs:/usr/src/app/celery_logs

    command: celery beat -A celery_worker.celery --schedule=/tmp/celerybeat-schedule --loglevel=DEBUG --pidfile=/tmp/celerybeat.pid
    environment:
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0
      - FLASK_ENV=development
      - APP_SETTINGS=project.config.DevelopmentConfig
      - DATABASE_URL=postgres://postgres:postgres@web-db:5432/web_dev 
      - DATABASE_TEST_URL=postgres://postgres:postgres@web-db:5432/web_test
      - SECRET_KEY=my_precious  
    depends_on:
      - web
      - redis
      - web-db
    links:
      - redis:redis
      - web-db:web-db

After starting the containers, I run $ docker ps and get the following (note how worker-periodic_1 has always been up for only a few seconds):

697322a621d5        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-analysis_1
d8e414aa4e5b        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-learning_1
ae327266132c        dev3_web               "flower -A celery_wo…"   24 hours ago        Up 5 minutes        0.0.0.0:5555->5555/tcp             dev3_monitor_1
3ccb79e01412        dev3_web               "celery beat -A cele…"   24 hours ago        Up 14 seconds                                          dev3_worker-periodic_1
a50e1276f692        dev3_web               "celery worker -A ce…"   24 hours ago        Up 5 minutes                                           dev3_worker-scraping_1

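All the celery workers work when an endpoint is called. The exception is the celery beat process, which should run periodically and automatically. When I start the containers, the log at celery_logs/worker_analysis.log shows: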

    [2019-11-16 23:29:20,880: DEBUG/MainProcess] pidbox received method hello(from_node='celery@d8e414aa4e5b', revoked={}) [reply_to:{'exchange': 'reply.celery.pidbox', 'routing_key': '85f4128f-2f75-3996-8375-2a19aa58d5d4'} ticket:0daa0dc4-fa09-438d-9003-ccbd39f259dd]
    [2019-11-16 23:29:20,907: INFO/MainProcess] sync with celery@d8e414aa4e5b
    [2019-11-16 23:29:21,018: ERROR/MainProcess] Control command error: OperationalError("\nCannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.\nProbably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.\n",)
    Traceback (most recent call last):
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 439, in _reraise_as_library_errors
        yield
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 518, in _ensured
        return fun(*args, **kwargs)
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 605, in basic_publish
        message, exchange, routing_key, **kwargs
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/exchange.py", line 70, in deliver
        for queue in _lookup(exchange, routing_key):
      File "/usr/lib/python3.6/site-packages/kombu/transport/redis.py", line 877, in _lookup
        exchange, redis_key))
    kombu.exceptions.InconsistencyError: 
    Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.
    Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.


    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/usr/lib/python3.6/site-packages/celery/worker/pidbox.py", line 46, in on_message
        self.node.handle_message(body, message)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 145, in handle_message
        return self.dispatch(**body)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 115, in dispatch
        ticket=ticket)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 151, in reply
        serializer=self.mailbox.serializer)
      File "/usr/lib/python3.6/site-packages/kombu/pidbox.py", line 285, in _publish_reply
        **opts
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
        exchange_name, declare,
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 551, in _ensured
        errback and errback(exc, 0)
      File "/usr/lib/python3.6/contextlib.py", line 99, in __exit__
        self.gen.throw(type, value, traceback)
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 444, in _reraise_as_library_errors
        sys.exc_info()[2])
      File "/usr/lib/python3.6/site-packages/vine/five.py", line 194, in reraise
        raise value.with_traceback(tb)
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 439, in _reraise_as_library_errors
        yield
      File "/usr/lib/python3.6/site-packages/kombu/connection.py", line 518, in _ensured
        return fun(*args, **kwargs)
      File "/usr/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
        mandatory=mandatory, immediate=immediate,
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/base.py", line 605, in basic_publish
        message, exchange, routing_key, **kwargs
      File "/usr/lib/python3.6/site-packages/kombu/transport/virtual/exchange.py", line 70, in deliver
        for queue in _lookup(exchange, routing_key):
      File "/usr/lib/python3.6/site-packages/kombu/transport/redis.py", line 877, in _lookup
        exchange, redis_key))
    kombu.exceptions.OperationalError: 
    Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists.
    Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database.
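
The error message names the Redis key it could not find, so one way to narrow things down is to look at that key directly. Below is a minimal sketch, assuming the binding is stored as a Redis set and that `client` is a `redis.Redis`-like object exposing `smembers()`. The helper names `binding_key` and `inspect_binding` are hypothetical, and the key format is copied from the log above.

```python
# Sketch: inspect the Redis key named in the error message.
# Assumption: `client` behaves like redis.Redis and exposes smembers().
# The helper names below are hypothetical, not part of kombu or redis.

# Key format copied from the error text: '_kombu.binding.reply.celery.pidbox'
BINDING_KEY_TEMPLATE = "_kombu.binding.{exchange}"


def binding_key(exchange):
    """Build the Redis key that the error message refers to."""
    return BINDING_KEY_TEMPLATE.format(exchange=exchange)


def inspect_binding(client, exchange="reply.celery.pidbox"):
    """Return (key, members) for the exchange binding.

    An empty member set would be consistent with the
    'Table empty or key no longer exists' error in the log.
    """
    key = binding_key(exchange)
    members = set(client.smembers(key))
    return key, members
```

With the redis package installed and the broker reachable, this could be called as `inspect_binding(redis.Redis.from_url("redis://redis:6379/0"))`, using the URL from CELERY_BROKER in the compose file.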

Celery is configured like this:

web/project/config.py:

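import os

from celery.schedules import crontab

# BaseConfig is defined elsewhere in this file (not shown).
class DevelopmentConfig(BaseConfig):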
    # CELERY
    INSTALLED_APPS = ['routes']
    # celery config
    CELERYD_CONCURRENCY = 4
    # Add a one-minute timeout to all Celery tasks.
    CELERYD_TASK_SOFT_TIME_LIMIT = 60
    CELERY_ENABLE_UTC = False
    CELERY_TIMEZONE = 'America/Sao_Paulo'
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER')
    CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND')
    CELERY_IMPORTS = ('project.api.routes.background',)
    # periodic tasks
    CELERYBEAT_SCHEDULE = {
        'playlist_generator_with_audio_features': {
            'task': 'project.api.routes.background.playlist_generator_with_audio_features',
            # Runs at minute 59 of every hour (crontab() with no arguments would run every minute)
            'schedule': crontab(minute=59),
            'args': ['user_id'],
        },
        'cache_user_tracks_with_analysis': {
            'task': 'project.api.routes.background.cache_user_tracks_with_analysis',
            # Every hour, on the hour
            'schedule': crontab(minute=0, hour='*/1'),
            'args': ('user_id', 'token'),
        },
    }

Here is a sample task from project/api/routes/background.py, on my Flask server:

@celery.task(queue='analysis', default_retry_delay=30, max_retries=3, soft_time_limit=1000)
def cache_user_tracks_with_analysis(user_id, token):
   # business logic
   return {'Status': 'Task completed!',
        'features': results}

In my requirements.txt, kombu is not declared explicitly. I have:

celery==4.2.1
redis==3.2.0

What am I missing?

Best answer

This is an open celery/kombu issue: https://github.com/celery/kombu/issues/1063

Explicitly downgrading to kombu==4.5.0 fixed the error for me.
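
Because the question's requirements.txt pins celery and redis but not kombu, pip presumably picked a kombu release newer than 4.5.0. A minimal sketch for checking which version is actually installed in the container follows. The helper names are hypothetical, and the comparison only looks at the leading numeric part of each version component.

```python
# Sketch: check whether the installed kombu is newer than the pinned 4.5.0.
# Helper names are hypothetical; only kombu.__version__ comes from kombu itself.
import re


def parse_version(text):
    """Turn '4.6.5' into (4, 6, 5), using the leading digits of each part."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first part with no leading digits
        parts.append(int(match.group()))
    return tuple(parts)


def is_newer_than_pin(installed, pinned="4.5.0"):
    """True if `installed` is a later version than `pinned`."""
    return parse_version(installed) > parse_version(pinned)


def installed_kombu_version():
    """Return kombu's version string, or None if kombu is not importable."""
    try:
        import kombu
    except ImportError:
        return None
    return kombu.__version__
```

If `is_newer_than_pin(installed_kombu_version())` is true inside the container, adding the line `kombu==4.5.0` to requirements.txt and rebuilding the image would apply the downgrade described above.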

Source: a similar question on Stack Overflow, "flask - Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists": https://stackoverflow.com/questions/58896295/
