python - 使用fastavro从Kafka反序列化Avro

标签 python apache-kafka avro fastavro

我正在构建一个从 Kafka 接收数据的应用程序。当使用 Apache ( https://pypi.org/project/avro-python3/ ) 提供的标准 avro 库时,结果是正确的,但是,反序列化过程非常缓慢。

class KafkaReceiver:
    data = {}

    def __init__(self, bootstrap='192.168.1.111:9092'):
        self.client = KafkaConsumer(
            'topic',
            bootstrap_servers=bootstrap,
            client_id='app',
            api_version=(0, 10, 1)
        )
        self.schema = avro.schema.parse(open("Schema.avsc", "rb").read())
        self.reader = avro.io.DatumReader(self.schema)

    def do(self):
        for msg in self.client:
            bytes_reader = io.BytesIO(msg.value)
            decoder = BinaryDecoder(bytes_reader)

            self.data = self.reader.read(decoder)

在阅读为什么这么慢时,我发现 fastavro这应该快得多。我是这样使用的:
    def do(self):

        schema = fastavro.schema.load_schema('Schema.avsc')
        for msg in self.client:
            bytes_reader = io.BytesIO(msg.value)
            bytes_reader.seek(0)
            for record in reader(bytes_reader, schema):
                self.data = record

而且,由于在使用 Apache 的库时一切正常,我希望在使用 fastavro 时一切都会以相同的方式工作。 .但是,在运行它时,我得到
  File "fastavro/_read.pyx", line 389, in fastavro._read.read_map
  File "fastavro/_read.pyx", line 290, in fastavro._read.read_utf8
  File "fastavro/_six.pyx", line 22, in fastavro._six.py3_btou
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xfc in position 3: invalid start byte

我通常不使用 Python 编程,所以我不完全知道如何处理这个问题。有任何想法吗?

最佳答案

fastavro.reader需要包含标题的 avro 文件格式。看起来您拥有的是没有标题的序列化记录。我认为您可以使用 fastavro.schemaless_reader 阅读此内容。 .

所以而不是:

for record in reader(bytes_reader, schema):
    self.data = record

你会这样做:

self.data = schemaless_reader(bytes_reader, schema)

关于python - 使用fastavro从Kafka反序列化Avro,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61473880/

相关文章:

python - 如何使用discord.py 查找用户上次事件的时间?

python - Kafka 10 - 具有身份验证和授权的 Python 客户端

spring-boot - Camel 上下文在应用程序启动后立即启动关闭,没有明显的错误

python - 将包添加到 memgraph 转换

python - 导入 Pandas 时导致大量内存提交的原因

python - 在Python中加载和保存图表

apache-spark - Apache Kafka 和 Spark 流

avro - 将 JSON 转换为 Parquet

c++ - 包含多个数组的 Avro union

python - 使 Tkinter 多个窗口在任务栏中有一个图标