symfony - 将交换绑定(bind)到 Symfony 消息组件中的交换

标签 symfony rabbitmq symfony-4.4 symfony-messenger

是否可以在 Symfony 消息组件中定义交换之间的绑定(bind)? (此处为 4.4 版本)。

我知道可以将交换绑定(bind)到队列,如下所示:

transports:
            incoming:
                dsn: "%env(RABBITMQ_SHARED_URL)%"
                options:
                    queues:
                        app.pl_incoming_events:
                            binding_keys:
                                - pl.app.#
                    exchange:
                        name: my_app.incoming
                        type: topic

然后应用程序设置它们之间的交换、队列和绑定(bind)。我希望根据路由 key 将交换绑定(bind)到另一个交换具有相同的效果。

我知道我可以使用rabbitmq-bundle,但在我看来是多余的——我想保留一个组件来管理rabbitMQ。

例如,我想根据某个路由键将 other_app 交换绑定(bind)到 my_app.incoming 交换。

最佳答案

Messenger 不是 RabbitMQ 管理器,您甚至无法在同一传输中声明多个开箱即用的交换。 但由于它具有所有必需的组件,并且在这种情况下 symfony 对配置有点宽松,因此您可以滥用该系统并自己构建它。

由于我不知道您的要求,所以我会保持基本的内容,希望它能帮助您入门。

首先创建AmqpTransportFactory:

// src/Amqp/AmqpTransportFactory.php
namespace App\Amqp;

use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpTransport;
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

class AmqpTransportFactory implements TransportFactoryInterface
{
    public function createTransport(
        string $dsn, array $options, SerializerInterface $serializer
    ): TransportInterface
    {
        unset($options['transport_name']);
        $exchanges['name'] = $options['exchange']['name'];
        $exchanges['bindings'] = $options['exchange']['bindings'] ?? [];

        // Passing unknown options is deprecated in 5.1
        unset($options['exchange']['bindings']);

        $connection = Connection::fromDsn($dsn, $options);

        // Ensure our exchange is created first
        $connection->exchange()->declareExchange();
        $channel = $connection->channel();
    
        // This is normally done in the Connection, but is harder to override
        $this->createExchanges($channel, $exchanges);

        return new AmqpTransport($connection, $serializer);
    }

    public function supports(string $dsn, array $options): bool
    {
        return 0 === strpos($dsn, 'amqp://');
    }

    private function createExchanges(\AMQPChannel $channel, array $configuration): void
    {
        $factory = new AmqpFactory();

        foreach ($configuration['bindings'] as $exchange_name => $arguments) {
            $exchange = $factory->createExchange($channel);
            $exchange->setName($exchange_name);
            $exchange->setType($arguments['type'] ?? \AMQP_EX_TYPE_FANOUT);
            $exchange->setFlags($arguments['flags'] ?? \AMQP_DURABLE);
            $exchange->declareExchange();

            if (!is_array($arguments['binding_keys'])) {
                $arguments['binding_keys'] = [$arguments['binding_keys']];
            }

            foreach ($arguments['binding_keys'] as $key) {
                $exchange->bind($configuration['name'], $key);
            }
        }
    }
}

注册服务:

# config/services.yaml
services:
  messenger.transport.amqp.factory:
    class: App\Amqp\AmqpTransportFactory

将新配置添加到交换中:

# config/packages/messenger.yaml
exchange:
  name: my_app.incoming
  type: topic
  bindings:
    other_app:
      type: direct
      binding_keys: ['route']

它将产生以下绑定(bind):

+-----------------+------------------------+------------------------+
|     source      |      destination       |      routing_key       |
+-----------------+------------------------+------------------------+
| my_app.incoming | app.pl_incoming_events | pl.app.#               |
| my_app.incoming | other_app              | route                  |
+-----------------+------------------------+------------------------+

关于symfony - 将交换绑定(bind)到 Symfony 消息组件中的交换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64487577/

相关文章:

java - Spring Boot AMQP @RabbitListener 没有收到消息

bash - 如何使用 docker-compose 环境变量来填充配置文件

c# - C++ 应用程序和 Xamarin Forms 应用程序之间的实时消息传递框架

php - 为什么升级到Symfony 4.4之后,我不再获得错误预览页面?

php - HWIOAuthBundle 第一个示例的问题

mongodb - 使用 composer 为 Symfony2 设置 MongoDb

javascript - 从外部文件调用 js 函数 Symfony Twig

Symfony 4 - 将 XML 转换为 YAML