apache-kafka - Filebeat 5.0 output to multiple Kafka topics

Tags: apache-kafka filebeat

I have Filebeat 5.0 installed on my application server with three Filebeat prospectors; each prospector points at a different log path, and all of them output to a single Kafka topic named myapp_applog. Everything works fine.

My Filebeat output configuration with a single topic - working:

output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

    # message topic selection + partitioning
    topic: 'myapp_applog'
    partition.round_robin:
      reachable_only: false

    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000

What I want to do is send each log file to a separate topic based on a condition; see the topics section of the documentation. I have tried this, but no data gets sent to any topic. Does anyone know why my conditions don't match, or whether they are even correct? I can't seem to find an example of how to use conditional topics properly.

Here is my Kafka output configuration for multiple topics.

Not working:
output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

    # message topic selection + partitioning
    topics:
      - topic: 'myapp_applog'
        when: 
          equals:
            document_type: applog_myappapi
      - topic: 'myapp_applog_stats'
        when:
          equals:
            document_type: applog_myappapi_stats
      - topic: 'myapp_elblog'
        when:
          equals:
            document_type: elblog_myappapi
    partition.round_robin:
      reachable_only: false

    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
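
A likely culprit, for what it's worth: in Filebeat 5.x the document_type setting is published on each event as the type field, not as a field named document_type, so equals conditions written against document_type never match anything. A sketch of the same topics block keyed on type instead (assuming that behavior) would be:

    # hypothetical rewrite of the topics block above, matching on the
    # event's type field rather than a non-existent document_type field
    topics:
      - topic: 'myapp_applog'
        when:
          equals:
            type: applog_myappapi
      - topic: 'myapp_applog_stats'
        when:
          equals:
            type: applog_myappapi_stats
      - topic: 'myapp_elblog'
        when:
          equals:
            type: elblog_myappapi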

Here is the full filebeat.yml configuration file.
################### Filebeat Configuration Example #########################
############################# Filebeat ######################################
filebeat.prospectors:
    # App logs - prospector
    - input_type: log
      paths:
        - /myapp/logs/myapp.log
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi
      scan_frequency: 1s

      # Multiline on timestamp, YYYY-MM-DD
      # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html 
      multiline:
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
        max_lines: 500
        timeout: 5s

    # Server Stats - prospector
    - input_type: log
      paths:
        - /myapp/logs/serverstats.log

      # Exclude messages with log level
      exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi_stats
      scan_frequency: 1s

    # ELB prospector
    -
      input_type: log
      paths:
        - /var/log/httpd/elasticbeanstalk-access_log
      document_type: elblog_myappapi
      fields:
        api: myappapi
        environment: STG
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      ignore_older: 24h

      # If set to 0s, scanning is done as often as possible. Default: 10s
      scan_frequency: 1s
registry_file: /var/lib/filebeat/registry

############################# Output ##########################################
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#----------------------------- Kafka output --------------------------------

output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

    # message topic selection + partitioning
    topics:
      - topic: 'myapp_applog'
        when: 
          equals:
            document_type: applog_myappapi
      - topic: 'myapp_applog_stats'
        when:
          equals:
            document_type: applog_myappapi_stats
      - topic: 'myapp_elblog'
        when:
          equals:
            document_type: elblog_myappapi
    partition.round_robin:
      reachable_only: false

    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000

############################# Logging #########################################

# There are three options for the log output: syslog, file, stderr.
# Under Windows systems, the log files are per default sent to the file output,
# under all other systems per default to syslog.
logging:

  # Send all logging output to syslog. On Windows default is false, otherwise
  # default is true.
  to_syslog: true

  # Write all logging output to files. Beats automatically rotate files if rotateeverybytes
  # limit is reached.
  to_files: true

  # To enable logging to files, to_files option has to be set to true
  files:
    # The directory where the log files will be written to.
    path: /var/log/

    # The name of the files where the logs are written to.
    name: filebeats.log

    # Configure log file size limit. If limit is reached, log file will be
    # automatically rotated
    rotateeverybytes: 10485760 # = 10MB

    # Number of rotated log files to keep. Oldest files will be deleted first.
    keepfiles: 7

  # Enable debug output for selected components. To enable all selectors use ["*"]
  # Other available selectors are beat, publish, service
  # Multiple selectors can be chained.
  #selectors: ["*" ]

  # Sets log level. The default log level is error.
  # Available log levels are: critical, error, warning, info, debug
  level: info

Best Answer

I ran into the same problem and solved it by defining the output as:

topics:
  - topic: '%{[type]}'
use_type: true
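
Here %{[type]} is a format string that resolves to each event's type field, which Filebeat 5.x fills in from the prospector's document_type setting, so the event type becomes the Kafka topic name.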

On the input side, you simply set each prospector's document_type to the Kafka topic you want:

- input_type: log
  paths:
    - /path/to/log/file
  document_type: "your_kafka_topic_1"
- input_type: log
  paths:
    - /path/to/another/log/file
  document_type: "your_other_kafka_topic"
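
Putting that together with the setup from the question, a minimal sketch (broker addresses and log paths are copied from the question; naming each document_type after its target topic, and using the plain topic format string instead of a topics list, are assumptions of this sketch):

filebeat.prospectors:
  - input_type: log
    paths:
      - /myapp/logs/myapp.log
    document_type: myapp_applog
  - input_type: log
    paths:
      - /myapp/logs/serverstats.log
    document_type: myapp_applog_stats
  - input_type: log
    paths:
      - /var/log/httpd/elasticbeanstalk-access_log
    document_type: myapp_elblog

output.kafka:
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
  # each event goes to the topic named by its type field, i.e. the
  # document_type of the prospector that produced it
  topic: '%{[type]}'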

This question, apache-kafka - Filebeat 5.0 output to multiple Kafka topics, is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/40952714/
