apache-kafka - Kafka 连接器未按预期运行

标签 apache-kafka apache-kafka-connect

我正在尝试使用 Kafka 文件脉冲连接器 ( https://github.com/streamthoughts/kafka-connect-file-pulse ) 将文件中的数据读取到 Kafka 主题中。
我使用以下方法启动连接器:

../bin/connect-standalone filepulse.properties connect-standalone.properties
filepulse.properties 的内容:
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/opt/kafka-tmp/connect.offsets
bootstrap.servers=localhost:9092
plugin.path=/Users/user1234/connectors/
connect-standalone.properties 的内容:
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

name=CsvSchemaSpoolDir
halt.on.error=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/opt/kafka-tmp/connect.offsets
plugin.path=/Users/plugins/connectors/
connector.class= io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
fs.cleanup.policy.class= io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class= io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker
fs.scan.directory.path=/opt/kafka-tmp/dir-to-process
fs.scan.filters= io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=test.csv
fs.scan.interval.ms= 3000
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id= connect-file-pulse-log4j-quickstart
internal.kafka.reporter.topic= connect-file-pulse-status
offset.strategy=name
read.max.wait.ms=5000
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader
topic= logs-kafka-connect
tasks.max= 1
当我将数据添加到 test.csv 时,数据不会根据配置发送到主题 logs-kafka-connect。
启动../bin/connect-standalone filepulse.properties connect-standalone.properties看起来不错,因为文件 test.csv 是 detected :
[2021-01-01 21:01:26,800] INFO Waiting 2821 ms to scan for new files. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:87)
[2021-01-01 21:01:29,625] INFO Scanning local file system directory '/opt/kafka-tmp/dir-to-process' (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:222)
[2021-01-01 21:01:29,629] INFO Completed scanned, number of files detected '1'  (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:224)
[2021-01-01 21:01:29,815] INFO Finished lookup for new files : '0' files selected (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:229)
[2021-01-01 21:01:29,815] INFO Waiting 2810 ms to scan for new files. (io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread:87)
我已经明确测试了生成和订阅主题 logs-kafka-connect 的消息不使用插件并且它按预期运行,添加消息并且可以从主题中读取。因此,看来我没有正确配置插件。
我用插件配置的主题:logs-kafka-connect由插件创建,但将消息添加到它配置为监听的文件 (test.csv) 不会将消息发送到主题。如何配置插件以便添加到 test.csv 的项目发到话题logs-kafka-connect ?
更新:
现在在我看来,Kafka 并不适用于这种将文件更新流式传输到 Kafka 主题的用例。我将使用 filebeat 来实现我的目标,即合并多个日志文件以实现更轻松的日志文件检查。

最佳答案

Kafka Connect 可用于使用 FilePulse、Spooldir 等连接器将文件中的数据记录流式传输到 Kafka。
但是,如果您需要摄取日志文件(例如 log4j 应用程序文件),那么 Logstash 或 Filebeat 可能更可取,即使您也可以将 FilePulse 用于该目的。
关于您的问题,我认为问题来自配置属性:file.filter.regex.pattern=test.csv它接受一个正则表达式。
你应该使用这个值:"file.filter.regex.pattern":".*\\.csv$"免责声明:我是 Kafka Connect FilePulse 的作者

关于apache-kafka - Kafka 连接器未按预期运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65533732/

相关文章:

java - 获取 NotSerializedException - 将 Spark Streaming 与 Kafka 结合使用时

apache-kafka - Kafka 生产者/消费者打开了太多的文件描述符

json - Kafka Connect,获取 JsonConverter 的 Json Schema

junit - 有序启动和等待容器

php - 在 Ubuntu 上安装 rdkafka

hadoop - 使用Kafka Connect HDFS时,AccessControlException用户=根,访问=写…

apache-kafka - 我的 Kafka 连接器生成的 Kafka 消息的每条消息开头有两个奇怪的字节

apache-kafka - 如何配置 Kafka Connect Worker 将更多消息传输到 HDFS

apache-kafka - Kafka Consumer Group Id 和消费者再平衡问题

docker - Debezium Kafka Connect - 通过 Docker 从 Confluent Hub 加载插件时出错