.net - 如何创建从 MSMQ 消息队列中读取的 IObservable<T>?

标签 .net msmq system.reactive

我正在从我们的 ASP.NET 站点中删除我们的电子邮件系统,该站点曾经使用系统立即发送电子邮件,以在单独的服务中处理请求以减少网站上的工作量。我正在尝试围绕一组接口(interface)设计它,以便我可以根据需要交换实现,但最初它将基于消息队列(MSMQ)将请求发送到队列,让服务接收传入请求然后处理它们。我目前大致定义了以下接口(interface):

// Sends one or more requests to be processed somehow
public interface IRequestSender
{
    void Send(IEnumerable<Request> requests);
}

// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
    void Start();
    void Stop();
}

// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}

您会注意到 Listener 和 Processor 使用 observable 模式,因为我认为这似乎最合适。

我的问题是弄清楚如何编写 IRequestListener 的实现从 MSMQ 接收,基本上如何创建合适的 IObservable<T> ?

我发现的第一个选项是创建一个 IObservable<T>根据 MSDN documentation 给出的示例从头开始,但这似乎需要做很多管道工作。

另一种选择是使用 Reactive Extensions,因为它似乎旨在使创建 observables 变得更容易。我发现最接近将 Rx 与 MSMQ 结合使用的是这些页面:
  • Using Reactive extension (Rx) for MSMQ message receive using async pattern (queue.BeginReceive,queue.EndReceive)
  • MSMQ using Rx - Code snippet for MSMQ receive timeout problem

  • 但我不确定如何将这些示例应用到我的 IRequestListener界面。

    也欢迎任何其他想法,如果合适的话,甚至可以更改我的基本设计。

    最佳答案

    我最初确实使用了 FromAsyncPattern,但最终为它编写了一个类,因为它可以更好地处理超时和中毒消息。一旦开始,队列无论如何都是热门的 Observables。您也可以使用 Observable.Defer使其更接近 Rx 而不是 Start/Stop。

    这是 QueueObservable 的基本实现。您可以先调用ListenReceive .

    Subject<T> Subject = new Subject<T>();
    
    protected void ListenReceive()
    {
        Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
    }
    
    protected void OnReceive(IAsyncResult ar)
    {
        Message message = null;
    
        try
        {
            message = Queue.EndReceive(ar);
        }
        catch (TimeoutException ex)
        {
            //retry?
        }
    
        if (message != null)
            Subject.OnNext((T) message.Body);
    
        Thread.Yield();
    
        if (!IsDisposed)
            ListenReceive();
    }    
    
    public IObservable<T> AsObservable()
    {
            return Subject;
    }
    

    关于.net - 如何创建从 MSMQ 消息队列中读取的 IObservable<T>?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9191410/

    相关文章:

    .net - 单独的 Web 服务中的相同类型

    .NET Core、Windows Azure 存储。在控制台应用程序上崩溃但在网络应用程序上不崩溃?

    msmq - NServiceBus 消息队列建立

    c# - 通过 MSMQ 进行通信的 Windows 服务 - 我需要服务总线吗?

    c# - Msmq .NET api MessageQueue.Send 不会在预期的地方抛出

    c# - ActionBlock Framework 4 rx 替代方案

    c# - Microsoft 是否正在恢复 Managed DirectX?

    c# - 接收 : Ignoring updates caused by Subscribers

    c# - 我怎样才能使这个 Observable 更可重用?

    c# - 根据目标自定义复制/粘贴的内容