python - How to consume JSON messages with kafka-python

Tags: python python-3.x apache-kafka kafka-python

I'm new to Python and just getting started with Kafka. I have a requirement to send and consume JSON messages, and I am using kafka-python to communicate with Kafka.

#Producer.py
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('offering_new', {"dataObjectID": "test1"})
producer.flush()  # send() is asynchronous; flush() blocks until buffered messages have been sent

#Consumer.py
import json
from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe(['offering_new'])
for message in consumer:
    print(message)
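Because `value_serializer` and `value_deserializer` are ordinary callables, one way to check them offline, without a broker, is to call them directly. A minimal sketch; `serialize` and `deserialize` are hypothetical names standing in for the two lambdas above:

```python
import json

def serialize(v):
    # Mirrors the producer's value_serializer: Python object -> JSON text -> UTF-8 bytes.
    return json.dumps(v).encode('utf-8')

def deserialize(m):
    # Mirrors the consumer's value_deserializer: UTF-8 bytes -> JSON text -> Python object.
    return json.loads(m.decode('utf-8'))

payload = {"dataObjectID": "test1"}
wire = serialize(payload)
print(wire)                          # b'{"dataObjectID": "test1"}'
print(deserialize(wire) == payload)  # True
```

If this round trip succeeds, the serializer pair itself is consistent, so a decode failure on the consumer points at the bytes already stored in the topic rather than at these two functions.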

However, I get the following exception on the consumer:
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
    for msg in self._fetcher:
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, in __next__
    return next(self._iterator)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 388, in _message_generator
    self._next_partition_records = self._parse_fetched_data(completion)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 799, in _parse_fetched_data
    unpacked = list(self._unpack_message_set(tp, records))
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 458, in _unpack_message_set
    tp.topic, record.value)
  File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 492, in _deserialize
    return f(bytes_)
  File "<stdin>", line 1, in <lambda>
  File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

I am running the code above in the Python shell. Can anyone tell me where I am going wrong?

Best answer

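With auto_offset_reset='earliest' you have configured the consumer to read every message in the topic from the beginning (and since no group_id is set, there is no committed offset to resume from, so this happens on every run). The JSON decode error indicates that some messages produced to this topic earlier are not actually JSON. "Expecting value: line 1 column 1 (char 0)" means the very first character of the value is not the start of a JSON value, which is what you get from an empty or plain-text payload.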
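To see that this exact message comes from the payload rather than from the consumer setup, one can feed the question's deserializer a few payloads directly. A minimal sketch; the sample payloads are hypothetical (an empty value, and plain text such as might be typed into a console producer), not records taken from the asker's topic, and `describe_failure` is a hypothetical helper:

```python
import json

def deserialize(m: bytes):
    # Same logic as the question's value_deserializer.
    return json.loads(m.decode('utf-8'))

def describe_failure(raw: bytes) -> str:
    """Return the JSONDecodeError text for a payload, or 'ok' if it parses."""
    try:
        deserialize(raw)
    except json.JSONDecodeError as exc:
        return str(exc)
    return 'ok'

# Hypothetical payloads: empty, plain text, and valid JSON.
for raw in (b'', b'hello', b'{"dataObjectID": "test1"}'):
    print(raw, '->', describe_failure(raw))
```

The first two payloads produce the same "Expecting value: line 1 column 1 (char 0)" text seen in the traceback, while the JSON record parses normally.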

Some solutions:

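(1) Consume from the end of the topic instead: auto_offset_reset='latest'
(2) Start with a new topic (and have the producer send to it too): consumer.subscribe(['offering_new_too'])
(3) Use a more forgiving deserializer: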

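import json
import logging
from kafka import KafkaConsumer

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    if v is None:  # e.g. a record with no value
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except (UnicodeDecodeError, json.decoder.JSONDecodeError):
        log.exception('Unable to decode: %s', v)
        return None

# "..." stands for the remaining arguments from the question (bootstrap_servers, auto_offset_reset, ...)
KafkaConsumer(value_deserializer=forgiving_json_deserializer, ...)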
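A variation on option (3), assuming you would rather know where the bad records sit: leave `value_deserializer` unset so values arrive as raw bytes, and parse them inside the loop, where the record's offset is available. A minimal sketch; `iter_json_values` and `FakeRecord` are hypothetical helpers (kafka-python's own `ConsumerRecord` also exposes `.offset` and `.value`, and those are the only two fields used here):

```python
import json
import logging
from collections import namedtuple

log = logging.getLogger(__name__)

# Hypothetical stand-in for kafka-python's ConsumerRecord (only .offset and .value are used).
FakeRecord = namedtuple('FakeRecord', ['offset', 'value'])

def iter_json_values(records):
    """Yield (offset, parsed_value) for records whose raw bytes are valid JSON.

    Records with no value, non-UTF-8 bytes, or non-JSON text are
    logged with their offset and skipped.
    """
    for record in records:
        raw = record.value
        if raw is None:
            continue
        try:
            yield record.offset, json.loads(raw.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError):
            log.warning('Skipping non-JSON record at offset %d: %r', record.offset, raw)

records = [
    FakeRecord(0, b'hello'),                       # plain text
    FakeRecord(1, None),                           # no value
    FakeRecord(2, b'{"dataObjectID": "test1"}'),   # valid JSON
]
print(list(iter_json_values(records)))
```

With a real consumer created without `value_deserializer`, the loop becomes `for offset, value in iter_json_values(consumer): ...`, because iterating a `KafkaConsumer` yields record objects with the same two attributes.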

Hope this helps!

Regarding "python - How to consume JSON messages with kafka-python", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55534547/
