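aws-lambda - AWS MSK trigger for AWS Lambda - multiple topics in the same execution context

Tags: aws-lambda aws-msk

I have a Lambda function with multiple MSK triggers configured, each for a different topic.

I could not find anything in the official documentation on whether the Lambda input (MSKEvent) can contain multiple different topics.

The official documentation provides the following example input with a single topic, mytopic: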

{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":"0",
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}

But it is not clear whether the following example, with 2 different topics (mytopic and different-topic), is possible:

{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ],
    "different-topic-0": [
      {
        "topic": "different-topic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}

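Best answer

Yes, this is supported. You have to set up each topic in a separate MSK/Kafka event source. If you specify the same consumer group name for each event source, Lambda will even use the same consumer group to consume from the multiple topics.

The event looks just like the one you mocked up in the original question. Each entry at the top level of records is keyed by topic and partition (for example, mytopic-0), and the topic also appears in every Kafka record in that entry's value.

As I understand it, the event is a serialized version of the Java object below, which comes from the aws-lambda-java-libs repository linked below. You can derive a data structure from it to deserialize the event on whatever platform you are targeting in Lambda.

Another post mentioned using Glue (Spark Streaming) for this use case. That is a good idea for high-volume, big-data, or analytics problems, but unless you need a distributed computing solution it is probably overkill. Consuming low-volume topics, or topics that need little consumption logic and no distributed compute, is a good fit for Lambda.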
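Since one invocation can carry several topics, a handler usually wants to route records by topic. The sketch below is one way to do that, not an official helper: the class name MskRecordsByTopic and its methods are hypothetical, and it assumes the key format is topic + "-" + partition, as the TopicPartition.toString() in the class quoted in this answer suggests. Because topic names may themselves contain hyphens (different-topic), it cuts at the last hyphen.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of any AWS library.
final class MskRecordsByTopic {

    // "different-topic-0" -> "different-topic" (cut at the LAST hyphen,
    // assuming the key is topic + "-" + partition).
    static String topicOf(String key) {
        int cut = key.lastIndexOf('-');
        return cut < 0 ? key : key.substring(0, cut);
    }

    // "different-topic-0" -> 0
    static int partitionOf(String key) {
        return Integer.parseInt(key.substring(key.lastIndexOf('-') + 1));
    }

    // Merges the per-partition lists of the "records" map into one list per topic,
    // preserving the order in which the partitions appear in the event.
    static <R> Map<String, List<R>> groupByTopic(Map<String, List<R>> records) {
        Map<String, List<R>> byTopic = new LinkedHashMap<>();
        for (Map.Entry<String, List<R>> entry : records.entrySet()) {
            byTopic.computeIfAbsent(topicOf(entry.getKey()), t -> new ArrayList<>())
                   .addAll(entry.getValue());
        }
        return byTopic;
    }
}
```

With the Java events library, groupByTopic(event.getRecords()) would give one list of KafkaEventRecord per topic. Reading the topic field of each record is an equally valid way to route and does not depend on the key format.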

https://github.com/aws/aws-lambda-java-libs/blob/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events/KafkaEvent.java

/*
 * Copyright 2015-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
 * the License. A copy of the License is located at
 *
 * http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */
package com.amazonaws.services.lambda.runtime.events;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
import java.util.Map;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(setterPrefix = "with")
/** Represents a Kafka Event. **/
public class KafkaEvent {
    private Map<String, List<KafkaEventRecord>> records;
    private String eventSource;
    private String eventSourceArn;
    private String bootstrapServers;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder(setterPrefix = "with")
    public static class KafkaEventRecord {
        private String topic;
        private int partition;
        private long offset;
        private long timestamp;
        private String timestampType;
        private String key;
        private String value;
        private List<Map<String, byte[]>> headers;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder(setterPrefix = "with")
    public static class TopicPartition {
        private  String topic;
        private  int partition;

        @Override
        public String toString() {
            //Kafka also uses '-' for toString()
            return topic + "-" + partition;
        }
    }
}
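Note that value is a String and each header value is a byte[]: in the sample events above, the value is Base64-encoded and the header is a list of byte values. A minimal decoding sketch follows. MskPayloadDecoder is a hypothetical name, and it assumes the producer wrote UTF-8 text; binary formats such as Avro or Protobuf would need their own deserializer after the Base64 step.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of any AWS library.
final class MskPayloadDecoder {

    // Decodes the Base64 "value" field of a record into text (assumes UTF-8 content).
    static String decodeValue(String base64Value) {
        byte[] raw = Base64.getDecoder().decode(base64Value);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Decodes a header value as delivered by the Java events library (byte[]).
    static String decodeHeader(byte[] headerBytes) {
        return new String(headerBytes, StandardCharsets.UTF_8);
    }

    // Decodes a header value as it appears in the raw JSON (array of byte values).
    static String decodeHeader(int[] byteValues) {
        byte[] bytes = new byte[byteValues.length];
        for (int i = 0; i < byteValues.length; i++) {
            bytes[i] = (byte) byteValues[i];
        }
        return decodeHeader(bytes);
    }
}
```

Applied to the sample event, decodeValue("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==") yields "Hello, this is a test." and the headerKey bytes decode to "headerValue".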

Source: this question and answer come from Stack Overflow: https://stackoverflow.com/questions/71099655/
