c# - 通过工作线程的异步接口(interface)使用 WCF 服务,如何确保从客户端发送事件 "in order"

标签 c# multithreading silverlight wcf asynchronous

我正在编写一个 Silverlight 类库来抽象 WCF 服务的接口(interface)。 WCF 服务提供集中式日志记录服务。 Silverlight 类库为日志记录提供了一个简化的类似 log4net 的接口(interface)(logger.Info、logger.Warn 等)。我计划从类库中提供选项,以便记录的消息可以在客户端上累积并以“突发”的形式发送到 WCF 日志记录服务,而不是在每条消息发生时发送。一般来说,这运行良好。类库确实会累积消息,并将消息集合发送到 WCF 日志记录服务,在那里它们由底层日志记录框架记录。

我当前的问题是消息(来自具有单个线程的单个客户端 - 所有日志记录代码都在按钮单击事件中)在日志记录服务中交错。我意识到至少部分原因可能是由于 WCF 日志记录服务的实例化 (PerCall) 或同步。然而,我的消息似乎也是如此快速地连续发生,以至于在异步调用上留下的消息“爆发”实际上是以与生成它们不同的顺序“离开”客户端的。

我尝试按照 here 的描述设置生产者消费者队列稍微(或者应该用空引号“轻微”更改)Work 方法阻塞(WaitOne)直到异步调用返回(即直到异步回调执行)。这个想法是,当一个消息突发发送到 WCF 日志服务时,队列应该等到该突发处理完毕后再发送下一个突发。

也许我试图做的事情不可行,或者我试图解决错误的问题,(或者我只是不知道我在做什么!)。

无论如何,这是我的生产者/消费者队列代码:

  internal class ProducerConsumerQueue : IDisposable
  {
    EventWaitHandle wh = new AutoResetEvent(false);
    Thread worker;
    readonly object locker = new object();
    Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();

    LoggingService.ILoggingService loggingService;

    internal ProducerConsumerQueue(LoggingService.ILoggingService loggingService)
    {
      this.loggingService = loggingService;
      worker = new Thread(Work);
      worker.Start();
    }

    internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
    {
      //Queue the next burst of messages
      lock(locker)
      {
        logEventQueue.Enqueue(logEvents);
        //Is this Set conflicting with the WaitOne on the async call in Work?
        wh.Set();        
      }
    }

    private void Work()
    {
      while(true)
      {
        ObservableCollection<LoggingService.LogEvent> events = null;

        lock(locker)
        {
          if (logEventQueue.Count > 0)
          {
            events = logEventQueue.Dequeue();
            if (events == null || events.Count == 0) return;            
          }
        }

        if (events != null && events.Count > 0)
        {
          System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);

          //
          // This seems to be the key...
          // Send one burst of messages via an async call and wait until the async call completes.
          //
          loggingService.BeginLogEvents(events, ar =>
          {
            try
            {
              loggingService.EndLogEvents(ar);
              System.Diagnostics.Debug.WriteLine("3. Work - Back");
              wh.Set();
            }
            catch (Exception ex)
            {
            }
          }, null);

          System.Diagnostics.Debug.WriteLine("2. Work - Waiting");

          wh.WaitOne();

          System.Diagnostics.Debug.WriteLine("4. Work - Finished");
        }
        else
        {
          wh.WaitOne();
        }
      }
    }

    #region IDisposable Members

    public void Dispose()
    {
      EnqueueLogEvents(null);
      worker.Join();
      wh.Close();
    }

    #endregion
  }

在我的测试中,它基本上是这样调用的:
//Inside of LogManager, get the LoggingService and set up the queue.
ILoggingService loggingService = GetTheLoggingService();
ProducerConsumerQueue loggingQueue = new ProducerConsumerQueue(loggingService);

//Inside of client code, get a logger and log with it
ILog logger = LogManager.GetLogger("test");

for (int i = 0; i < 100; i++)
{
  logger.InfoFormat("logging message [{0}]", i);
}

在内部,logger/LogManager 在将该组消息添加到队列之前累积一定数量的日志消息(比如 25)。像这样的东西:
internal void AddNewMessage(string message)
{
  lock(logMessages)
  {
    logMessages.Add(message);
    if (logMessages.Count >= 25)
    {
      ObservableCollection<LogMessage> messages = new ObservableCollection<LogMessage>(logMessages);
      logMessages.Clear();
      loggingQueue.EnqueueLogEvents(messages);
    }
  }
}

