我正在 AWS EMR 集群中运行 Apache Spark 应用程序。该应用程序从 AWS SQS 检索消息,根据消息数据进行一些计算,然后删除每条消息。
我正在具有 NAT 实例的私有(private)子网上的 VPC 中运行 EMR 集群。
我面临的问题是我无法删除该消息。我能够检索所有消息并且能够发送消息,但无法删除它们。
我在 EMR 集群上使用以下安全性
EC2 实例配置文件:EMR_EC2_DefaultRole
EMR 角色:EMR_DefaultRole
每个角色都附加了以下策略:
AmazonSQSFullAccess
、AmazonElastiCacheFullAccess
、AmazonElasticMapReduceFullAccess
、AmazonVPCFullAccess
我认为问题出在权限上,但 AmazonSQSFullAccess
授予完全权限,所以我别无选择。
这是删除消息的 Java 代码:
public class SQSMessageBroker
{
private AmazonSQS _amazonSqs;
public SQSMessageBroker()
{
// Create the SQS client
createSQSClient();
}
public void deleteMessage(String queueUrl, String receiptHandle)
{
DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueUrl, receiptHandle);
_amazonSqs.deleteMessage(deleteMessageRequest);
}
private void createSQSClient()
{
_amazonSqs = new AmazonSQSClient();
_amazonSqs.setRegion(Region.getRegion(Regions.EU_WEST_1));
}
}
SQSMessageBroker
是我的应用程序中的单例。
当我在本地运行相同的代码时,一切都很好。我在本地创建了一个 AWS 用户,并将 key 和 secret 添加到 .aws
文件中。
编辑
经过大量研究和测试,我发现:
- 看起来这不是权限问题(至少对于 EMR 启动的 EC2 实例来说不是)。我连接到实例,安装了 aws cli,检索了一条消息并成功删除了它。
_amazonSqs.deleteMessage(deleteMessageRequest);
代码不会引发任何异常。看起来好像请求超时了,但没有抛出超时异常。deleteMessage
之后的任何代码都不会执行。- 我在单独的线程中处理每条消息,因此我向每个线程添加了一个
Thread.UncaughtExceptionHandler
,但那里也没有抛出异常。 - 我怀疑问题可能出在
ReceiptHandle
上,更准确地说,因为我在多台机器上运行 Spark 集群,所以我认为机器 IP、名称或类似的内容已被编码在ReceiptHandle
和deleteMessage
中可能是从不同的机器执行的,所以这就是它不起作用的原因。这就是为什么我创建了一个只有一台机器的 Spark 集群。遗憾的是我仍然无法删除该消息。
最佳答案
经过大量的调试和测试,我终于找出了问题所在。
正如预期的那样,这不是权限问题。问题是,由 EMR 启动并运行 Spark 应用程序的 EC2 实例包含特定版本的所有 AWS java 包(包括 SQS 包)。并将包含包的路径添加到 Hadoop、Yarn 和 Spark。因此,当我的应用程序启动时,它使用了计算机上已有的软件包,并且我收到了错误。 (错误记录在 Yarn 日志中。我花了一些时间才弄清楚。)
我正在使用 Maven Shade 插件为我的应用程序构建 uber jar,因此我认为我可以尝试对 AWS 包进行着色(重新定位)。这将使我能够将依赖项封装在我的应用程序中。遗憾的是这没有用。亚马逊似乎在包内使用了反射,并且他们对某些类的名称进行了硬编码,从而使着色变得无用。(在我的着色包中找不到硬编码的类)
经过一番挫折后,我找到了以下解决方案:
- 创建一个 EMR 步骤,将我的 uber jar 从 S3 下载到计算机。
- 使用以下 Spark-submit 选项创建 Spark 应用程序步骤:
--driver-class-path/path_to_your_jar/myapp.jar --class com.myapp.startapp
这里的关键是--driver-class-path
选项。您可以阅读更多相关信息here 。基本上,我将我的 uber jar 添加到 Spark 驱动程序类路径,允许应用程序使用我的依赖项。
到目前为止,这是我找到的唯一可接受的解决方案。如果您知道另一个或更好的,请发表评论或回答。
我希望这个答案可以对一些不幸的灵魂有用。这本来可以节省我几个痛苦的日子。
关于java - 无法从 EMR 中运行的 Spark 应用程序删除 AWS SQS 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38099650/