node.js - 试图理解作为 api 网关 websockets 连接逻辑一部分的 lambda 函数

标签 node.js amazon-web-services websocket aws-lambda aws-api-gateway

TLDR:我如何从 mqtt 请求向 aws iot 发送一个短负载到 aws lambda,它通过 apigateway 与在 linux 本地运行的电子应用程序建立了开放连接。

我有一个带有以下代码的 esp8266 作为 init.js 此代码成功将其消息发送到 aws iot,并设置了一个规则来触发名为 sendmessage 的 lambda。现在这个 sendmessage lambda 通过 websockets 连接到我的 linux 机器上本地的 Electon 应用程序。我能够通过 websockets 从 Electron 应用程序发送消息到 api 网关 wss url。我按照这个例子 here它使用 api 网关和 aws lambdas(一个是 sendmessage lambda)设置所有 websockets。

load("api_config.js");
load("api_gpio.js");
load("api_mqtt.js");
load("api_sys.js");
load("api_timer.js");

let pin = 0;
GPIO.set_button_handler(
  pin,
  GPIO.PULL_UP,
  GPIO.INT_EDGE_NEG,
  50,
  function (x) {
    let res = MQTT.pub(
      "mOS/topic1",
      JSON.stringify({ action: "sendmessage", data: "pushed" }),
      1
    );

    print(res);
    print("Published:", res ? "yes" : "no");
    let connected = MQTT.isConnected();

    print(connected);
  },
  true
);
print("Flash button is configured on GPIO pin", pin);
print("Press the flash button now!");

我知道从 iot 到 sendmessage lambda 的消息需要是一个 websockets 消息,但它只有最小对象 {"action":"sendmessage","data":"hello world"} 它缺少 websocket 需要的一堆信息。但是我不需要 aws iot 和 sendmessage lambda 之间的 websocket 连接,我需要它从 IOT -> sendmessage lambda 以最小的有效负载 -> electron app 通过 websockets 和来自 IOT 的有效负载。

发送消息 lambda

// Copyright 2018-2020Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

const { TABLE_NAME } = process.env;

exports.handler = async event => {
  let connectionData;
  
  try {
    connectionData = await ddb.scan({ TableName: TABLE_NAME, ProjectionExpression: 'connectionId' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }
  
  const apigwManagementApi = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage
  });
  
  const postData = JSON.parse(event.body).data;
  
  const postCalls = connectionData.Items.map(async ({ connectionId }) => {
    try {
      await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: postData }).promise();
    } catch (e) {
      if (e.statusCode === 410) {
        console.log(`Found stale connection, deleting ${connectionId}`);
        await ddb.delete({ TableName: TABLE_NAME, Key: { connectionId } }).promise();
      } else {
        throw e;
      }
    }
  });
  
  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Data sent.' };
};

onconnect lambda

