java - 无法从 EMR 中运行的 Spark 应用程序删除 AWS SQS 消息

标签 java amazon-web-services amazon-ec2 amazon-sqs

我正在 AWS EMR 集群中运行 Apache Spark 应用程序。该应用程序从 AWS SQS 检索消息,根据消息数据进行一些计算,然后删除每条消息。

我正在具有 NAT 实例的私有(private)子网上的 VPC 中运行 EMR 集群。

我面临的问题是我无法删除该消息。我能够检索所有消息并且能够发送消息,但无法删除它们。

我在 EMR 集群上使用以下安全性 EC2 实例配置文件:EMR_EC2_DefaultRole EMR 角色:EMR_DefaultRole

每个角色都附加了以下策略: AmazonSQSFullAccessAmazonElastiCacheFullAccessAmazonElasticMapReduceFullAccessAmazonVPCFullAccess

我认为问题出在权限上,但 AmazonSQSFullAccess 授予完全权限,所以我别无选择。

这是删除消息的 Java 代码:

public class SQSMessageBroker
{
    private AmazonSQS _amazonSqs;

    public SQSMessageBroker()
    {
        // Create the SQS client
        createSQSClient();
    }

    public void deleteMessage(String queueUrl, String receiptHandle)
        {
            DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, receiptHandle);

            _amazonSqs.deleteMessage(deleteMessageRequest);
        }

  private void createSQSClient()
    {
        _amazonSqs = new AmazonSQSClient();
        _amazonSqs.setRegion(Region.getRegion(Regions.EU_WEST_1));
    }
}

SQSMessageBroker 是我的应用程序中的单例。 当我在本地运行相同的代码时,一切都很好。我在本地创建了一个 AWS 用户,并将 key 和 secret 添加到 .aws 文件中。


编辑

经过大量研究和测试,我发现:

  1. 看起来这不是权限问题(至少对于 EMR 启动的 EC2 实例来说不是)。我连接到实例,安装了 aws cli,检索了一条消息并成功删除了它。
  2. _amazonSqs.deleteMessage(deleteMessageRequest); 代码不会引发任何异常。看起来好像请求超时了,但没有抛出超时异常。 deleteMessage 之后的任何代码都不会执行。
  3. 我在单独的线程中处理每条消息,因此我向每个线程添加了一个 Thread.UncaughtExceptionHandler,但那里也没有抛出异常。
  4. 我怀疑问题可能出在 ReceiptHandle 上,更准确地说,因为我在多台机器上运行 Spark 集群,所以我认为机器 IP、名称或类似的内容已被编码在 ReceiptHandledeleteMessage 中可能是从不同的机器执行的,所以这就是它不起作用的原因。这就是为什么我创建了一个只有一台机器的 Spark 集群。遗憾的是我仍然无法删除该消息。

最佳答案

经过大量的调试和测试,我终于找出了问题所在。

正如预期的那样,这不是权限问题。问题是,由 EMR 启动并运行 Spark 应用程序的 EC2 实例包含特定版本的所有 AWS java 包(包括 SQS 包)。并将包含包的路径添加到 Hadoop、Yarn 和 Spark。因此,当我的应用程序启动时,它使用了计算机上已有的软件包,并且我收到了错误。 (错误记录在 Yarn 日志中。我花了一些时间才弄清楚。)

我正在使用 Maven Shade 插件为我的应用程序构建 uber jar,因此我认为我可以尝试对 AWS 包进行着色(重新定位)。这将使我能够将依赖项封装在我的应用程序中。遗憾的是这没有用。亚马逊似乎在包内使用了反射,并且他们对某些类的名称进行了硬编码,从而使着色变得无用。(在我的着色包中找不到硬编码的类)

经过一番挫折后,我找到了以下解决方案:

  1. 创建一个 EMR 步骤,将我的 uber jar 从 S3 下载到计算机。
  2. 使用以下 Spark-submit 选项创建 Spark 应用程序步骤:

--driver-class-path/path_to_your_jar/myapp.jar --class com.myapp.startapp

这里的关键是--driver-class-path选项。您可以阅读更多相关信息here 。基本上,我将我的 uber jar 添加到 Spark 驱动程序类路径,允许应用程序使用我的依赖项。

到目前为止,这是我找到的唯一可接受的解决方案。如果您知道另一个或更好的,请发表评论或回答。

我希望这个答案可以对一些不幸的灵魂有用。这本来可以节省我几个痛苦的日子。

关于java - 无法从 EMR 中运行的 Spark 应用程序删除 AWS SQS 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38099650/

相关文章:

amazon-web-services - 在 AWS CLI 上调用 CreateInvalidation 操作时拒绝访问

amazon-web-services - CloudFormation 使用 FindInMap 嵌套 Sub

java - 我的 Android UI 无法运行?

java - 从三个 Android 位置提供者给出最佳位置估计的方法

java - 无法将数据添加到 javaFX 中的表

javascript - 访问以www开头的网页地址时没有 'Access-Control-Allow-Origin' header 。没有 www 它可以工作 - 为什么?

hadoop - 为什么部分文件在 HIVE 输出中没有任何内容

java - 当客户端断开连接时,服务器不会删除/断开套接字

python - "decoder jpeg not available"在 AWS Elastic Beanstalk 上使用 Pillow

java - Spring Cloud AWS SQS无法在本地连接到服务端点