c# - 简单的内存消息队列

标签 c# queue domain-driven-design semaphore reentrancy

我们现有的域事件实现限制(通过阻塞)一次发布到一个线程,以避免对处理程序的重入调用:

public interface IDomainEvent {}  // Marker interface

public class Dispatcher : IDisposable
{
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    // Subscribe code...

    public void Publish(IDomainEvent domainEvent)
    {
        semaphore.Wait();
        try
        {
            // Get event subscriber(s) from concurrent dictionary...

            foreach (Action<IDomainEvent> subscriber in eventSubscribers)
            {
                subscriber(domainEvent);
            }
        }
        finally
        {
            semaphore.Release();
        }
    }
    // Dispose pattern...
}

如果一个处理程序发布一个事件,这将死锁。

我如何重写它以序列化对 Publish 的调用?换句话说,如果订阅处理程序 A 发布事件 B,我将得到:

  1. 处理程序 A 调用
  2. 处理程序 B 调用

同时在多线程环境中保留对处理程序的不可重入调用的条件。

我不想更改公共(public)方法签名;例如,应用程序中没有地方可以调用方法来发布队列。

最佳答案

我们想出了一种同步进行的方法。

public class Dispatcher : IDisposable
{
    private readonly ConcurrentQueue<IDomainEvent> queue = new ConcurrentQueue<IDomainEvent>();
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

    // Subscribe code...

    public void Publish(IDomainEvent domainEvent)
    {
        queue.Enqueue(domainEvent);

        if (IsPublishing)
        {
            return;
        }

        PublishQueue();
    }

    private void PublishQueue()
    {
        IDomainEvent domainEvent;
        while (queue.TryDequeue(out domainEvent))
        {
            InternalPublish(domainEvent);
        }
    }

    private void InternalPublish(IDomainEvent domainEvent)
    {
        semaphore.Wait();
        try
        {
            // Get event subscriber(s) from concurrent dictionary...

            foreach (Action<IDomainEvent> subscriber in eventSubscribers)
            {
                subscriber(domainEvent);
            }
        }
        finally
        {
            semaphore.Release();
        }

        // Necessary, as calls to Publish during publishing could have queued events and returned.
        PublishQueue();
    }

    private bool IsPublishing
    {
        get { return semaphore.CurrentCount < 1; }
    }
    // Dispose pattern for semaphore...
}

关于c# - 简单的内存消息队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36436034/

相关文章:

c# - 无法写入输出文件指定的路径/文件名太长?

c# - ASP.net .FindControl() 和 GridView 返回 null

http - 有什么方法可以通过 HTTP POST 请求访问 RabbitMQ?

c++ - C++中的线程安全队列

domain-driven-design - 事件源和读取模型生成

c# - 为什么按钮的 .CausesValidation 默认设置为 True?

问题的 Java 基础知识

C# MongoDB : How to correctly map a domain object?

domain-driven-design - DDD 和 Getter 和 Setter 的使用

c# - 单元测试帮助。如何测试消息输出到控制台?