python - Azure 服务总线队列发送和接收消息的测试

标签 python testing integration-testing servicebus azure-servicebus-queues

我想编写一个集成测试,检查 Python 脚本与 Azure 服务总线队列的连接。测试应该:

  • 向队列发送消息,
  • 确认消息已进入队列。

  • 测试看起来像这样:
    import pytest
    
    from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusSender
    
    CONNECTION_STRING = <some connection string>
    QUEUE = <queue name>
    
    
    def send_message_to_service_bus(sender: ServiceBusSender, msg: str) -> None:
        message = ServiceBusMessage(msg)
        sender.send_message(message)
    
    
    class TestConnectionWithQueue:
        def test_message_is_sent_to_queue_and_received(self):
            msg = "test message sent to queue"
            expected_message = ServiceBusMessage(msg)
            
            servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STRING, logging_enable=True)
            with servicebus_client:
                sender = servicebus_client.get_queue_sender(queue_name=QUEUE)
                with sender:
                    send_message_to_service_bus(sender, expected_message)
            
                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE)
                with receiver:
                    messages_in_queue = receiver.receive_messages(max_message_count=10, max_wait_time=20)
            assert any(expected_message == str(actual_message) for actual_message in messages_in_queue)
    
    测试偶尔会起作用,但通常不会。没有其他消息同时发送到队列。当我调试代码时,如果测试不起作用,变量messages_in_queue只是一个空列表。
    为什么代码不能一直工作,应该怎么做才能修复它?

    最佳答案

    您确定没有其他进程接收您的消息吗?也许您正在与其他同事共享您的队列连接字符串,构建机器......
    要进行故障排除,您需要密切关注 Azure 门户上的队列监控。调试您的测试并查看传入消息是否增加 1。然后继续调试并检查它是否减少 1。
    另外,你确定这个单元测试有用吗?看起来您正在测试您的基础设施而不是测试您的代码

    关于python - Azure 服务总线队列发送和接收消息的测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68288982/

    相关文章:

    将字节转换为整数的 Python 2 和 3 兼容方法

    python - 如何使用 cv2.rectangle() 创建随机颜色边界框

    python 编码问题\ufeff in pandas 列名

    javascript - Jest 的顺序测试场景

    javascript - AJAX 测试工具

    python - 如何通过 id 获取 Xlib.display.Window 实例?

    javascript - Protractor - 请参阅页面上的文字

    java - 如何用应用程序上下文配置文件替换 web.xml?

    java - 无法在测试行为调用的 Java 函数中使用 Citrus 变量

    c# - TCP套接字测试