hadoop - How do I stream data from the Kafka Avro console to HDFS using kafka-connect-hdfs?

Tags: hadoop apache-kafka apache-kafka-connect confluent-platform

I am trying to run kafka-connect-hdfs, without any success.

I added the following line to .bash_profile and ran 'source ~/.bash_profile':

export LOG_DIR=~/logs

The quickstart-hdfs.properties configuration file is:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
hdfs.url=xxx.xxx.xxx.xxx:xxxx # placeholder
flush.size=3

hadoop.conf.dir = /etc/hadoop/conf/
logs.dir = ~/logs
topics.dir = ~/topics
topics=test_hdfs

I am following the quickstart instructions outlined at https://docs.confluent.io/current/connect/connect-hdfs/docs/hdfs_connector.html

The contents of the connector-avro-stanalone.properties file are:

bootstrap.servers=yyy.yyy.yyy.yyy:yyyy # This is the placeholder for the Kafka broker url with the appropriate port
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
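
Since both converters point at the Schema Registry, the records in test_hdfs must be Avro. Per the quickstart, test data can be produced with the Avro console producer, along these lines (localhost:9092 is the quickstart's broker address; substitute your own):

kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

Each line typed after that, e.g. {"f1": "value1"}, becomes one Avro record; with flush.size=3, a file should be written to HDFS after every three records.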

I have quickstart-hdfs.properties and connector-avro-stanalone.properties in my home directory, and I run:

confluent load hdfs-sink -d quickstart-hdfs.properties
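
With the same CLI from the quickstart docs, the connector's state can then be checked with:

confluent status hdfs-sink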

I am not sure how the information in the connector-avro-stanalone.properties file in my home directory gets picked up.
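
As far as I can tell, the confluent CLI starts the Connect worker with its own worker config, so a worker properties file sitting in the home directory may never be read; when running without the CLI, both files would be passed to the standalone worker directly, something like:

connect-standalone connector-avro-stanalone.properties quickstart-hdfs.properties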

When I run 'confluent log connect', I get the following error:

[2018-04-26 17:36:00,217] INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:90)
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
        at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:56)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:213)
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:101)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:82)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:267)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:51)
        ... 12 more
Caused by: java.io.IOException: No FileSystem for scheme: xxx.xxx.xxx.xxx -> this is the hdfs.url from the quickstart-hdfs.properties file, without the port
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
        at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:2691)
        at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:420)
        at io.confluent.connect.hdfs.storage.HdfsStorage.<init>(HdfsStorage.java:56)
        ... 17 more
[2018-04-26 17:36:00,217] INFO Shutting down HdfsSinkConnector. (io.confluent.connect.hdfs.HdfsSinkTask:91)

Any help in resolving this issue would be greatly appreciated.

Best Answer

Caused by: java.io.IOException: No FileSystem for scheme: xxx.xxx.xxx.xxx

You need hdfs.url=hdfs://xxx.yyy.zzz.abc, along with the namenode port.

You also need to remove the spaces around the equals signs in your properties file.
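
Putting both fixes together, quickstart-hdfs.properties would look something like this (the host stays a placeholder, and 8020 is only a commonly used namenode RPC port, not necessarily yours; note that '~' is not expanded in a properties file, and logs.dir/topics.dir are paths in HDFS):

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
hdfs.url=hdfs://xxx.xxx.xxx.xxx:8020
flush.size=3
hadoop.conf.dir=/etc/hadoop/conf/
logs.dir=/logs
topics.dir=/topics
topics=test_hdfs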

Regarding hadoop - how to stream data from the Kafka Avro console to HDFS using kafka-connect-hdfs, this is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/50066018/
