c# - 在 2008R2 上正确使用 Message Queue

标签 c# msmq message-queue windows-server-2008-r2

我不是程序员,但我想通过给他们一些指导来帮助他们。我们不再拥有任何关于 msmq 的内部专业知识。我们正在尝试使用它来将一些功能与调度应用程序集成。

日程安排应用程序通过使用自定义构建的 dll 进行网络调用来触发作业。 dll 调用 weburl。 Web 应用程序将运行其任务并将有关其执行的任务的更新发送到网站。网站将消息写入队列。调用该站点的 dll 正在监视队列中带有分配给该作业的标签的消息。当它收到最终状态消息时,它会关闭。

我们每隔几个小时就会收到以下消息。我们每小时运行近 100 个使用此方法的作业。在底部列出的代码中,jobid 对应于消息队列中消息的标签。每个作业在开始时都会发出一个 jobid,并将使用它作为它发送到该作业的 msmq 的每条消息的标签。

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

这是它的代码。

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";

最佳答案

kprobst 的解释很可能是正在发生的事情。即使您看到此特定消息在队列中,如果不同的应用程序(或同一应用程序的不同实例)从该队列中选择一条(任何)消息,也会使游标无效。

从本质上讲,如果多个进程从同一个队列中获取数据,则此代码无法正常工作。

关于c# - 在 2008R2 上正确使用 Message Queue,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5412145/

相关文章:

c# - 替换逗号分隔列表的单个字段中的字符

java - JMS 持久性和持久性

c# - 是否可以使用 RabbitMQ 和 gRPC 通过 .NET 对消息进行排队?

c# - 使用 32feet.NET 在 Windows 10 上进行蓝牙配对 (SSP)

c# - WPF:如何只从文本框中获取时间?

wcf - 似乎无法修复 "The protocol ' net.msmq' 没有注册 HostedTransportConfiguration 类型的实现。”

c# - 我正在尝试设置通过 MsmqIntegrationBinding 调用的 WCF 服务,但出现错误

c# - 如何配置 NServiceBus 以使用远程 MSMQ 队列?

java - 如何从队列中丢弃 Tibco EMS 消息(消费者端)

javascript - IHttpActionResult 不会返回 JSON 对象