python - Celery 消费者未从 LocalStack 上的 SQS 队列接收消息

标签 python pip celery amazon-sqs localstack

我在 LocalStack 服务器上有一个 SQS 队列,我正在尝试使用 Celery 使用者来使用其中的消息。

这表明消费者已正确附加到队列,例如队列 sqs-test-queue ,但当我尝试使用 aws 发送一条消息时,它没有收到任何消息命令。

我的celeryconfig.py看起来像这样:

from kombu import (
    Exchange,
    Queue
)


broker_transport_options = {'region': REGION}
broker_transport = 'sqs'

accept_content = ['application/json']
result_serializer = 'json'
content_encoding = 'utf-8'
task_serializer = 'json'

worker_enable_remote_control = False
worker_send_task_events = False
result_backend = None

task_queues = (
    Queue('sqs-test-queue', exchange=Exchange(''), routing_key='sqs-test-queue'),
)

还有我的tasks.py模块如下所示:

from celery import Celery
from kombu.utils.url import quote


AWS_ACCESS_KEY = quote("AWS_ACCESS_KEY")
AWS_SECRET_KEY = quote("AWS_SECRET_KEY")
LOCALSTACK = "<IP>:<PORT>"

broker_url = "sqs://{access}:{secret}@{host}".format(access=AWS_ACCESS_KEY,
                                                     secret=AWS_SECRET_KEY,
                                                     host=LOCALSTACK)

app = Celery('tasks', broker=broker_url, backend=None)
app.config_from_object('celeryconfig')


@app.task(bind=True, name='tasks.consume', acks_late=True, ignore_result=True)
def consume(self, msg):
    # DO SOMETHING WITH THE RECEIVED MESSAGE
    return True

尝试使用 celery -A tasks worker -l INFO -Q sqs-test-queue 执行它一切都很好:

...

[tasks]
  . tasks.consume

[... INFO/MainProcess] Connected to sqs://AWS_ACCESS_KEY:**@<IP>:<PORT>// 
[... INFO/MainProcess] celery@local ready

但是当我尝试使用 aws sqs send-message --endpoint-url=http://<IP>:<PORT> --queue-url=http://localhost:<PORT>/queue/sqs-test-queue --message-body="Test message" 发送消息时,什么也没发生。

我做错了什么?我是否遗漏了配置中的某些内容?

PS:如果我尝试运行命令 aws sqs receive-message --endpoint-url=http://<IP>:<PORT> --queue-url=http://localhost:<PORT>/queue/sqs-test-queue ,我能够收到消息。

注意:

我正在使用Python 3.7.0和我的pip freeze看起来像这样:

boto3==1.10.16
botocore==1.13.16
celery==4.3.0
kombu==4.6.6
pycurl==7.43.0.3
...

最佳答案

我跟你经历过同样的事情。为了解决这个问题,我做了几件事:

  • 我在 localstack 中设置了 HOSTNAME_EXTERNALHOSTNAME 环境变量
  • broker_url 设置为 sqs://{access}:{secret}@{host}:{port}(如您所拥有的那样)
  • 确保 celery Worker 的 broker_transport_options 不包含配置项:wait_time_seconds,因为这会导致自 2020 年 2 月 7 日起 localstack 出现错误 ( check issue here )。

一旦我做了这两件事,它就开始工作了,希望它有所帮助。

关于python - Celery 消费者未从 LocalStack 上的 SQS 队列接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58834460/

相关文章:

python - Celery 中如何执行 rate_limit?

python - 使用 numpy multivariate_normal 随机采样时内存不足

python - 类型错误 : unsupported operand type(s) for +: 'map' and 'float'

python - 在我只有 ftp 访问权限的服务器上安装 pip 包?

python - 出现错误 "error: command ' cl.exe' 失败 : No such file or directory"

python - celery 任务中的grpc超时

python - 如何使用 pandas 将两列中的数据合并为带有 + 号的一列

python - 类型错误 : can only concatenate tuple (not "float") to tuple cousera deeplearnig. ai

使用pyenv安装python后找不到python3命令

python - 使用本地计算机作为主机将 EC2 实例设置为 Celery Worker