我正在使用 Rx 框架编写消息监听器。
我面临的问题是,我正在使用的库使用一个消费者,它在消息到达时发布事件。
我已经设法通过 Observable.FromEventPattern
使用传入的消息,但我对服务器中已有的消息有疑问。
目前我有以下命令链
- 创建消费者
- 使用
FromEventPattern
创建可观察序列并应用所需的转换 - 告诉消费者开始
- 订阅序列
最简单的解决方案是交换步骤 3. 和 4.,但由于它们发生在系统的不同组件中,所以我很难这样做。
理想情况下,我希望在第 4 步发生时执行第 3 步(如 OnSubscribe
方法)。
感谢您的帮助:)
PS:要添加更多细节,事件来自 RabbitMQ 队列,我正在使用 RabbitMQ.Client 包中的 EventingBasicConsumer
类。
Here你可以找到我正在研究的图书馆。具体来说,this是类(class)/方法给我带来了问题。
编辑
这是有问题的代码的精简版
void Main()
{
var engine = new Engine();
var messages = engine.Start();
messages.Subscribe(m => m.Dump());
Console.ReadLine();
engine.Stop();
}
public class Engine
{
IConnection _connection;
IModel _channel;
public IObservable<Message> Start()
{
var connectionFactory = new ConnectionFactory();
_connection = connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
EventingBasicConsumer consumer = new EventingBasicConsumer(_channel);
var observable = Observable.FromEventPattern<BasicDeliverEventArgs>(
a => consumer.Received += a,
a => consumer.Received -= a)
.Select(e => e.EventArgs);
_channel.BasicConsume("a_queue", false, consumer);
return observable.Select(Transform);
}
private Message Transform(BasicDeliverEventArgs args) => new Message();
public void Stop()
{
_channel.Dispose();
_connection.Dispose();
}
}
public class Message { }
我遇到的症状是,由于我在订阅序列之前调用了 BasicConsume,所以 RabbitMQ 队列中的任何消息都会被提取但不会向下传递到管道。
因为我没有打开“autoack”,程序一停止,消息就会返回到队列中。
最佳答案
正如一些人在评论中指出的那样,正如您在问题中指出的那样,问题是由于您使用 RabbitMQ 客户端的方式造成的。
为了解决其中一些问题,我实际上所做的是创建一个 ObservableConsumer 类。这是当前使用的 EventingBasicConsumer 的替代方法。我这样做的一个原因是为了处理问题中描述的问题,但这样做的另一件事是允许您在单个连接/ channel 实例之外重新使用此消费者对象。这样做的好处是,无论 transient 连接/ channel 特征如何,都可以让您的下游 react 代码保持连线状态。
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using RabbitMQ.Client;
namespace com.rabbitmq.consumers
{
public sealed class ObservableConsumer : IBasicConsumer
{
private readonly List<string> _consumerTags = new List<string>();
private readonly object _consumerTagsLock = new object();
private readonly Subject<Message> _subject = new Subject<Message>();
public ushort PrefetchCount { get; set; }
public IEnumerable<string> ConsumerTags { get { return new List<string>(_consumerTags); } }
/// <summary>
/// Registers this consumer on the given queue.
/// </summary>
/// <returns>The consumer tag assigned.</returns>
public string ConsumeFrom(IModel channel, string queueName)
{
Model = channel;
return Model.BasicConsume(queueName, false, this);
}
/// <summary>
/// Contains an observable of the incoming messages where messages are processed on a thread pool thread.
/// </summary>
public IObservable<Message> IncomingMessages
{
get { return _subject.ObserveOn(Scheduler.ThreadPool); }
}
///<summary>Retrieve the IModel instance this consumer is
///registered with.</summary>
public IModel Model { get; private set; }
///<summary>Returns true while the consumer is registered and
///expecting deliveries from the broker.</summary>
public bool IsRunning
{
get { return _consumerTags.Count > 0; }
}
/// <summary>
/// Run after a consumer is cancelled.
/// </summary>
/// <param name="consumerTag"></param>
private void OnConsumerCanceled(string consumerTag)
{
}
/// <summary>
/// Run after a consumer is added.
/// </summary>
/// <param name="consumerTag"></param>
private void OnConsumerAdded(string consumerTag)
{
}
public void HandleBasicConsumeOk(string consumerTag)
{
lock (_consumerTagsLock) {
if (!_consumerTags.Contains(consumerTag))
_consumerTags.Add(consumerTag);
}
}
public void HandleBasicCancelOk(string consumerTag)
{
lock (_consumerTagsLock) {
if (_consumerTags.Contains(consumerTag)) {
_consumerTags.Remove(consumerTag);
OnConsumerCanceled(consumerTag);
}
}
}
public void HandleBasicCancel(string consumerTag)
{
lock (_consumerTagsLock) {
if (_consumerTags.Contains(consumerTag)) {
_consumerTags.Remove(consumerTag);
OnConsumerCanceled(consumerTag);
}
}
}
public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
{
//Don't need to do anything.
}
public void HandleBasicDeliver(string consumerTag,
ulong deliveryTag,
bool redelivered,
string exchange,
string routingKey,
IBasicProperties properties,
byte[] body)
{
//Hack - prevents the broker from sending too many messages.
//if (PrefetchCount > 0 && _unackedMessages.Count > PrefetchCount) {
// Model.BasicReject(deliveryTag, true);
// return;
//}
var message = new Message(properties.HeaderFromBasicProperties()) { Content = body };
var deliveryData = new MessageDeliveryData()
{
ConsumerTag = consumerTag,
DeliveryTag = deliveryTag,
Redelivered = redelivered,
};
message.Tag = deliveryData;
if (AckMode != AcknowledgeMode.AckWhenReceived) {
message.Acknowledged += messageAcknowledged;
message.Failed += messageFailed;
}
_subject.OnNext(message);
}
void messageFailed(Message message, Exception ex, bool requeue)
{
try {
message.Acknowledged -= messageAcknowledged;
message.Failed -= messageFailed;
if (message.Tag is MessageDeliveryData) {
Model.BasicNack((message.Tag as MessageDeliveryData).DeliveryTag, false, requeue);
}
}
catch {}
}
void messageAcknowledged(Message message)
{
try {
message.Acknowledged -= messageAcknowledged;
message.Failed -= messageFailed;
if (message.Tag is MessageDeliveryData) {
var ackMultiple = AckMode == AcknowledgeMode.AckAfterAny;
Model.BasicAck((message.Tag as MessageDeliveryData).DeliveryTag, ackMultiple);
}
}
catch {}
}
}
}
关于c# - 在使用 FromEventPattern 订阅之前捕获事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49619839/