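I have a Lambda function with multiple MSK trigger configurations, each targeting a different topic.
I could not find anything in the official documentation on whether the Lambda input (MSKEvent) can contain records from several different topics.
The official documentation gives the following example of an input for a single topic, mytopic: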
{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}
But it is unclear whether the following example, with two different topics (mytopic and different-topic), is possible:
{
  "eventSource": "aws:kafka",
  "eventSourceArn": "arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
  "records": {
    "mytopic-0": [
      {
        "topic": "mytopic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ],
    "different-topic-0": [
      {
        "topic": "different-topic",
        "partition": "0",
        "offset": 15,
        "timestamp": 1545084650987,
        "timestampType": "CREATE_TIME",
        "value": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        "headers": [
          {
            "headerKey": [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101]
          }
        ]
      }
    ]
  }
}
Best answer
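Yes, this is supported, but you have to set up each topic in its own MSK/Kafka event source. If you specify the same consumer group name for each event source, Lambda will even consume from the multiple topics using the same consumer group.
The event will look just like the one you mocked up in the question. Each entry at the top level of the records dictionary is keyed by a string of the form topic-partition (for example, mytopic-0), and the topic is also present in every Kafka record inside that entry's value.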
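Since a single invocation may carry records for more than one topic, a handler usually has to route records by topic. The following is a minimal sketch of that routing, not the library's API: `TopicGrouping`, `Rec`, `topicOf`, `partitionOf`, and `byTopic` are hypothetical names introduced for illustration, and `Rec` is a stand-in holding only the fields used here. It groups by the `topic` field of each record, and also shows how to split a records key on its last hyphen, because topic names such as different-topic contain hyphens themselves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicGrouping {

    /** Hypothetical minimal stand-in for one Kafka record in the event. */
    public static final class Rec {
        public final String topic;
        public final int partition;
        public final long offset;

        public Rec(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Topic part of a "topic-partition" key; splits on the LAST hyphen. */
    public static String topicOf(String key) {
        int i = key.lastIndexOf('-');
        return i < 0 ? key : key.substring(0, i);
    }

    /** Partition part of a "topic-partition" key. */
    public static int partitionOf(String key) {
        return Integer.parseInt(key.substring(key.lastIndexOf('-') + 1));
    }

    /** Regroups the records map (keyed by topic-partition) by topic name. */
    public static Map<String, List<Rec>> byTopic(Map<String, List<Rec>> records) {
        Map<String, List<Rec>> grouped = new LinkedHashMap<>();
        for (List<Rec> batch : records.values()) {
            for (Rec r : batch) {
                grouped.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r);
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Mirrors the two-topic event mocked up in the question.
        Map<String, List<Rec>> records = new LinkedHashMap<>();
        records.put("mytopic-0", List.of(new Rec("mytopic", 0, 15L)));
        records.put("different-topic-0", List.of(new Rec("different-topic", 0, 15L)));

        byTopic(records).forEach((topic, recs) ->
                System.out.println(topic + ": " + recs.size() + " record(s)"));
        System.out.println(topicOf("different-topic-0") + " / " + partitionOf("different-topic-0"));
    }
}
```

Grouping on the record's own `topic` field avoids parsing the key at all; the key helpers are only needed when the partition has to be recovered from the map key.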
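As far as I understand, the event is a serialized version of the KafkaEvent Java class shown below, which comes from the repository linked here. From it you can derive a data structure for deserializing the event on whatever platform you are targeting in Lambda.
Another answer mentioned using Glue (Spark Streaming) for this use case. That is a good idea for high-volume, big-data, or analytics problems, but it is probably overkill unless you need a distributed computing solution. Consuming low-volume topics, or topics that need neither heavy consumer logic nor distributed computing, is a good fit for Lambda.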
/*
* Copyright 2015-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with
* the License. A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/
package com.amazonaws.services.lambda.runtime.events;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder(setterPrefix = "with")
/** Represents a Kafka Event. **/
public class KafkaEvent {
    private Map<String, List<KafkaEventRecord>> records;
    private String eventSource;
    private String eventSourceArn;
    private String bootstrapServers;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder(setterPrefix = "with")
    public static class KafkaEventRecord {
        private String topic;
        private int partition;
        private long offset;
        private long timestamp;
        private String timestampType;
        private String key;
        private String value;
        private List<Map<String, byte[]>> headers;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Builder(setterPrefix = "with")
    public static class TopicPartition {
        private String topic;
        private int partition;

        @Override
        public String toString() {
            //Kafka also uses '-' for toString()
            return topic + "-" + partition;
        }
    }
}
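In the class above, `value` is a String and each header maps a name to a `byte[]`. In the sample events from the question, the value is Base64-encoded and the header appears as an array of byte values. The sketch below decodes both using only the JDK. `RecordDecoding`, `decodeValue`, and `decodeHeader` are hypothetical helper names, and treating the payload as UTF-8 text is an assumption that holds for the sample data but not necessarily for your own messages.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RecordDecoding {

    /** Decodes a Base64-encoded record value, assuming the payload is UTF-8 text. */
    public static String decodeValue(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64);
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Converts a header's array of byte values into a string, assuming UTF-8 text. */
    public static String decodeHeader(int[] byteValues) {
        byte[] raw = new byte[byteValues.length];
        for (int i = 0; i < byteValues.length; i++) {
            raw[i] = (byte) byteValues[i];
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample data copied from the events in the question.
        String value = "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==";
        int[] header = {104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101};

        System.out.println(decodeValue(value));   // Hello, this is a test.
        System.out.println(decodeHeader(header)); // headerValue
    }
}
```

If the messages are binary (Avro, Protobuf, and so on), keep the decoded `byte[]` and pass it to the matching deserializer instead of building a String.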
Source: the Stack Overflow question "AWS MSK trigger for AWS Lambda - multiple topics in the same execution context" (tag: aws-lambda): https://stackoverflow.com/questions/71099655/