因此,在这种情况下,我希望有 4 个突发,每个突发 25 条消息。根据我的 ProducerConsumerQueue 代码中的 Debug 语句(可能不是调试这个的最佳方式?),我希望看到这样的东西:
  • 工作 - 发送 25 个事件
  • 工作 - 等待
  • 工作 - 返回
  • 工作 - 完成

  • 重复4次。

    相反,我看到的是这样的:

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *4.工作 - 完成

    *1.工作 - 发送 25 个事件

    *2.工作 - 等待

    *3.工作 - 返回

    *3.工作 - 返回

    *4.工作 - 完成

    (添加前导 * 以便这些行不会被 SO 自动编号)

    我想我会预料到,队列将允许添加多个消息突发,但它会在处理下一个突发之前完全处理一个突发(等待 acync 调用完成)。它似乎没有这样做。它似乎不能可靠地等待异步调用的完成。我确实有一个电话给 SetEnqueueLogEvents ,也许这是取消WaitOne来自 Work方法?

    所以,我有几个问题:
    1.我对我想要完成的事情的解释是否有意义(我的解释是否清楚,这不是一个好主意吗)?
  • 我正在尝试(从客户端传输来自单个线程的消息,按照它们发生的顺序,一次完全处理一组消息)是个好主意吗?
  • 我很亲近吗?
  • 可以做到吗?
  • 应该做吗?

  • 谢谢你的帮助!

    [编辑]
    经过更多调查并感谢布赖恩的建议,我们能够使这项工作正常进行。我已经复制了修改后的代码。关键是我们现在严格为 ProducerConsumerQueue 函数使用“wh”等待句柄。我们现在不是使用 wh 来等待异步调用完成,而是等待由 BeginLogEvents 调用返回的 res.AsyncWaitHandle。
      internal class LoggingQueue : IDisposable
      {
        EventWaitHandle wh = new AutoResetEvent(false);
        Thread worker;
        readonly object locker = new object();
        bool working = false;
    
        Queue<ObservableCollection<LoggingService.LogEvent>> logEventQueue = new Queue<ObservableCollection<LoggingService.LogEvent>>();
    
        LoggingService.ILoggingService loggingService;
    
        internal LoggingQueue(LoggingService.ILoggingService loggingService)
        {
          this.loggingService = loggingService;
          worker = new Thread(Work);
          worker.Start();
        }
    
        internal void EnqueueLogEvents(ObservableCollection<LoggingService.LogEvent> logEvents)
        {
          lock (locker)
          {
            logEventQueue.Enqueue(logEvents);
    
            //System.Diagnostics.Debug.WriteLine("EnqueueLogEvents calling Set");
    
            wh.Set();
          }
        }
    
        private void Work()
        {
          while (true)
          {
            ObservableCollection<LoggingService.LogEvent> events = null;
    
            lock (locker)
            {
              if (logEventQueue.Count > 0)
              {
                events = logEventQueue.Dequeue();
                if (events == null || events.Count == 0) return;
              }
            }
    
            if (events != null && events.Count > 0)
            {
              //System.Diagnostics.Debug.WriteLine("1. Work - Sending {0} events", events.Count);
    
              IAsyncResult res = loggingService.BeginLogEvents(events, ar =>
              {
                try
                {
                  loggingService.EndLogEvents(ar);
                  //System.Diagnostics.Debug.WriteLine("3. Work - Back");
                }
                catch (Exception ex)
                {
                }
              }, null);
    
              //System.Diagnostics.Debug.WriteLine("2. Work - Waiting");
    
              // Block until async call returns.  We are doing this so that we can be sure that all logging messages
              // are sent FROM the client in the order they were generated.  ALSO, we don't want interleave blocks of logging
              // messages from the same client by sending a new block of messages before the previous block has been
              // completely processed.
    
              res.AsyncWaitHandle.WaitOne();
    
              //System.Diagnostics.Debug.WriteLine("4. Work - Finished");
            }
            else
            {
              wh.WaitOne();
            }
          }
        }
    
        #region IDisposable Members
    
        public void Dispose()
        {
          EnqueueLogEvents(null);
          worker.Join();
          wh.Close();
        }
    
        #endregion
      }
    

    正如我在最初的问题以及对 Jon 和 Brian 的评论中提到的,我仍然不知道做所有这些工作是否是一个好主意,但至少代码做了我想要它做的事情。这意味着我至少可以选择以这种方式或其他方式(例如事后恢复秩序)而不是没有选择。

    最佳答案

    我可以建议所有这些协调都有一个简单的替代方案吗?有一个使用廉价单调递增 ID 的序列(例如使用 Interlocked.Increment() ),这样无论客户端或服务器发生什么顺序,您都可以稍后重新生成原始顺序。

    这应该让您高效灵活,异步发送您想要的任何内容,而无需等待确认,但不会丢失排序。

    显然,这意味着 ID(或可能是保证唯一的时间戳字段)需要成为 WCF 服务的一部分,但如果您控制两端,那应该相当简单。

    关于c# - 通过工作线程的异步接口(interface)使用 WCF 服务,如何确保从客户端发送事件 "in order",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4136808/

    相关文章:

    c# - 由另一个方法调用的方法上的 caSTLe windsor 拦截器

    c# - RoleEnvironment.RequestRecycle() 不触发重新启动

    c# - 在 C# 中将多个 Parallel.ForEach 合并为一个

    c# - 使用 SolidColorBrush 填充矩形不起作用

    silverlight - 将 Hunspell 与 Silverlight 结合使用

    c# - 默认情况下,查询字段提升权重的最小值和最大值是多少?

    c# - 如何在 C# linq 查询中按业务类型代码选择前 (5) 个贡献者组

    c# - 您如何同时为*所有*手机/移动设备写作?

    Java(Android),线程安全的FIFO,无需锁定?

    wpf - Graphic 元素使用位图好还是 Xaml 好