java - Flume1.6 spooldir 源仅摄取文件的一部分

标签 java flume

感谢您阅读本文

我正在尝试使用flume-1.6收集日志。但我发现并非全部 日志文件是使用假脱机导向器源摄取的。请给 您的建议!!

在一次测试中,我的日志文件 (170M) 有 369,189 行,但在另一端我只收到 169,335 行。当我检查flume.log时,它说已到达文件末尾并将.COMPLETED添加到原始日志文件中。

我尝试使用不同的日志文件,大约有 300,000 行,并在另一端收到 52,410 条记录。

这是背景和配置:

  1. 每个日志文件大小约为 200M。

  2. flume配置了spooldir源、文件 channel 和kafka接收器,如下:

    #agent definition
    log_agent.sources = spooldirSrc 
    log_agent.channels = fileChannel 
    log_agent.sinks = kafkaSink 
    
    log_agent.sources.spooldirSrc.channels = fileChannel
    log_agent.sinks.kafkaSink.channel = fileChannel
    
    # source define
    log_agent.sources.spooldirSrc.type = spooldir
    log_agent.sources.spooldirSrc.spoolDir=/log_path/
    
    
    # kafkaSink definition
    log_agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    log_agent.sinks.kafkaSink.topic=log-topic
    log_agent.sinks.kafkaSink.brokerList=kafka-host-1:9092,kafka-host-1:9092,kafka-host-1:9092
    log_agent.sinks.kafkaSink.requiredAcks=1
    log_agent.sinks.kafkaSink.batchSize=100
    
    # fileChannel definition
    log_agent.channels.fileChannel.type=file
    log_agent.channels.fileChannel.checkpointDir=/path/checkpoint/
    log_agent.channels.fileChannel.dataDirs=/path/data
    log_agent.channels.fileChannel.capacity=100000
    

我读到flume document , spooldir source 默认使用 Line Deserializer。 我下载了 Flume-1.6 源代码,并将文件读取末尾附近的打印行添加到 ReliableSpoolingFileEventReader,它负责从日志中读取。 看来读者在到达 EOF 之前就提前结束了。

如有任何建议,我们将不胜感激!

最佳答案

终于找到原因了。 LineDeserializer 依赖 ResettableFileInputStream 来读取文件。可重置文件输入流 读取多代码点 unicode 字符时出现问题。该问题在flume1.7中得到解决。

我用source code替换了flume1.6 ResettableFileInputStream从flume1.7开始,重新编译并替换flume1.6/lib中的flume-ng-core-1.6.0.jar。 然后 spooldir source 就可以消耗文件的所有内容。

希望这可以帮助遇到同样问题的人。

请引用apache issue site .

关于java - Flume1.6 spooldir 源仅摄取文件的一部分,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37135772/

相关文章:

java - 无法从根上下文 Autowiring 子 bean(在 Web 上下文中定义)

java - 是否可以查看 Java 类文件字节码

hadoop - 在 pig 中读取二进制 avro

excel - Flume 加载 csv 文件优于 hdfs 接收器

hadoop - 将数据从 Rest API 加载到 HDFS

apache - 提高水槽性能的指导方针是什么

java - Gin 或 Guice 的 singleton 和 eagersingleton 之间的区别?

java - 有没有一种简单的方法可以使用 spring boot 从 mongodb 数据库集合中查询特定文档而不需要实体/pojo 类?

java - Selenium WebDriver Java - 我无法从 span 获取动态文本

hadoop - 我没有收到来自 Twitter 的推文