apache-kafka - 如何配置 Kafka Connect Worker 将更多消息传输到 HDFS

标签 apache-kafka hdfs parquet apache-kafka-connect confluent-platform

我当前的工作设置:

NiFi 将 Avro 消息(Confluence 架构注册表引用)流式传输到 Kafka(v2.0.0、20 个分区、Confluence v5.0.0),Kafka Connect Worker(HDFS 接收器)使用 flush 将这些消息以 Parquet 格式流式传输到 HDFS。大小=70000

我的问题:

此配置工作正常,但是当我将配置更改为 flush.size=1000000 (因为 70k 消息大小最大为 5-7 Mb,但 Parquet 文件 block 大小为 256 Mb)连接工作程序返回发送获取请求时出错错误:

...
[2019-05-24 14:00:21,784] INFO [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error sending fetch request (sessionId=1661483807, epoch=374) to node 3: java.io.IOException: Connection to 3 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-05-24 14:00:21,784] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={mytopic-10=(offset=27647797, logStartOffset=24913298, maxBytes=1048576), mytopic-16=(offset=27647472, logStartOffset=24913295, maxBytes=1048576), mytopic-7=(offset=27647429, logStartOffset=24913298, maxBytes=1048576), mytopic-4=(offset=27646967, logStartOffset=24913296, maxBytes=1048576), mytopic-13=(offset=27646404, logStartOffset=24913298, maxBytes=1048576), mytopic-19=(offset=27648276, logStartOffset=24913300, maxBytes=1048576), mytopic-1=(offset=27647036, logStartOffset=24913307, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1661483807, epoch=374)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
...

我的配置:

HDFS 连接器配置:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
tasks.max=1
topics=mytopic
hdfs.url=hdfs://hdfsnode:8020/user/someuser/kafka_hdfs_sink/
flush.size=1000000

Kafka Connect Worker 配置:

bootstrap.servers=confleuntnode1:9092,confleuntnode2:9092,confleuntnode3:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://confleuntnode:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/confluent/current/share/java/

我的问题:

如何使用 Kafka Connect Worker 将更大尺寸的消息从 Kafka 流式传输到 HDFS?

最佳答案

我通过在分布式模式(而不是独立模式)下运行 connect 解决了这个问题。 现在我可以向 HDFS 写入多达 350 万条记录(约 256 mb)。但随之而来的一个新问题是:1)处理速度非常慢(1小时处理3500万条记录); 2) 无法写入大于 256 Mb 的 parquet 文件。我将发布一个新的 SO 问题。

关于apache-kafka - 如何配置 Kafka Connect Worker 将更多消息传输到 HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56321170/

相关文章:

java - 从 Java 代码运行 Linux Hadoop fs 命令

java - 如何通过网络将数据从一个HDFS集群迁移到另一个集群?

apache-spark - 将 MySQL 表转换为 Parquet 时出现 Spark 异常

apache-kafka - 消费者和消费群体是否有限制?

apache-kafka - 多节点kafka集群中的master节点

hadoop - block 池如何在 HDFS 联合中运行

parquet - PyArrow 从 S3 中的文件中获取元数据

hive 不会更改 Parquet 架构

hadoop - Spark 流 "ERROR JobScheduler: error in job generator"

apache-kafka - KAFKA 重启问题 : Unable to restart kafka without deleting/tmp/kafka-logs