python - 如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?

标签 python apache-kafka kafka-python

我正在使用带有通配符的模式订阅 Kafka,如下所示。通配符代表一个动态客户 ID。

consumer.subscribe(pattern='customer.*.validations')

这很有效,因为我可以从主题字符串中提取客户 ID。但现在我需要扩展功能,以便出于稍微不同的目的收听类似的主题。我们称它为 customer.*.additional-validations。代码需要存在于同一个项目中,因为共享了如此多的功能,但我需要能够根据队列类型采用不同的路径。

Kafka documentation我可以看到可以订阅一系列主题。然而,这些是硬编码的字符串。不是允许灵 active 的模式。

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

所以我想知道是否有可能以某种方式将两者结合起来?有点像这样(不工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

最佳答案

在KafkaConsumer代码中,它支持主题列表,或模式,

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

   def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

因此您可以创建一个正则表达式,使用 | 或条件,它应该作为订阅多个动态主题正则表达式,因为它在内部使用 re 模块进行匹配。

(客户.*.validations)|(customer.*.additional-validations)

关于python - 如何使用 kafka-python 订阅多个 kafka 通配符模式的列表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39520222/

相关文章:

gradle - 无法在gradle项目中从Confluent存储库下载Jars

python - 将日期范围行拆分为年(取消分组) - Python Pandas

python - 为什么我会使用 tf.concat 而不是 tf.stack?

python - 基于代码的唯一约束 Django 模型

python - Django setUpTestData() 不适用于继承?

python - 无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息

go - Kafka 0.11/Golang Sarama 版本支持

apache-kafka - 如何自动缩放apache Zookeeper

python - 使用 python 库在 kafka 中检索消费者组偏移量

python - 为什么我的 Kafka 消费者比我的 Kafka 生产者慢得多?