symfony - Symfony Messenger/RabbitMQ 中的消费者错误处理

标签 symfony rabbitmq dead-letter symfony-messenger

我正在使用新的 Symfony Messenger Component 4.1 和 RabbitMQ 3.6.10-1 从我的 Symfony 4.1 Web 应用程序排队并异步发送电子邮件和短信通知。我的 Messenger 配置 (messenger.yaml) 如下所示:

framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN_NOTIFICATIONS)%'

        routing:
            'App\NotificationBundle\Entity\NotificationQueueEntry': amqp

当要发送新通知时,我将其排队如下:

use Symfony\Component\Messenger\MessageBusInterface;
// ...
$notificationQueueEntry = new NotificationQueueEntry();
// [Set notification details such as recipients, subject, and message]
$this->messageBus->dispatch($notificationQueueEntry);

然后我在命令行上像这样启动消费者:

$ bin/console messenger:consume-messages

我已经实现了一个 SendNotificationHandler 服务,实际的传递发生在其中。服务配置:

App\NotificationBundle\MessageHandler\SendNotificationHandler:
    arguments:
        - '@App\NotificationBundle\Service\NotificationQueueService'
    tags: [ messenger.message_handler ]

还有类(class):

class SendNotificationHandler
{
    public function __invoke(NotificationQueueEntry $entry): void
    {
        $this->notificationQueueService->sendNotification($entry);
    }
}

到目前为止,一切顺利并且通知已发送。

现在我的问题:可能会由于(临时)网络故障而无法发送电子邮件或短信。在这种情况下,我希望我的系统在指定的时间后重试传送,最多达到指定的最大重试次数。 实现这一目标的方法是什么?

我读过Dead Letter Exchanges但是,我找不到任何有关如何将其与 Symfony Messenger 组件集成的文档或示例。

最佳答案

您需要做的是告诉 RabbitMQ,消息被拒绝而不是确认。默认情况下,信使将在 AmqpReceiver 内处理此问题。 。正如您所看到的,如果您抛出一个在处理程序内实现 RejectMessageExceptionInterface 的异常,该消息将自动被拒绝。

您还可以使用自定义中间件“模拟”此行为。我在一个小型演示应用程序中创建了类似的东西。该机制由一个中间件组成,该中间件将(序列化的)原始消息包装在新的 RetryMessage 中,并通过自定义消息总线将其发送到不同的队列,用作死信交换。然后,该消息的处理程序将解压 RetryMessage(获取原始消息并将其反序列化)并通过默认总线传输它:

参见:

这是一个基本设置,它拒绝消息并允许您立即再次使用它(!)。您可能希望在延迟消耗时添加其他信息,例如时间戳 header 以改进这一点。为此,您应该考虑编写自己的接收器、中间件和/或处理程序。

关于symfony - Symfony Messenger/RabbitMQ 中的消费者错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53068322/

相关文章:

azure - 如何在 Azure CLI 中显示死信消息

azure - 您可以清除Azure中的死信队列吗

php - 如何获取表单的名称 - Symfony2

php - 如何在 symfony2 中设置小数点

c - 如何用c通过rabbitmq发送和接收protobuf数据

routes - Exchange 到 Exchange 绑定(bind)如何将代理与 ‘know’ 服务器队列分离?

php - Symfony 2.6 登录随机失败

php - Symfony 4 简单的 form_login 不起作用

RabbitMQ 上的 Spring 集成和 AMQP 抛出异常 Method handleToken(byte[]) 找不到

java - 从一个队列铲到另一个队列后如何拒绝消息?