python - 如何将azure事件网格中的事件发布到python中的主题?

标签 python azure-eventgrid azure-python-sdk

无法在我的 VS Code 中导入 eventgrid 模块,我已添加了 require.txt 中的所有模块以及从 cmd 安装的 pip。

此外,下面是我正在寻找并尝试使用的 python 函数:

def publish_event(): 
    # authenticate client 
    credential = AzureKeyCredential(key) 
    client = EventGridPublisherClient(endpoint, credential) 
 
    custom_schema_event = { 
        "customSubject": "sample", 
        "customEventType": "sample.event", 
        "customDataVersion": "2.0", 
        "customId": uuid.uuid4(), 
        "customEventTime": dt.datetime.now(UTC()).isoformat(), 
        "customData": "sample data" 
    } 
 
    # publish events 
    for _  in range(3): 
 
        event_list = []     # list of events to publish 
        # create events and append to list 
        for j in range(randint(1, 3)): 
            event_list.append(custom_schema_event) 
 
        # publish list of events 
        client.send(event_list) 
        print("Batch of size {} published".format(len(event_list))) 
        time.sleep(randint(1, 5)) 
 
 
if name == '__main__': 
    publish_event() 

我不确定这是否是实现此目的的正确方法,正在寻找更好的方法来解决模块问题和发布事件。

如果有帮助,我们将不胜感激!

最佳答案

从你的问题我们可以看出有两点需要回答:

  1. Python 包安装问题
  2. Eventgrid 主题问题
  • 要使用 pip install 安装 Azure Python 包,我们需要在终端中启用虚拟环境,命令如下:

       python -m venv .venv
      .venv\Scripts\activate
    

    启用venv后,在requirements.txt中添加所有eventgrid、storage和其他azure相关的包,然后在cmd中运行以下命令:

    pip install -r requirements.txt

  • 现在,看看您的代码,函数看起来不错,因为它将事件发送到一般主题。以类似的方式,我们有一个示例,它为我们提供了有关将事件网格事件发布到自定义主题的一些见解。 Azure-SDK-Python-GitHub 存储库中对此进行了解释。 Python代码如下:

      import os
      from random import randint, sample
      import time
    
      from azure.core.credentials import AzureKeyCredential
      from azure.eventgrid import EventGridPublisherClient, EventGridEvent
    
      key = os.environ["EG_ACCESS_KEY"]
      endpoint = os.environ["EG_TOPIC_HOSTNAME"]
    
      #authenticate client
      credential = AzureKeyCredential(key)
      client = EventGridPublisherClient(endpoint, credential)
      services = ["EventGrid", "ServiceBus", "EventHubs", "Storage"]    #possible values for data field
    
      def publish_event():
          #publish events
          for _ in range(3):
    
              event_list = []     #list of events to publish
              #create events and append to list
              for j in range(randint(1, 3)):
                  sample_members = sample(services, k=randint(1, 4))      #select random subset of team members
                  event = EventGridEvent(
                          subject="Door1",
                          data={"team": sample_members},
                          event_type="Azure.Sdk.Demo",
                          data_version="2.0"
                          )
                  event_list.append(event)
    
              #publish list of events
              client.send(event_list)
              print("Batch of size {} published".format(len(event_list)))
              time.sleep(randint(1, 5))
    
      if __name__ == '__main__':
          publish_event()
    

关于python - 如何将azure事件网格中的事件发布到python中的主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70121960/

相关文章:

python - 使用 Django Admin 中内联的 OneToOneField

python - Django 忽略过滤器

json - 如何为 Azure 服务编写 MassTransit Json 反序列化器

azure - eventgrid 事件传递失败,无法传递到订阅者功能

Azure python 库返回 Web 应用程序配置

python - 子类、意外参数和 Unresolved 引用的问题

python - 属性列中的分隔符

azure - 何时使用事件网格以及何时使用服务总线/存储队列?

Azure ML : What means reconnecting terminal?

python - Azure Batch NodePreparationError 尝试从 Azure 容器注册表获取 Docker 镜像