我有一个文件,该文件具有创建 Kafka Producer 并将一些消息发布到 Kafka 主题的功能。
def publishToKafkaTopic(value_str):
producer = KafkaProducer(
bootstrap_servers=KAFKA_SERVERS,
value_serializer=lambda x: dumps(x).encode("utf-8"),
)
try:
ack = producer.send(TOPIC_NAME, key=module, value=value_str)
metadata = ack.get()
return "Success"
except KafkaError as kafka_error:
GLOBAL_LOGGER.error(
"Failed to publish record on to Kafka broker with error %s", kafka_error
)
return "Failed"
现在,我想通过模拟 KafkaProducer 和 producer.send() 来测试我的生产者。我该怎么做??
最佳答案
注意:您需要在测试中导入您的模块。
您可以在下面的第一个测试中测试您的生产者是否被调用,您也可以在第二个测试中测试您传递给它的参数:
from unittest.mock import patch
@patch('kafka.KafkaProducer')
def test_produce_message(KafkaProducerMock):
publishToKafkaTopic('data')
KafkaProducerMock.send.assert_called
@patch('kafka.KafkaProducer.send')
def test_vertify_produce_message(KafkaProducerMock):
publishToKafkaTopic({'key1':'value1', 'key2': 'value2'})
args = KafkaProducerMock.call_args
assert(args[0] == (TOPIC_NAME,))
assert(args[2] == {'value': {'key1':'value1', 'key2': 'value2'}})
关于django-unittest - 模拟 python kafka 生产者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61930831/