python - 无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息

标签 python json kafka-python

我正在尝试通过 Kafka 发送一个非常简单的 JSON 对象,并使用 Python 和 kafka-python 从另一端读取它。但是,我不断看到以下错误:

2017-04-07 10:28:52,030.30.9998989105:kafka.future:8228:ERROR:10620:Error processing callback
Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\kafka\future.py", line 79, in _call_backs
    f(value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 539, in _unpack_message_set
    tp.topic, msg.value)
  File "C:\Anaconda2\lib\site-packages\kafka\consumer\fetcher.py", line 570, in _deserialize
    return f(bytes_)
  File "C:\Users\myUser\workspace\PythonKafkaTest\src\example.py", line 55, in <lambda>
    value_deserializer=lambda m: json.loads(m).decode('utf-8'))
  File "C:\Anaconda2\lib\json\__init__.py", line 339, in loads
    return _default_decoder.decode(s)
  File "C:\Anaconda2\lib\json\decoder.py", line 364, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "C:\Anaconda2\lib\json\decoder.py", line 382, in raw_decode
    raise ValueError("No JSON object could be decoded")
ValueError: No JSON object could be decoded

我做了一些研究,导致此错误的最常见原因是 JSON 错误。我已经尝试在发送之前打印出 JSON,方法是将以下内容添加到我的代码中,并且 JSON 打印没有错误。

  while True:
        json_obj1 = json.dumps({"dataObjectID": "test1"})
        print json_obj1
        producer.send('my-topic', {"dataObjectID": "test1"})
        producer.send('my-topic', {"dataObjectID": "test2"})
        time.sleep(1)

这让我怀疑我可以生成 json,但不能使用它。

这是我的代码:

import threading
import logging
import time
import json

from kafka import KafkaConsumer, KafkaProducer


class Producer(threading.Thread):
    daemon = True

    def run(self):
        producer = KafkaProducer(bootstrap_servers='localhost:9092',
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'))

        while True:
            producer.send('my-topic', {"dataObjectID": "test1"})
            producer.send('my-topic', {"dataObjectID": "test2"})
            time.sleep(1)


class Consumer(threading.Thread):
    daemon = True

    def run(self):
        consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 value_deserializer=lambda m: json.loads(m).decode('utf-8'))
        consumer.subscribe(['my-topic'])

        for message in consumer:
            print (message)


def main():
    threads = [
        Producer(),
        Consumer()
    ]

    for t in threads:
        t.start()

    time.sleep(10)

if __name__ == "__main__":
    logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:' +
               '%(levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

如果删除 value_serializer 和 value_deserializer,我可以成功发送和接收字符串。当我运行该代码时,我可以看到我正在发送的 JSON。这是一个简短的片段:

ConsumerRecord(topic=u'my-topic', partition=0, offset=5742, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test1"}', checksum=-1301891455, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5743, timestamp=None, timestamp_type=None, key=None, value='{"dataObjectID": "test2"}', checksum=-1340077864, serialized_key_size=-1, serialized_value_size=25)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5744, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5745, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5746, timestamp=None, timestamp_type=None, key=None, value='test', checksum=1495943047, serialized_key_size=-1, serialized_value_size=4)
ConsumerRecord(topic=u'my-topic', partition=0, offset=5747, timestamp=None, timestamp_type=None, key=None, value='\xc2Hello, stranger!', checksum=-1090450220, serialized_key_size=-1, serialized_value_size=17)

所以我尝试从消费者中删除 value_deserializer,并且该代码执行但没有反序列化器,消息以字符串形式出现,这不是我需要的。那么,为什么 value_deserializer 不起作用?有没有其他方法可以从我应该使用的 Kafka 消息中获取 JSON?

最佳答案

首先将消息解码为 utf-8,然后 json.load/dump 后我的问题就解决了:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

代替:

value_deserializer=lambda m: json.loads(m).decode('utf-8')

希望这也适用于制作人方面

关于python - 无法使用 Kafka-Python 的反序列化器从 Kafka 消费 JSON 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43282898/

相关文章:

python - Beautifulsoup 部分提取字符串

python - 将组上的顺序计数器列添加到 pandas 数据帧

c# - 如何在将 JSON 反序列化为 C# .Net 后访问嵌套类中的字段

python - 如何使用 Python 以编程方式在 Kafka Schema Registry 中注册 Avro Schema

python - 如何获取matplotlib中所有标记的列表?

python - 如何获取 python 中最后插入的生成的 uuid?

java - 获取带有值的 JSON 键

javascript - 从数组中查找 parent 和 child

python - KafkaTimeoutError : Failed to update metadata after 60. 0 秒

apache-kafka - 如何手动分配分区,同时仍然能够自动提交?