django-unittest - 模拟 python kafka 生产者

标签 django-unittest kafka-python

我有一个文件,该文件具有创建 Kafka Producer 并将一些消息发布到 Kafka 主题的功能。

def publishToKafkaTopic(value_str):

    producer = KafkaProducer(
        bootstrap_servers=KAFKA_SERVERS,
        value_serializer=lambda x: dumps(x).encode("utf-8"),
    )


    try:
        ack = producer.send(TOPIC_NAME, key=module, value=value_str)
        metadata = ack.get()
        return "Success"
    except KafkaError as kafka_error:
        GLOBAL_LOGGER.error(
            "Failed to publish record on to Kafka broker with error %s", kafka_error
        )


    return "Failed"

现在,我想通过模拟 KafkaProducer 和 producer.send() 来测试我的生产者。我该怎么做??

最佳答案

注意:您需要在测试中导入您的模块。

您可以在下面的第一个测试中测试您的生产者是否被调用,您也可以在第二个测试中测试您传递给它的参数:

from unittest.mock import patch

@patch('kafka.KafkaProducer')
def test_produce_message(KafkaProducerMock):
    publishToKafkaTopic('data')
    KafkaProducerMock.send.assert_called
@patch('kafka.KafkaProducer.send')
def test_vertify_produce_message(KafkaProducerMock):
    publishToKafkaTopic({'key1':'value1', 'key2': 'value2'})
    args = KafkaProducerMock.call_args
    assert(args[0] == (TOPIC_NAME,))
    assert(args[2] == {'value': {'key1':'value1', 'key2': 'value2'}})

关于django-unittest - 模拟 python kafka 生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61930831/

相关文章:

python - 模拟修补类方法不起作用

python - 编写用于检查新用户创建的 Django 注册表单测试

python - 在 django 中对 FileField 进行单元测试的干净方法是什么?

python - 如何使用带有分区和复制的 pykafka 创建新主题?

apache-kafka - 在 Kafka-python 中的消费者组中重置 kafka LAG(更改偏移量)

python - Django/DjangoRestFramework - unittest 未验证使用 ORM 创建的用户

apache-kafka - 处理kafka消息需要很长时间

amazon-web-services - 动物园管理员-Kafka : ConnectException - Connection refused

python-3.x - 在 python 中为 Kafka 代理指定 SSL 详细信息