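I am trying to invoke multiple lambda functions from another lambda function (one lambda function that should run as separate parallel processes). The first runs as a cron lambda that only queries documents from the db and then invokes another lambda with the document's params. This cron lambda runs every five minutes and queries the documents correctly. I am testing the second lambda with two documents. The problem is that each time the second lambda is invoked it processes only one document, and each time it is the other document, the one it did not process on the previous invocation:
For example:
- doc 1
- doc 2
1st invocation of the second lambda -> process doc 1
2nd invocation of the second lambda -> process doc 2
3rd invocation of the second lambda -> process doc 1
4th invocation of the second lambda -> process doc 2
etc...
First (cron) lambda code: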
aws.config.update({
  region : env.lambdaRegion,
  accessKeyId: env.lambdaAccessKeyId,
  secretAccessKey: env.lambdaSecretAccessKey,
});
const lambda = new aws.Lambda({
  region: env.lambdaRegion,
});
exports.handler = async (event: any, context: any) => {
  context.callbackWaitsForEmptyEventLoop = false;
  return new Promise(async (resolve, reject) => {
    for (let i = 0; i < 100; i++) {
      const doc = await mongo.db.collection('docs').
        findOneAndUpdate(
          {
            status: 1,
            lambdaProcessing: null,
          },
          { $set: { lambdaProcessing: new Date() } },
          {
            sort: { processedAt: 1 },
            returnNewDocument: true,
          },
        );
      if (doc.value && doc.value._id) {
        const params = {
          FunctionName: env.lambdaName,
          InvocationType: 'Event',
          Payload: JSON.stringify({ docId: doc.value._id }),
        };
        lambda.invoke(params);
      } else {
        if (doc.lastErrorObject && doc.lastErrorObject.n === 0) {
          break;
        }
      }
    }
    resolve();
  });
};
Second lambda function:
exports.handler = async (event: any, ctx: any) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  if (event && event.docId) {
    const doc = await mongo.db.collection('docs').findById(event.docId);
    return await processDoc(doc);
  } else {
    throw new Error('doc ID is not present.');
  }
};
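One thing that may be worth checking in the cron lambda, offered as an assumption rather than a confirmed diagnosis: in the AWS SDK for JavaScript v2, `lambda.invoke(params)` called without a callback only builds a request object, which is not sent until `.send()` or `.promise()` is called on it. Combined with `callbackWaitsForEmptyEventLoop = false`, a handler that does not wait for its invocations can return while work is still outstanding. The sketch below collects the invocations and awaits them together. `Invoker` and `invokeAll` are hypothetical names, and the invoker is injected so the snippet runs without the SDK.

```typescript
// Parameters in the shape used by the cron lambda above.
interface InvokeParams {
  FunctionName: string;
  InvocationType: string;
  Payload: string;
}

// Hypothetical stand-in for `(params) => lambda.invoke(params).promise()`.
type Invoker = (params: InvokeParams) => Promise<unknown>;

// Start one asynchronous ("Event") invocation per document id and wait
// until every request has settled before returning.
async function invokeAll(
  invoke: Invoker,
  functionName: string,
  docIds: string[],
): Promise<number> {
  const pending = docIds.map((docId) =>
    invoke({
      FunctionName: functionName,
      InvocationType: 'Event',
      Payload: JSON.stringify({ docId }),
    }),
  );
  const results = await Promise.all(pending);
  return results.length;
}
```

In the cron handler this would presumably be called with `(p) => lambda.invoke(p).promise()` as the invoker, after the loop has collected the document ids.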
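Best answer
To run multiple lambdas in parallel without an "ugly" cronjob solution, I would recommend using AWS Step Functions with a state of type Parallel. You can set up the logic in your serverless.yml; the function calls themselves are lambda functions. You can pass data on via the second argument of callback. If the data is larger than 32kb, I would recommend using an S3 bucket or a database instead.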
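The size rule above can be sketched as a small guard. This is only an illustration: the 32 KB figure is taken from the answer and should be checked against the current Step Functions quota, and `payloadBytes`, `toHandoff` and the `Handoff` type are hypothetical names. Actually writing the data to S3 or a database is left to the caller.

```typescript
// Limit quoted in the answer; verify the current quota before relying on it.
const INLINE_LIMIT_BYTES = 32 * 1024;

// Size of the JSON-serialized payload in UTF-8 bytes.
function payloadBytes(data: unknown): number {
  return new TextEncoder().encode(JSON.stringify(data)).length;
}

// Either the data itself, or a key pointing at where it was stored.
type Handoff<T> =
  | { kind: 'inline'; data: T }
  | { kind: 'reference'; key: string };

// Hypothetical helper: small results are passed inline; for large ones only
// a key is returned, under which the caller is expected to store the data.
function toHandoff<T>(
  data: T,
  key: string,
  limit: number = INLINE_LIMIT_BYTES,
): Handoff<T> {
  return payloadBytes(data) <= limit
    ? { kind: 'inline', data }
    : { kind: 'reference', key };
}
```

A handler could then pass the result of `toHandoff(data, someKey)` to `callback` and let the next state decide whether to read the data inline or fetch it by key.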
Example serverless.yml
stepFunctions:
  stateMachines:
    test:
      name: 'test'
      definition:
        Comment: "Testing tips-like state structure"
        StartAt: GatherData
        States:
          GatherData:
            Type: Parallel
            Branches:
              -
                StartAt: GatherDataA
                States:
                  GatherDataA:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstA"
                    TimeoutSeconds: 15
                    End: true
              -
                StartAt: GatherDataB
                States:
                  GatherDataB:
                    Type: Task
                    Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-firstB"
                    TimeoutSeconds: 15
                    End: true
            Next: ResolveData
          ResolveData:
            Type: Task
            Resource: "arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:${self:service}-${opt:stage, self:provider.stage}-resolveAB"
            TimeoutSeconds: 15
            End: true
Example handlers
module.exports.firstA = (event, context, callback) => {
  const data = {
    id: 3,
    somethingElse: ['Hello', 'World'],
  };
  callback(null, data);
};
module.exports.firstB = (event, context, callback) => {
  const data = {
    id: 12,
    somethingElse: ['olleH', 'dlroW'],
  };
  callback(null, data);
};
module.exports.resolveAB = (event, context, callback) => {
  console.log("resolving data from a and b: ", event);
  // event is an array holding the output of each branch, in branch order
  const [dataFromA, dataFromB] = event;
  callback(null, event);
};
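To see why `resolveAB` can destructure `event` as an array, here is a rough local stand-in for the Parallel state. It is not how Step Functions is implemented; it only mimics the data flow: each branch receives the same input, and the next state receives the branch outputs as an array in branch order. `Handler`, `run` and `runParallel` are hypothetical names.

```typescript
// Callback-style handler signature, as used by the example handlers.
type Handler<I, O> = (
  event: I,
  context: unknown,
  callback: (err: Error | null, result?: O) => void,
) => void;

// Wrap a callback-style handler in a promise.
function run<I, O>(handler: Handler<I, O>, event: I): Promise<O> {
  return new Promise<O>((resolve, reject) => {
    handler(event, {}, (err, result) =>
      err ? reject(err) : resolve(result as O),
    );
  });
}

// Local stand-in for a Parallel state: all branches start with the same
// input, and the outputs come back in branch order, not completion order.
async function runParallel<I, O>(
  branches: Handler<I, O>[],
  input: I,
): Promise<O[]> {
  return Promise.all(branches.map((branch) => run(branch, input)));
}
```

Calling `runParallel([firstA, firstB], {})` with the two handlers above and passing the resulting array to `resolveAB` as `event` approximates the state machine locally, which can be handy for a quick test before deploying.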
Regarding "node.js - Invoking multiple AWS Lambdas does not make parallel processes", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56628429/