// SPDX-License-Identifier: MIT-0

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const putParams = {
    TableName: process.env.TABLE_NAME,
    Item: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.put(putParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to connect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Connected.' };
};

ondisconnect lambda

// Copyright 2018-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT-0

// https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-route-keys-connect-disconnect.html
// The $disconnect route is executed after the connection is closed.
// The connection can be closed by the server or by the client. As the connection is already closed when it is executed, 
// $disconnect is a best-effort event. 
// API Gateway will try its best to deliver the $disconnect event to your integration, but it cannot guarantee delivery.

const AWS = require('aws-sdk');

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10', region: process.env.AWS_REGION });

exports.handler = async event => {
  const deleteParams = {
    TableName: process.env.TABLE_NAME,
    Key: {
      connectionId: event.requestContext.connectionId
    }
  };

  try {
    await ddb.delete(deleteParams).promise();
  } catch (err) {
    return { statusCode: 500, body: 'Failed to disconnect: ' + JSON.stringify(err) };
  }

  return { statusCode: 200, body: 'Disconnected.' };
};

在我的电子应用程序中,我有以下代码来测试 websocket,但我收到了一个禁止的错误。然而与 wscat 一起工作......

"use strict";
const { app, BrowserWindow } = require("electron");
const { Notification } = require("electron");
const WebSocket = require("ws");

function createWindow() {
  const win = new BrowserWindow({
    width: 800,
    height: 600,
    webPreferences: {
      nodeIntegration: true,
    },
  });

  win.loadFile("index.html");
  win.webContents.openDevTools();
}

app.whenReady().then(createWindow);

app.on("window-all-closed", () => {
  if (process.platform !== "darwin") {
    app.quit();
  }
});

app.on("activate", () => {
  if (BrowserWindow.getAllWindows().length === 0) {
    createWindow();
  }
});

// Tell express to use the body-parser middleware and to not parse extended bodies

const url = "wss://random.execute-api.us-east-1.amazonaws.com/Prod";
const connection = new WebSocket(url);

connection.onopen = () => {
  connection.send("hello world");
};

connection.onmessage = (e) => {
  console.log(e.data);
};

connection.onerror = (error) => {
  console.log(`WebSocket error: ${error}`);
};

function showNotification() {
  const notification = {
    title: "Basic Notification",
    body: `notification`,
  };

  new Notification(notification).show();
}

app.whenReady().then(createWindow).then(showNotification);

我现在设置我的 mqtt 事件以将相同的数据发送到 lambda 但我在 lambda 中收到以下错误

{
    "errorType": "TypeError",
    "errorMessage": "Cannot read property 'domainName' of undefined",
    "stack": [
        "TypeError: Cannot read property 'domainName' of undefined",
        "    at Runtime.exports.handler (/var/task/app.js:29:28)",
        "    at processTicksAndRejections (internal/process/task_queues.js:97:5)"
    ]
}

更新: 这是我的最后一个 lambda,在从 IOT 接收到事件后,我向 wss 地址发送了一条消息,但它不起作用它控制台记录事件但不触发任何 ws.on 函数

// const axios = require('axios')
// const url = 'http://checkip.amazonaws.com/';
const WebSocket = require("ws");
let response;

/**
 *
 * Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
 * @param {Object} event - API Gateway Lambda Proxy Input Format
 *
 * Context doc: https://docs.aws.amazon.com/lambda/latest/dg/nodejs-prog-model-context.html
 * @param {Object} context
 *
 * Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
 * @returns {Object} object - API Gateway Lambda Proxy Output Format
 *
 */
exports.lambdaHandler = async (event, context) => {
  try {
    // const ret = await axios(url);

    console.log(event);

    const url = "wss://obsf.execute-api.us-east-1.amazonaws.com/Prod";
    const ws = new WebSocket(url);

    var test = { action: "sendmessage", data: "hello world from button" };

    ws.on("open", function open() {
      ws.send(JSON.stringify(test));
    });

    ws.on("message", function incoming(data) {
      console.log(data);
    });

    response = {
      statusCode: 200,
      body: JSON.stringify({
        message: "hello world",
        // location: ret.data.trim()
      }),
    };
  } catch (err) {
    console.log(err);
    return err;
  }

  return response;
};

更新:最后我试过了,我什至没有得到错误,我知道 ws 在那里,因为如果我控制它,它会返回一个带有一堆函数的大对象

    console.log(ws); this returns a large object

    ws.on("error", console.error); this does nothing

最佳答案

您似乎正在设置 1 个 lambda 来处理 2 个触发源,一个是 IoT 服务,另一个是 API Gateway Websocket。由于您使用 1 个 lambda,因此您必须处理请求来自源的情况:

  1. 虽然 event.requestContext 在从 API 网关触发请求时可用,但在从 IoT 服务触发请求时不可用(请在此处检查 IoT 事件对象 https://docs.aws.amazon.com/lambda/latest/dg/services-iotevents.html )。因此,您遇到的错误(无法读取未定义的属性“domainName”)就是关于此的。您应该关闭来自 IoT 服务的 lambda 触发器或处理来自 IoT 服务的请求。
  2. 我不确定禁止错误,但它更像是您向 API 网关 WS 发送了非结构化消息,它应该是 connection.send(JSON.stringify({ action: "sendmessage", data: " Hello World "})); 而不是 connection.send("hello world");

根据帖子更新编辑:

I know ws is there because if I console it it returns a big object with a bunch of functions

Lambda 函数并不是真正的服务器,它是一个实例 Node 环境(这就是它被称为 FUNCTION 的原因),Lambda 函数并不像您认为的普通 Nodejs 应用程序那样工作,它的容器( Node 环境)通常是只要它的工作完成就会停止(或卡住),所以你不能像普通服务器一样保持它的容器事件。 这就是为什么当您可以控制台记录 Websocket 对象时,您不能让它保持事件状态,每当您返回/响应时,NodeJS 容器已经停止。

由于您不能使用 Websocket 对象在 Lambda 中打开 WS 连接,Amazon 提供了一种通过 API 网关来实现的方法。我们使用 API Gateway Websocket 的方式也不同于普通服务器,它类似于:

  • 用户 -> 请求 API 网关连接到 websocket -> 调用 Lambda 1(onconnect 函数)
  • 用户 -> 请求 API 网关通过 Websocket 发送消息 -> 调用 Lambda 2(发送消息函数)
  • 用户 -> 请求 API 网关关闭连接 -> 调用 Lambda 3(ondisconnect 函数)

以上3个设置是在API网关(https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-websocket-api-integrations.html)中配置的,可以处理3个函数onconnectsendmessageondisconnect的逻辑根据我们的设计方式,在 1 个 lambda 或 3 个 lambda 函数中,我检查了你的 3 个 lambda 函数,它看起来没问题。

我看到您想使用 IoT,但我不确定为什么。您应该先测试您的 Websocket API,不要与 IoT 相关。如果您能在这里说出您想要实现的目标,那就更好了,因为 IoT 的工作方式更像是发布/订阅/消息传递 channel ,我认为没有必要在这里使用它。

关于node.js - 试图理解作为 api 网关 websockets 连接逻辑一部分的 lambda 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64759924/

相关文章:

amazon-ec2 - 作为创建或更新触发器 : command not found

python - Boto3 中的异常 - botocore.exceptions.EndpointConnectionError

php - 在后台进程中运行 bash 文件问题

node.js - 替换 NPM/Node 中的包

amazon-web-services - 为 AWS ElastiCache 启用 BGSAVE

node.js - 使用 NodeJS 进行无 session

java - 使用 Spring WebSocket 的 SimpMessagingTemplate 进行多端点配置

javascript - 如何确保通过 socket.io 的消息已收到客户端?

Javascript 字符串是否位于括号之间?

node.js - 服务器未收到客户端的鼠标移动数据