hadoop - EOFException from Kafka via Flume

Tags: hadoop apache-kafka hdfs flume flume-ng

I am trying to set up a simple data pipeline from a console Kafka producer to the Hadoop file system (HDFS). I am working on a 64-bit Ubuntu virtual machine and, as the guides I followed suggested, have created separate users for Hadoop and Kafka. Consuming the input produced with Kafka via the console consumer works, and HDFS appears to be up and running.
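
For reference, this is roughly how I verified the console pipeline, using the standard shell scripts that ship with Kafka (the broker address and topic name are the quickstart defaults, so they may differ from your setup):

# produce messages interactively to the "test" topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# read them back with the new consumer API
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning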

Now I would like to pipe the input into HDFS using Flume. I am using the following configuration file:

tier1.sources  = source1
tier1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 2000

tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.channel = channel1

Now, when I run Flume with the following command
bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1

I get the same exception in the console output over and over again:
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
    at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529)
    at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The only way to stop Flume is to kill the Java process.

I thought it might have something to do with the separate users for Hadoop and Kafka, but I get the same result even when running everything as the Kafka user. I also have not found anything online about the EOFException, which is strange considering that I just followed the "Getting Started" guides and used very standard configurations for everything.

Maybe it has something to do with the preceding line ("Connection with Ubuntu-Sandbox/127.0.1.1 disconnected") and the configuration of my VM?
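
(For context: Ubuntu by default maps the machine's hostname to 127.0.1.1 rather than 127.0.0.1 in /etc/hosts, which would explain that address in the log. My file looks roughly like this:)

127.0.0.1   localhost
127.0.1.1   Ubuntu-Sandbox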

Any help is greatly appreciated!

Best Answer

Have you considered using Kafka Connect (part of Apache Kafka) and the HDFS connector instead? It is generally viewed as superseding Flume. It is easy to use, with a file-based configuration similar to Flume's.
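
As a minimal sketch (assuming the Confluent HDFS connector is on the Connect worker's classpath; the topic name, HDFS URL, and flush size below are illustrative, not taken from your setup), a standalone sink configuration could look like this:

# hdfs-sink.properties -- write the "test" topic into HDFS
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test
# NameNode URL of your HDFS instance (adjust host/port)
hdfs.url=hdfs://localhost:9000
# number of records to accumulate before committing a file
flush.size=3

You would then start it with the standalone Connect runner that ships with Kafka:

bin/connect-standalone.sh config/connect-standalone.properties hdfs-sink.properties

Connect manages consumer offsets, retries, and output file rolling for you, so there is no channel or transaction tuning as in Flume.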

Regarding "hadoop - EOFException from Kafka via Flume", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46828231/
