scala - SQS Akka Stream out of memory

Tags: scala aws-lambda amazon-sqs akka-stream alpakka

The code below throws an OOM (OutOfMemoryError) within 15 minutes of running on an EC2 instance (JVM options -Xms1024m -Xmx2G), but it doesn't throw any error when run from IntelliJ.

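import akka.stream.ActorAttributes
import akka.stream.alpakka.sqs.{SqsAckSettings, SqsSourceSettings}
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._

// Assumes an implicit SqsAsyncClient, Materializer and ExecutionContext in scope,
// plus the application's own flows (messageToLambdaRequest, ...), decider,
// Response/OK/NotOK and the delete/ignore helpers, none of which are shown here.
SqsSource(
  queueUrl,
  // parallelism = maxBufferSize / maxBatchSize = 20 / 10 = 2
  SqsSourceSettings()
    .withWaitTime(10.seconds)
    .withMaxBatchSize(10)
    .withMaxBufferSize(20)
).map { msg =>
  // A separate sub-stream is materialized for every SQS message
  val out = Source.single(msg)
    .via(messageToLambdaRequest)
    .via(lambdaRequestToLambdaResp)
    .via(lambdaRespToAggregationKeySet)
    .via(workFlow)
    .log("error while consuming events internally.")
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.seq)

  // OK only if every element emitted by the sub-stream is OK
  val reducedResponse = out.map { response =>
    response.foldLeft[Response](OK)((a, b) => if (a == OK && b == OK) OK else NotOK)
  }

  val messageAction = reducedResponse.map { res =>
    if (res == OK) {
      //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
      delete(msg)
    } else
      ignore(msg)
  }
  messageAction
}
  .mapAsync(1)(identity)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
  // must be less than or equal to the thread pool size.
  .log("error log")
  .runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))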
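The `foldLeft` above reduces the sub-stream's responses to `OK` only when every response is `OK`. A minimal sketch, assuming `Response` is a sealed trait with the two case objects `OK` and `NotOK` (the real definition is not shown in the question), checks that this fold behaves the same as a plain `forall`:

```scala
object ResponseReduction {

  // Assumed shape of the question's Response type (hypothetical, not shown in the original)
  sealed trait Response
  case object OK extends Response
  case object NotOK extends Response

  // The reduction as written in the question
  def reduceWithFold(responses: Seq[Response]): Response =
    responses.foldLeft[Response](OK)((a, b) => if (a == OK && b == OK) OK else NotOK)

  // Equivalent, more direct formulation
  def reduceWithForall(responses: Seq[Response]): Response =
    if (responses.forall(_ == OK)) OK else NotOK

  // Every sequence of OK/NotOK with length 0..maxLen, used to compare both versions
  def allInputs(maxLen: Int): Seq[Seq[Response]] =
    (0 to maxLen).flatMap { n =>
      (0 until n).foldLeft(Seq(Seq.empty[Response])) { (acc, _) =>
        for (prefix <- acc; r <- Seq[Response](OK, NotOK)) yield prefix :+ r
      }
    }

  def main(args: Array[String]): Unit = {
    val inputs = allInputs(3)
    val agree = inputs.forall(rs => reduceWithFold(rs) == reduceWithForall(rs))
    println(s"fold and forall agree on ${inputs.size} inputs: $agree")
    println(s"empty input reduces to ${reduceWithFold(Seq.empty)}")
  }
}
```

One consequence worth noting: both versions return `OK` for an empty sequence, so if the sub-stream emits nothing (for example, assuming `decider` returns `Supervision.Resume` and a failing element is dropped), the message would still be deleted.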

I have tried both 1.0-M3 and 1.0-RC1. Is there a fix for this?

Histogram of the top 5 objects, created with jhat:

Class                                                  Instance Count   Total Size (bytes)
class [C (char[])                                           1,376,284        2,068,640,582
class software.amazon.awssdk.services.sqs.model.Message       332,718           18,632,208
class java.lang.String                                      1,375,675           16,508,100
class [Lakka.dispatch.forkjoin.ForkJoinTask;                      227           14,880,304
class scala.collection.immutable.$colon$colon                 334,396            5,350,336
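A rough reading of the histogram, assuming jhat's Total Size column is in bytes and taking -Xmx2G as 2 GiB:

$$
\begin{aligned}
\text{max heap} &= 2 \times 2^{30} = 2\,147\,483\,648 \text{ bytes} \\
\frac{\text{char[] bytes}}{\text{max heap}} &= \frac{2\,068\,640\,582}{2\,147\,483\,648} \approx 0.963 \\
\text{average char[] size} &= \frac{2\,068\,640\,582}{1\,376\,284} \approx 1503 \text{ bytes} \\
\frac{\text{live Message objects}}{\text{maxBufferSize}} &= \frac{332\,718}{20} \approx 16\,636
\end{aligned}
$$

So character data alone fills about 96% of the maximum heap, and the number of live `Message` instances is roughly four orders of magnitude above the configured buffer of 20, which suggests that messages are being retained somewhere instead of being released after acknowledgement.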

I also found a similar issue here: https://github.com/akka/alpakka/issues/1588

I'd like to know whether there is an alternative way to work around this problem.

Best answer

You can wait for the Alpakka RC2/1.0.0 release, or in the meantime create your own SQS source, since it doesn't take many lines of code:

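import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.alpakka.sqs.SqsSourceSettings
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.{Message, QueueAttributeName, ReceiveMessageRequest}

import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._

object MyVeryOwnSqsSource {

  def apply(
      queueUrl: String,
      settings: SqsSourceSettings = SqsSourceSettings.Defaults
  )(implicit sqsClient: SqsAsyncClient): Source[Message, NotUsed] =
    Source
      // Source.repeat evaluates its argument once, so the same immutable
      // ReceiveMessageRequest is emitted over and over
      .repeat {
        val requestBuilder =
          ReceiveMessageRequest
            .builder()
            .queueUrl(queueUrl)
            .attributeNames(settings.attributeNames.map(_.name).map(QueueAttributeName.fromValue).asJava)
            .messageAttributeNames(settings.messageAttributeNames.map(_.name).asJava)
            .maxNumberOfMessages(settings.maxBatchSize)
            .waitTimeSeconds(settings.waitTimeSeconds)

        settings.visibilityTimeout match {
          case None => requestBuilder.build()
          case Some(t) => requestBuilder.visibilityTimeout(t.toSeconds.toInt).build()
        }
      }
      // At most maxBufferSize / maxBatchSize receive calls are in flight at once
      .mapAsync(settings.maxBufferSize / settings.maxBatchSize)(sqsClient.receiveMessage(_).toScala)
      .map(_.messages().asScala.toList)
      // Optionally complete the stream on the first empty receive
      .takeWhile(messages => !settings.closeOnEmptyReceive || messages.nonEmpty)
      // Flatten each batch into individual messages
      .mapConcat(identity)
      // Bounded buffer: when it is full, backpressure stops further polling
      .buffer(settings.maxBufferSize, OverflowStrategy.backpressure)
}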
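To make the shape of that pipeline concrete, here is a synchronous, Akka-free sketch of the same polling loop. It is only an analogy: `FakeQueue` is a hypothetical stand-in for `sqsClient.receiveMessage`, polls run one at a time instead of through `mapAsync`, and there is no buffer.

```scala
import scala.collection.mutable

// Rough, sequential analogue of MyVeryOwnSqsSource:
//   Source.repeat(request) + mapAsync(receive) ~ Iterator.continually(receive())
//   .takeWhile(closeOnEmptyReceive ...)        ~ .takeWhile(...)
//   .mapConcat(identity)                       ~ .flatMap(batch => batch)
object PollingSketch {

  // Hypothetical stand-in for sqsClient.receiveMessage: hands out pre-recorded
  // batches in order, then empty batches once the fake queue is drained.
  final class FakeQueue[A](batches: List[List[A]]) {
    private val remaining = mutable.Queue(batches: _*)
    def receive(): List[A] =
      if (remaining.isEmpty) Nil else remaining.dequeue()
  }

  def poll[A](queue: FakeQueue[A], closeOnEmptyReceive: Boolean): Iterator[A] =
    Iterator
      .continually(queue.receive())
      // Stop at the first empty batch only when closeOnEmptyReceive is set
      .takeWhile(messages => !closeOnEmptyReceive || messages.nonEmpty)
      // Flatten each batch into individual messages
      .flatMap(batch => batch)

  def main(args: Array[String]): Unit = {
    val batches = List(List("m1", "m2"), List("m3"), Nil, List("m4"))
    println(poll(new FakeQueue(batches), closeOnEmptyReceive = true).toList)
    // Without closeOnEmptyReceive polling never ends, so only take what we need
    println(poll(new FakeQueue(batches), closeOnEmptyReceive = false).take(4).toList)
  }
}
```

The real stream differs in that it overlaps up to `maxBufferSize / maxBatchSize` receive calls and relies on backpressure from `buffer` to stop polling when downstream is slow.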

Then use it in place of SqsSource:

// Same pipeline as in the question; only the source changes
MyVeryOwnSqsSource(
  queueUrl,
  // parallelism = maxBufferSize / maxBatchSize = 20 / 10 = 2
  SqsSourceSettings()
    .withWaitTime(10.seconds)
    .withMaxBatchSize(10)
    .withMaxBufferSize(20)
).map { msg =>
  val out = Source.single(msg)
    .via(messageToLambdaRequest)
    .via(lambdaRequestToLambdaResp)
    .via(lambdaRespToAggregationKeySet)
    .via(workFlow)
    .log("error while consuming events internally.")
    .withAttributes(ActorAttributes.supervisionStrategy(decider))
    .runWith(Sink.seq)

  val reducedResponse = out.map { response =>
    response.foldLeft[Response](OK)((a, b) => if (a == OK && b == OK) OK else NotOK)
  }

  val messageAction = reducedResponse.map { res =>
    if (res == OK) {
      //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
      delete(msg)
    } else
      ignore(msg)
  }
  messageAction
}
  .mapAsync(1)(identity)
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
  // must be less than or equal to the thread pool size.
  .log("error log")
  .runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))
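As a rough estimate of how many messages `MyVeryOwnSqsSource` holds at once with these settings (an approximation that assumes the operators are fused, ignores internal buffers at any asynchronous boundary, and ignores the one element in the downstream `mapAsync(1)`), write $b$ = maxBatchSize = 10 and $B$ = maxBufferSize = 20:

$$
\begin{aligned}
p &= \left\lfloor \frac{B}{b} \right\rfloor = \left\lfloor \frac{20}{10} \right\rfloor = 2 \\
N_{\max} &\approx \underbrace{p \cdot b}_{\text{mapAsync}} + \underbrace{b}_{\text{mapConcat}} + \underbrace{B}_{\text{buffer}} = 2 \cdot 10 + 10 + 20 = 50
\end{aligned}
$$

A bound of a few dozen messages is tiny compared with the 332,718 `Message` instances in the question's heap histogram.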

A similar question about scala - SQS Akka Stream out of memory on Stack Overflow: https://stackoverflow.com/questions/55317415/
