etl - 我们如何在benthos中生成多个输出文件?

标签 etl pipeline data-pipeline benthos

输入数据:

{ "name": "Coffee", "price": "$1.00" } 
{ "name": "Tea", "price": "$2.00" } 
{ "name": "Coke", "price": "$3.00" } 
{ "name": "Water", "price": "$4.00" }

扩展.yaml

input:
  label: ""
  file:
    paths: [./input/*]
    codec: lines
    max_buffer: 1000000
    delete_on_finish: false
pipeline:
  processors:  
    - bloblang: |
        root.name = this.name.uppercase()
        root.price = this.price
output:
  file:
    path: "result/file1.log"
    codec: lines

我想根据输入和定义的处理器来创建一个输出文件以满足我们的需要。

最佳答案

扩展.yaml

input:
  label: ""
  file:
    paths: [./input/*]
    codec: lines
    max_buffer: 1000000
    delete_on_finish: false
pipeline:
  processors:  
    - bloblang: |
        root.name = this.name.uppercase()
        root.price = this.price
output:
  broker:
    pattern: fan_out
    outputs:
    - file:
        path: "result/file1.log"
        codec: lines
    - file:
        path: "result/file2.log"
        codec: lines
      processors:
        - bloblang: |
            root.name = this.name.lowercase()

它将生成两个输出文件“file1.log”和file2.log”。

文件1.log

{"name":"COFFEE","price":"$1.00"}
{"name":"TEA","price":"$2.00"}
{"name":"COKE","price":"$3.00"}
{"name":"WATER","price":"$4.00"}

文件2.log

{"name":"coffee"}
{"name":"tea"}
{"name":"coke"}
{"name":"water"}

这里我们可以在 Benthos 中使用名为“Broker”的东西,如上面的示例所示。

它允许您使用一系列代理模式将消息路由到多个子输出。

您可以引用此链接了解更多信息:Broker

关于etl - 我们如何在benthos中生成多个输出文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73541526/

相关文章:

java - 尝试使用 github 操作 (maven) 部署到 github 包时出现错误代码 400

c - 使用 Unix 管道读取 infile 并对其进行排序

amazon-web-services - 使用 Go SDK 检查 AWS Data Pipeline 的状态

google-cloud-platform - 带有python flex模板的数据流 - 启动器超时

python - 在 SciKit-Learn 中使用管道排列重要性

firebase - 我应该什么时候为导出到 BigQuery 的 Firebase Analytics 数据运行每日 ETL 作业?

java - charAt :error java. lang.NullPointerException

java - Talend - 排除具有特定掩码的文件

amazon-web-services - 如何使用 AWS Glue 从 S3 导入 JSON 数据?