hadoop - 基于时间的桶记录(kafka-hdfs-connector)

标签 hadoop hive apache-kafka kafka-consumer-api kafka-producer-api

我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中。虽然我能够成功完成,但我想知道如何根据时间间隔对传入数据进行存储。例如,我希望每 5 分钟创建一个新分区。

我尝试使用 io.confluent.connect.hdfs.partitioner.TimeBasedPartitionerpartition.duration.ms 但我认为我做错了。我在 Hive 表中只看到一个分区,所有数据都进入该特定分区。像这样:

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03

并且所有 avro 对象都被复制到这个分区中。

相反,我想要这样的东西:

hive> show partitions test;
OK
partition
year=2016/month=03/day=15/hour=19/minute=03
year=2016/month=03/day=15/hour=19/minute=08
year=2016/month=03/day=15/hour=19/minute=13

最初连接器将创建路径 year=2016/month=03/day=15/hour=19/minute=03 并将在接下来的 5 天内继续将所有传入数据复制到此目录中分钟,在第 6 分钟开始时,它应该创建一个新路径,即 year=2016/month=03/day=15/hour=19/minute=08 并复制接下来 5 分钟的数据分钟进入此目录,依此类推。

这是我的配置文件的样子:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
hdfs.url=hdfs://localhost:9000
flush.size=3
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
partition.duration.ms=300000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/
locale=en
timezone=GMT
logs.dir=/kafka-connect/logs
topics.dir=/kafka-connect/topics
hive.integration=true
hive.metastore.uris=thrift://localhost:9083
schema.compatibility=BACKWARD

如果有人能指出正确的方向,那将非常有帮助。如果需要,我很乐意分享更多详细信息。不想让这个问题看起来像一个永无止境的问题。

非常感谢!

最佳答案

你在 path.format 中的分钟字段是错误的:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=MM/

应该是:

path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm/

关于hadoop - 基于时间的桶记录(kafka-hdfs-connector),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36036507/

相关文章:

scala - 如何使用 Spark 确定分区键/列

java - 自定义 Java 生产者中的 Kafka SSL 握手失败

apache-kafka - Kafka如何为key选择分区?

hadoop - 普通身份验证失败 : User yarn is not configured for any impersonation. impersonationUser:alluxio mapreduce 中的 root

java - reducer 数量对集群节点数量的依赖性

hadoop - 在yarn上运行spark时我们应该使用哪种模式?

hadoop - 使用自定义分隔符将数据加载到 Hive 中

java - 使用 Hadoop 进行 MapReduce, "Ouput file directory already exists"

hadoop - Presto服务器无法从配置单元目录启动

linux - Linux 重启后,Kafka 抛出 "no brokers found when trying to rebalance"