redis - Can I subscribe to multiple hosts (and clusters) with a single subscriber pattern in Redis?

Tags: redis publish-subscribe

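I am implementing a feature in my project that takes messages published through Redis and writes them into MongoDB, and it works perfectly in the test environment.

But I am worried about production: there I have 3 master servers with 12 slaves clustered behind them. If messages from all of them are published to a channel pattern, can I subscribe to all of those messages in one place?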

Best answer

Yes, this is possible with a StackExchange.Redis setup. I have finished mine, and its general structure looks like this:

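    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using StackExchange.Redis;

    public class RedisSubscriber : IRedisSubscriber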
    {
        private readonly RedisConfigurationManager _config;
        private readonly IMongoDbRepository _mongoDbRepository;
        private readonly ILogger<RedisSubscriber> _logger;
        private readonly IConnectionMultiplexer _connectionMultiplexer;


        public RedisSubscriber(IServiceProvider serviceLocator, ILogger<RedisSubscriber> logger, IConnectionMultiplexer conn)
        {
            _config = (RedisConfigurationManager)serviceLocator.GetService(typeof(RedisConfigurationManager));
            _mongoDbRepository = (IMongoDbRepository)serviceLocator.GetService(typeof(IMongoDbRepository));
            _connectionMultiplexer = conn;
            _logger = logger;
        }

        public void SubScribeChannel()
        {
            _logger.LogInformation("!SubScribeChannel started!!");

            string channelName = _config.ActiveChannelName;
            var pubSub = _connectionMultiplexer.GetSubscriber();
            try
            {
                pubSub.Subscribe(channelName, async (channel, message) => await MessageActionAsync(message, channel));
            }
            catch (Exception ex)
            {
                // Log at error level and pass the exception so the stack trace is kept.
                _logger.LogError(ex, "!error while subscribing to channel {Channel}", channelName);
            }
            Debug.WriteLine("EOF");
        }

        private async Task MessageActionAsync(RedisValue message, string channel)
        {
            try
            {
                Transformer t = new Transformer(_logger);
                _logger.LogInformation(String.Format("!SubScribeChannel message received on message!! channel: {0}, message: {1}", channel, message));
                string transformedMessage = Transformer.TransformJsonStringData2Message(message);
                List<Document> documents = Transformer.Deserialize<List<Document>>(transformedMessage);

                await MergeToMongoDb(documents, channel);
                _logger.LogInformation("!Merged");
            }
            catch (Exception ex)
            {
                // Log at error level and pass the exception so the stack trace is kept.
                _logger.LogError(ex, "!error while handling a message on channel {Channel}", channel);
            }
        }

        private async Task MergeToMongoDb(IList<Document> documents, string channelName)
        {
            try
            {
                foreach (Document doc in documents)
                {
                    TurSysPartitionedDocument td = JsonConvert.DeserializeObject<TurSysPartitionedDocument>(JsonConvert.SerializeObject(doc));
                    td.DepartureDate = td.DepartureDate.ToLocalTime();
                    td.PartitionKey = channelName;
                    TurSysPartitionedDocument isExist = await _mongoDbRepository.GetOneAsync<TurSysPartitionedDocument>(k =>
                                k.ProductCode == td.ProductCode &&
                                k.ProviderCode == td.ProviderCode &&
                                k.CabinClassName == td.CabinClassName &&
                                k.OriginAirport == td.OriginAirport &&
                                k.DestinationAirport == td.DestinationAirport &&
                                k.Adult >= td.Adult &&
                                k.DepartureDate == td.DepartureDate,
                                td.PartitionKey);

                    if (isExist != null)
                    {
                        //_logger.LogInformation(String.Format("!isExist departure date: {0}", isExist.DepartureDate));
                        isExist.SearchCount++;
                        await _mongoDbRepository.UpdateOneAsync(isExist, k => k.Adult, td.Adult);
                        await _mongoDbRepository.UpdateOneAsync(isExist, k => k.SearchCount, isExist.SearchCount);
                    }
                    else
                    {
                        //_logger.LogInformation(String.Format("!last ToLocalTime td departure date: {0}", td.DepartureDate));
                        td.SearchCount = 1;
                        await _mongoDbRepository.AddOneAsync(td);
                        //_logger.LogInformation(String.Format("!last ToLocalTime result td departure date: {0}", td.DepartureDate));
                    }
                }
            }
            catch (Exception ex)
            {
                // Log at error level and pass the exception so the stack trace is kept.
                _logger.LogError(ex, "!error while merging documents for channel {Channel}", channelName);
            }
        }


    }
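
The class above receives an already-built IConnectionMultiplexer, so it does not show how the connection reaches several hosts. Here is a minimal sketch of one way to wire that up, not the answer author's actual configuration. It assumes that every node of one cluster (or one master plus its replicas) can share a single multiplexer, because Redis Cluster forwards published messages to all of its nodes, while independent masters each need their own connection with the same handler attached. The names MultiHostSubscriberSketch, BuildOptions and SubscribeAll and the host names in the usage note are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using StackExchange.Redis;

public static class MultiHostSubscriberSketch
{
    // Builds options for ONE logical deployment: a cluster, or a master with its replicas.
    public static ConfigurationOptions BuildOptions(IEnumerable<string> hostAndPorts)
    {
        // AbortOnConnectFail = false lets the multiplexer keep retrying unreachable nodes.
        var options = new ConfigurationOptions { AbortOnConnectFail = false };
        foreach (string endpoint in hostAndPorts)
        {
            options.EndPoints.Add(endpoint); // "host:port"
        }
        return options;
    }

    // Opens one connection per deployment and attaches the same handler to each,
    // so all messages arrive in a single place.
    public static List<IConnectionMultiplexer> SubscribeAll(
        IEnumerable<IEnumerable<string>> deployments,
        string channelPattern,
        Action<RedisChannel, RedisValue> handler)
    {
        var connections = new List<IConnectionMultiplexer>();
        foreach (IEnumerable<string> endpoints in deployments)
        {
            IConnectionMultiplexer connection =
                ConnectionMultiplexer.Connect(BuildOptions(endpoints));

            // Pattern mode issues PSUBSCRIBE, so "flights.*" matches "flights.ist" etc.
            var pattern = new RedisChannel(channelPattern, RedisChannel.PatternMode.Pattern);
            connection.GetSubscriber().Subscribe(pattern, handler);

            connections.Add(connection);
        }
        return connections;
    }
}
```

A caller might pass three endpoint lists such as { "master-a:6379" }, { "master-b:6379" } and { "master-c:6379" } with a pattern like "flights.*", and keep the returned connections alive for the lifetime of the application. If the three masters actually form one Redis Cluster, a single endpoint list containing them should be enough.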

Regarding "redis - Can I subscribe to multiple hosts (and clusters) with a single subscriber pattern in Redis?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55283431/
