python - Celery + Azure 服务总线(代理)= 声明为空或 token 无效

标签 python azure queue celery azureservicebus

我正在尝试使用 Azure 服务总线作为我的 celery 应用程序的代理。

我已经通过引用各种来源修补了该解决方案。 目标是使用 Azure 服务总线作为代理,使用 PostgresSQL 作为后端。

我创建了一个 Azure 服务总线,并将 RootManageSharedAccessKey 的凭据复制到 celery 应用。

enter image description here

以下是task.py

from time import sleep
from celery import Celery
from kombu.utils.url import safequote

SAS_policy = safequote("RootManageSharedAccessKey") #SAS Policy
SAS_key = safequote("1234222zUY28tRUtp+A2YoHmDYcABCD") #Primary key from the previous SS
namespace = safequote("bluenode-dev")

app = Celery('tasks', backend='db+postgresql://afsan.gujarati:admin@localhost/local_dev', 
            broker=f'azureservicebus://{SAS_policy}:{SAS_key}=@{namespace}')

@app.task
def divide(x, y):
    sleep(30)
    return x/y

当我尝试使用以下命令运行 C​​elery 应用程序时:

celery -A 任务 worker --loglevel=INFO

我收到以下错误

[2020-10-09 14:00:32,035: CRITICAL/MainProcess] Unrecoverable error: AzureHttpError('Unauthorized\n<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>')
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 918, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1225, in _perform_request
    resp = self._filter(request)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_http/httpclient.py", line 211, in perform_request
    raise HTTPError(status, message, respheaders, respbody)
azure.servicebus.control_client._http.HTTPError: Unauthorized

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 311, in start
    blueprint.start(self)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 398, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 404, in connection_for_read
    return self.ensure_connected(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 430, in ensure_connected
    conn = conn.ensure_connection(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 383, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 435, in _ensure_connection
    return retry_over_time(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
    return fun(*args, **kwargs)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 866, in _connection_factory
    self._connection = self._establish_connection()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 801, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 938, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 920, in create_channel
    channel = self.Channel(connection)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/azureservicebus.py", line 64, in __init__
    for queue in self.queue_service.list_queues():
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 313, in list_queues
    response = self._perform_request(request)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1227, in _perform_request
    return _service_bus_error_handler(ex)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_serialization.py", line 569, in _service_bus_error_handler
    return _general_error_handler(http_error)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_common_error.py", line 41, in _general_error_handler
    raise AzureHttpError(message, http_error.status)
azure.common.AzureHttpError: Unauthorized
<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>

我在任何地方都找不到直接的解决方案。我错过了什么?

附注我没有在Azure服务总线中创建队列。我假设 celery 应用程序执行时会自行创建队列。

附注我还尝试在 Python 的服务总线客户端中使用完全相同的凭据,它 seemed to work 。感觉像是 Celery 问题,但我无法弄清楚到底是什么。

最佳答案

如果要使用 Azure 服务总线传输来连接 Azure 服务总线,则 URL 应为 azureservicebus://{SAS 策略名称}:{SAS key}@{服务总线命名空间}

例如

  1. 获取共享访问策略RootManageSharedAccessKey

enter image description here

  • 代码
  • from celery import Celery
    from kombu.utils.url import safequote
    
    
    SAS_policy = "RootManageSharedAccessKey"  # SAS Policy
    # Primary key from the previous SS
    SAS_key = safequote("X/*****qyY=")
    namespace = "bowman1012"
    app = Celery('tasks', backend='db+postgresql://<>@localhost/<>',
                 broker=f'azureservicebus://{SAS_policy}:{SAS_key}@{namespace}')
    
    
    @app.task
    def add(x, y):
        return x + y
    

    enter image description here

    关于python - Celery + Azure 服务总线(代理)= 声明为空或 token 无效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64286912/

    相关文章:

    python - While 循环何时中断

    azure - Azure 中的 Widevine 加密视频不会过期

    c# - Azure AD : Property 'mail' is read-only and cannot be set

    制作目录时出现 Python "FileExists"错误

    python - 使用 Tkinter 正确完成线程化(有或没有队列)

    php - 如何搭建一个带有倒计时功能的mysql排队系统?

    python - 为什么这段 Haskell 代码这么慢?

    python - 将生成器对象转换为列表

    python - 是否有像 Java 线程转储一样的 Python 线程转储方法?

    azure - Windows azure 部署问题,仅部署新的/更新的内容