Python - Apache Beam - Flink runner setup: ReadFromKafka returns error - RuntimeError: cannot encode a null byte[]

Tags: python apache-kafka apache-flink apache-beam

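I am trying to build a streaming Beam pipeline in Python that consumes messages from Kafka and then runs further stages that fetch data from other sources and aggregate it. The steps I have followed so far are: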

  1. Run a Kafka instance on localhost:9092

    ./bin/kafka-server-start.sh ./config/server.properties

  2. Run the Beam Flink job server using Docker

    docker run --net=host apache/beam_flink1.10_job_server:latest

  3. Run the Beam Kafka pipeline

import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


if __name__ == '__main__':
    options = PipelineOptions([
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK",
        "--streaming",
        "--environment_config={\"command\":\"/opt/apache/beam/boot\"}",
    ])

    options = options.view_as(StandardOptions)
    options.streaming = True

    pipeline = beam.Pipeline(options=options)

    result = (
        pipeline

        | "Read from kafka" >> ReadFromKafka(
            consumer_config={
                "bootstrap.servers": 'localhost:9092',
            }, 
            topics=['mytopic'],
            expansion_service='localhost:8097',
        )

        | beam.Map(print)
    )

    pipeline.run()

  4. Publish a new message using kafka-console-producer.sh

    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
    >tryme

After I publish this test message, the Beam pipeline picks it up but then crashes with this error:

RuntimeError: org.apache.beam.sdk.util.UserCodeException: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1014)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1.processElement(KafkaIO.java:1042)
    at org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.output(FnApiDoFnRunner.java:1478)
    at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:138)
    at org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:740)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$700(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory.lambda$createRunnerForPTransform$1(FnApiDoFnRunner.java:203)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1011)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:1483)
    at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:84)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.processElement(Read.java:516)
    at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForElementAndRestriction(FnApiDoFnRunner.java:838)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSizedElementAndRestriction(FnApiDoFnRunner.java:808)
    at org.apache.beam.fn.harness.FnApiDoFnRunner.access$200(FnApiDoFnRunner.java:132)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:226)
    at org.apache.beam.fn.harness.FnApiDoFnRunner$Factory$2.accept(FnApiDoFnRunner.java:223)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)
    at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:204)
    at org.apache.beam.fn.harness.data.QueueingBeamFnDataClient.drainAndBlock(QueueingBeamFnDataClient.java:106)
    at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:295)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:173)
    at org.apache.beam.fn.harness.control.BeamFnControlClient.lambda$processInstructionRequests$0(BeamFnControlClient.java:157)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.beam.sdk.coders.CoderException: cannot encode a null byte[]
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:63)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:56)
    at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:70)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:590)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:581)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:541)
    at org.apache.beam.sdk.fn.data.BeamFnDataSizeBasedBufferingOutboundObserver.accept(BeamFnDataSizeBasedBufferingOutboundObserver.java:109)
    at org.apache.beam.fn.harness.BeamFnDataWriteRunner.consume(BeamFnDataWriteRunner.java:155)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:216)
    at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:179)

Best answer

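In case you have not solved this yet: you are publishing a message with a null key. The console producer sends records without a key unless told otherwise, and the "Caused by" part of the stack trace shows KvCoder.encode calling ByteArrayCoder.encode, which cannot encode a null byte[]. That exception is what fails your job.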
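To make the failure mode concrete, here is a toy model of a key/value coder, written as a sketch rather than as Beam's actual implementation. The function names and the 4-byte length prefix are assumptions chosen for illustration; the point is only that a length-prefixed byte encoding has nothing to write for a missing key.

```python
from typing import Optional


def encode_bytes(data: Optional[bytes]) -> bytes:
    """Toy length-prefixed encoding of a byte string (NOT Beam's wire format)."""
    if data is None:
        # There is no length and no payload to write for a missing value,
        # so the only option is to fail, as in the stack trace.
        raise ValueError("cannot encode a null byte[]")
    return len(data).to_bytes(4, "big") + data


def encode_kv(key: Optional[bytes], value: Optional[bytes]) -> bytes:
    """Encode a key/value pair by encoding the key first, then the value."""
    return encode_bytes(key) + encode_bytes(value)


def try_encode(key: Optional[bytes], value: Optional[bytes]) -> str:
    """Report whether a record can be encoded, without raising."""
    try:
        encode_kv(key, value)
        return "ok"
    except ValueError as exc:
        return f"failed: {exc}"


if __name__ == "__main__":
    print(try_encode(b"my_key", b"my_value"))  # a keyed record encodes fine
    print(try_encode(None, b"tryme"))          # a record without a key does not
```

In this model the value is never reached when the key is missing, which matches the trace: the error surfaces inside the key/value coder, not in any user code.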

You can use the Kafka command-line tools to publish the message as a key-value pair, like this:

./bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic my-topic \
  --property "parse.key=true" \
  --property "key.separator=:"
>my_key:my_value

Source: the original Stack Overflow question, https://stackoverflow.com/questions/62899399/
