sockets - Asynchronous client socket ManualResetEvent holding up execution

Tags: sockets asynchronous client manualresetevent

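I am trying to use MSDN's Asynchronous Client Socket code sample to connect to and control some home equipment. As I understand it, the sample uses an instance of the EventWaitHandle class ManualResetEvent, via receiveDone.WaitOne(), to hold the current thread until ReceiveCallback signals that all of the socket's data has been transmitted from the remote device. Once all of the data has been transmitted (the socket has nothing left and bytesRead = 0), the wait handle is released and the application continues processing.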

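Unfortunately, stepping through the code shows that after the last time the client returns data from the remote device, ReceiveCallback is never invoked again to see that the data queue is empty (i.e. bytesRead = 0). It therefore never enters the "else" branch of ReceiveCallback, where the ManualResetEvent would have been signaled (receiveDone.Set()) and the application would have continued. Because that branch is never reached, the ManualResetEvent is never signaled and the application hangs.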
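
The hang described above follows from how ManualResetEvent behaves: WaitOne() blocks until some other code calls Set(). The following minimal sketch (the class name WaitHandleDemo is made up for illustration and is not part of the MSDN sample) shows that a wait on an event that is never set only returns when it is given a timeout, and that once Set() has been called the event stays signaled.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, not part of the MSDN sample.
public static class WaitHandleDemo
{
    // Returns { result of a timed wait before Set(), result of a wait after Set() }.
    public static bool[] Run()
    {
        using (var receiveDone = new ManualResetEvent(false)) // starts unsignaled
        {
            // Nobody has called Set() yet, so the timed wait gives up and returns false.
            // The parameterless WaitOne() would block here indefinitely instead.
            bool beforeSet = receiveDone.WaitOne(TimeSpan.FromMilliseconds(100));

            // Set() is what the sample's "else" branch calls; it releases all waiters.
            receiveDone.Set();

            // The event stays signaled until Reset(), so this returns true immediately.
            bool afterSet = receiveDone.WaitOne(0);

            return new[] { beforeSet, afterSet };
        }
    }
}
```

Passing a timeout to WaitOne() and checking its return value is one way to keep the caller from blocking forever, although it does not by itself tell you whether the response is complete.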

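I can remove the "receiveDone.WaitOne()" call from the code, which lets execution continue without waiting for the ManualResetEvent to signal that all of the data has been received, but the data string returned from the equipment is then usually incomplete.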

Am I using this code sample incorrectly? Has anyone seen this before, or had any experience working around this issue?

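July 14, 2012 - Update: After further testing of MSDN's Asynchronous Client Socket example, it became clear that ReceiveCallback does re-poll the port, and that the "bytesRead = 0" condition is satisfied only when the socket is released (i.e. client.Shutdown(SocketShutdown.Both); client.Close();). If I understand this correctly, the connection has to be closed to get past receiveDone.WaitOne(). Closing the connection just to satisfy the WaitOne() wait handle defeats the purpose of the application, because I had hoped to leave the connection open so that the application could listen for equipment updates, which happen continually.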

July 16, 2012 - Update: I wrote to Microsoft Tech Support, who replied: "We're doing research on this issue. It might take some time before we get back to you." So it appears that this problem cannot be resolved for now by massaging this code.

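Without the Asynchronous Client Socket example as a foundation for writing asynchronous communication routines, can anyone suggest a more reliable replacement? There are three pieces of equipment, each with its own IP address and port number, so a class that can be instantiated once per device would be ideal. The port must also stay open to receive the spontaneous updates that the equipment sends continually. Finally, the updates have no end character or defined length to signal that a message is complete, so the routine must keep polling the port for available data. Any advice or suggestions would be greatly appreciated.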

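July 18, 2012 - Workaround: After spending a considerable amount of time trying to get MSDN's Asynchronous Client Socket code sample working, it became clear that I would have to look elsewhere to have the device responses picked up continuously by the program. In the hope of sparing someone else the headache, I have included the workaround I am currently using, which seems to work well. If anyone has suggestions, please don't hesitate to add to this question!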

// 
// ORIGINAL CODE ATTEMPT
//
public static Socket LutronClient;
public static String LutronResponse = String.Empty;
private const int LutronPort = 4999;
private const string LutronIP = "192.168.1.71";
private static ManualResetEvent LutronConnectDone = new ManualResetEvent(false);
private static ManualResetEvent LutronSendDone = new ManualResetEvent(false);
private static ManualResetEvent LutronReceiveDone = new ManualResetEvent(false);

private static void StartLutronClient()
    {
        try
        {
            var lutronIPAddress = IPAddress.Parse(LutronIP);
            var lutronRemoteEP = new IPEndPoint(lutronIPAddress, LutronPort);
            LutronClient = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            LutronClient.BeginConnect(lutronRemoteEP, LutronConnectCallback, LutronClient);
            LutronConnectDone.WaitOne();

            LutronSend(LutronClient, "sdl,14,100,0,S2\x0d");
            LutronSendDone.WaitOne();
            LutronReceive(LutronClient);
            LutronReceiveDone.WaitOne(TimeSpan.FromSeconds(5)); // wait up to 5 s; the TimeSpan(long) constructor takes 100 ns ticks, not milliseconds
            MessageBox.Show("Response received from Lutron: " + LutronResponse);
            txtBoxLutron.Text = LutronResponse;

            LutronClient.Shutdown(SocketShutdown.Both);
            LutronClient.Close();
        }
        catch (Exception e) { MessageBox.Show(e.ToString()); }
    }

    private static void LutronConnectCallback(IAsyncResult lutronAr)
    {
        try
        {
            var lutronClient = (Socket)lutronAr.AsyncState;
            lutronClient.EndConnect(lutronAr);
            LutronConnectDone.Set();
        }
        catch (Exception e) { MessageBox.Show(e.ToString()); }
    }

    private static void LutronReceive(Socket lutronClient)
    {
        try
        {
            var lutronState = new LutronStateObject { LutronWorkSocket = lutronClient };
            lutronClient.BeginReceive(lutronState.LutronBuffer, 0, LutronStateObject.BufferSize, 0, new AsyncCallback(LutronReceiveCallback), lutronState);
        }
        catch (Exception e) { MessageBox.Show(e.ToString()); }
    }

    private static void LutronReceiveCallback(IAsyncResult lutronAR)
    {
        try
        {
            var lutronState = (LutronStateObject)lutronAR.AsyncState;
            var lutronClient = lutronState.LutronWorkSocket;
            var bytesRead = lutronClient.EndReceive(lutronAR);
            if (bytesRead > 0)
            {
                lutronState.LutronStringBuilder.AppendLine(Encoding.ASCII.GetString(lutronState.LutronBuffer, 0, bytesRead));
                lutronClient.BeginReceive(lutronState.LutronBuffer, 0, LutronStateObject.BufferSize, 0, new AsyncCallback(LutronReceiveCallback), lutronState);
            }
            else
            {
                if (lutronState.LutronStringBuilder.Length > 0) { LutronResponse = lutronState.LutronStringBuilder.ToString(); }
                LutronReceiveDone.Set();
            }
        }
        catch (Exception e) { MessageBox.Show(e.ToString()); }
    }

    public static void LutronSend(Socket client, String data)
    {
        var byteData = Encoding.ASCII.GetBytes(data);
        client.BeginSend(byteData, 0, byteData.Length, 0, LutronSendCallback, client);
    }

    private static void LutronSendCallback(IAsyncResult ar)
    {
        try
        {
            var client = (Socket)ar.AsyncState;
            var bytesSent = client.EndSend(ar);
            LutronSendDone.Set();
        }
        catch (Exception e) { MessageBox.Show(e.ToString()); }
    }
    public class LutronStateObject
    {
        public Socket LutronWorkSocket;
        public const int BufferSize = 256;
        public byte[] LutronBuffer = new byte[BufferSize];
        public StringBuilder LutronStringBuilder = new StringBuilder();
    }

}

Here is the workaround I used:
 //
 // WORK-AROUND
 //
 using System;
 using System.Windows.Forms;

 namespace _GlobalCacheInterface
 {
     public partial class GlobalCacheDataScreen : Form
     {

         //Interface objects
         private static GC_Interface _lutronInterface;
         private const int LutronPort = 4999;
         private const string LutronIP = "192.168.1.71";
         delegate void ThreadSafeLutronCallback(string text);

         private static GC_Interface _elanInterface;
         private const int ElanPort = 4998;
         private const string ElanIP = "192.168.1.70";
         delegate void ThreadSafeElanCallback(string text);

         private static GC_Interface _tuneSuiteInterface;
         private const int TuneSuitePort = 5000;
         private const string TuneSuiteIP = "192.168.1.70";
         delegate void ThreadSafeTuneSuiteCallback(string text);

         public GlobalCacheDataScreen()
         {
              InitializeComponent();

              _lutronInterface = new GC_Interface(LutronIP, LutronPort);
              _elanInterface = new GC_Interface(ElanIP, ElanPort);
              _tuneSuiteInterface = new GC_Interface(TuneSuiteIP, TuneSuitePort);

             // Create event handlers to notify application of available updated information.
             _lutronInterface.DataAvailable += (s, e) => ThreadSafeTxtBoxLutron(_lutronInterface._returnString);
             _elanInterface.DataAvailable += (s, e) => ThreadSafeTxtBoxElan(_elanInterface._returnString);
             _tuneSuiteInterface.DataAvailable += (s, e) => ThreadSafeTxtBoxTuneSuite(_tuneSuiteInterface._returnString);
             _lutronInterface.Connected += (s, e) => UpdateUI();
             _elanInterface.Connected += (s, e) => UpdateUI();
             _tuneSuiteInterface.Connected += (s, e) => UpdateUI();

             UpdateUI();
         }

         private void UpdateUI()
         {
             _buttonConnectToLutron.Enabled = !_lutronInterface._isConnected;
             _buttonConnectToElan.Enabled = !_elanInterface._isConnected;
             _buttonConnectToTuneSuite.Enabled = !_tuneSuiteInterface._isConnected;
             _buttonDisconnectFromLutron.Enabled = _lutronInterface._isConnected;
             _buttonDisconnectFromElan.Enabled = _elanInterface._isConnected;
             _buttonDisconnectFromTuneSuite.Enabled = _tuneSuiteInterface._isConnected;
             string connectLutronStatus = _lutronInterface._isConnected ? "Connected" : "Not Connected";
             string connectElanStatus = _elanInterface._isConnected ? "Connected" : "Not Connected";
             string connectTuneSuiteStatus = _tuneSuiteInterface._isConnected ? "Connected" : "Not Connected";
             _textBoxLutronConnectStatus.Text = connectLutronStatus;
             _textBoxElanConnectStatus.Text = connectElanStatus;
             _textBoxTuneSuiteConnectStatus.Text = connectTuneSuiteStatus;
         }


         private void ThreadSafeTxtBoxLutron(string message) { if (_lutronRichTextRxMessage.InvokeRequired) { var d = new ThreadSafeLutronCallback(ThreadSafeTxtBoxLutron); _lutronRichTextRxMessage.Invoke(d, new object[] { message }); } else { _lutronRichTextRxMessage.Text = message; } }     
         private void ThreadSafeTxtBoxElan(string message) { if (_elanRichTextRxMessage.InvokeRequired) { var d = new ThreadSafeElanCallback(ThreadSafeTxtBoxElan); _elanRichTextRxMessage.Invoke(d, new object[] { message }); } else { _elanRichTextRxMessage.Text = message; if (message.EndsWith("\r")) { MessageBoxEx.Show(message, "Message from Lutron Elan", 1000); } } }
         private void ThreadSafeTxtBoxTuneSuite(string message) { if (_tuneSuiteRichTextRxMessage.InvokeRequired) { var d = new ThreadSafeTuneSuiteCallback(ThreadSafeTxtBoxTuneSuite); _tuneSuiteRichTextRxMessage.Invoke(d, new object[] { message }); } else { _tuneSuiteRichTextRxMessage.Text = message; if (message.EndsWith("\r")) { MessageBoxEx.Show(message, "Message from TuneSuite", 1000); } } }

         private void _buttonConnectToLutron_Click(object sender, EventArgs e) { _lutronInterface.Connect(); }
         private void _buttonDisconnectFromLutron_Click(object sender, EventArgs e) { _lutronInterface.Disconnect(); }
         private void _buttonConnectToElan_Click(object sender, EventArgs e) { _elanInterface.Connect(); }
         private void _buttonDisconnectFromElan_Click(object sender, EventArgs e) { _elanInterface.Disconnect(); }
         private void _buttonConnectToTuneSuite_Click(object sender, EventArgs e) { _tuneSuiteInterface.Connect(); }
         private void _buttonDisconnectFromTuneSuite_Click(object sender, EventArgs e) { _tuneSuiteInterface.Disconnect(); }
         private void _buttonLutronSendMessage_Click(object sender, EventArgs e) { _lutronInterface.SendCommand(_lutronRichTextTxMessage.Text); }
         private void _buttonElanSendMessage_Click(object sender, EventArgs e) { _elanInterface.SendCommand(_elanRichTextTxMessage.Text); }
         private void _buttonTuneSuiteSendMessage_Click(object sender, EventArgs e) { _tuneSuiteInterface.SendCommand(_elanRichTextTxMessage.Text); }
         private void _buttonLightOn_Click(object sender, EventArgs e) { _lutronInterface.SendCommand("sdl,14,100,0,S2"); }
         private void _buttonLightOff_Click(object sender, EventArgs e) { _lutronInterface.SendCommand("sdl,14,0,0,S2"); }
         private void _buttonStereoOnOff_Click(object sender, EventArgs e) { _elanInterface.SendCommand("sendir,4:3,1,40000,4,1,21,181,21,181,21,181,21,181,21,181,21,181,21,181,21,181,21,181,21,181,21,181,21,800"); }
         private void _button30_Click(object sender, EventArgs e) { _tuneSuiteInterface.SendCommand("\xB8\x4D\xB5\x33\x30\x00\x30\x21\xB8"); }
         private void _button31_Click(object sender, EventArgs e) { _tuneSuiteInterface.SendCommand("\xB8\x4D\xB5\x33\x31\x00\x30\x21\xB8"); }
         private void _button26_Click(object sender, EventArgs e) { _tuneSuiteInterface.SendCommand("\xB8\x4D\xB5\x32\x36\x00\x30\x21\xB8"); }
     }
 }

And the GC_Interface class:
 using System;
 using System.Net;
 using System.Net.Sockets;
 using System.Text;
 using System.Windows.Forms;

 namespace _GlobalCacheInterface
 {
     class GC_Interface
     {
         // Declare an event handler to notify when updates are available.
         public event EventHandler<EventArgs> DataAvailable;
         public string _returnString = "";

         // Declare an event handler to notify status of connection.
         public event EventHandler<EventArgs> Connected;
         public bool _isConnected;

         public AsyncCallback ReceiveCallback;
         public Socket Client;
         private string _ipAddress;
         private int _port;
         private bool _waitForEndCharacter;
         private byte _endCharacter;
         byte[] m_DataBuffer = new byte[10];
         IAsyncResult m_Result;

         public GC_Interface(string ipAddress, int port) { Init(ipAddress, port, false, 0); }

         private void Init(string ipAddress, int port, bool waitForEndCharacter, byte endCharacter)
         {
             _ipAddress = ipAddress;
             _port = port;
             _waitForEndCharacter = waitForEndCharacter;
             _endCharacter = endCharacter;
             _isConnected = false;
         }

         public bool Connect()
         {
             try
             {
                 // Create a TCP/IP socket.
                 Client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

                 // Establish the remote endpoint for the socket.
                 var address = IPAddress.Parse(_ipAddress);
                 var remoteEP = new IPEndPoint(address, _port);

                 // Connect to the remote endpoint.
                 Client.Connect(remoteEP);
                 if (Client.Connected)
                 {
                     _isConnected = true;
                     ConnectedEventHandler();
                     WaitForData();
                 }
                 return true;
             }
             catch (SocketException se) { MessageBox.Show("\n connection failed, is the server running?\n" + se.Message ); return false; }
         }
         public bool SendCommand(string command)
         {
             try
             {
                 // Convert the string data to byte data using Encoding.Default (the system default encoding, not ASCII).
                 var byteData = Encoding.Default.GetBytes(command);
                 // Add a carriage return to the end.
                 var newArray = new byte[byteData.Length + 1];
                 byteData.CopyTo(newArray, 0);
                 newArray[newArray.Length - 1] = 13;
                 if (Client == null) { return false; }
                 Client.Send(newArray);
                 return true;
             }
             catch (SocketException se) {  MessageBox.Show(se.Message); return false;  }
         }
         public void WaitForData()
         {
             try
             {
                 if (ReceiveCallback == null) { ReceiveCallback = new AsyncCallback(OnDataReceived); }
                 var theSocPkt = new SocketPacket { thisSocket = Client };
                 m_Result = Client.BeginReceive(theSocPkt.DataBuffer, 0, theSocPkt.DataBuffer.Length, SocketFlags.None, ReceiveCallback, theSocPkt);
             }
             catch (SocketException se) { MessageBox.Show(se.Message); }
         }
         public class SocketPacket
         {
             public System.Net.Sockets.Socket thisSocket;
             public byte[] DataBuffer = new byte[1];
         }
         public void OnDataReceived(IAsyncResult asyn)
         {
             try
             {
                  SocketPacket theSockId = (SocketPacket)asyn.AsyncState;
                 var iRx = theSockId.thisSocket.EndReceive(asyn);
                 char[] Chars = new char[iRx + 1];
                 System.Text.Decoder d = System.Text.Encoding.UTF8.GetDecoder();
                 int CharLen = d.GetChars(theSockId.DataBuffer, 0, iRx, Chars, 0);
                 System.String szData = new System.String(Chars);
                 _returnString = _returnString + szData.Replace("\0", "");
                 // When an update is received, raise DataAvailable event
                 DataAvailableEventHandler();
                 WaitForData();
             }
             catch (ObjectDisposedException) { System.Diagnostics.Debugger.Log(0, "1", "\nOnDataReceived: Socket has been closed\n"); }
             catch (SocketException se) { MessageBox.Show(se.Message); }
         }
         public bool Disconnect()
         {
              try
              {
                  if (Client == null) { return false; }
                  Client.Close(); 
                  Client = null;
                  _isConnected = false;
                  return true;
              }
              catch (Exception) { return false; }
         }
         protected virtual void DataAvailableEventHandler()
         {
             var handler = DataAvailable;
             if (handler != null) { handler(this, EventArgs.Empty); }
         }
         protected virtual void ConnectedEventHandler()
         {
             var handler = Connected;
             if (handler != null) { handler(this, EventArgs.Empty); }
         }

     }
 }
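
The receive loop in the workaround (BeginReceive, then OnDataReceived, then WaitForData again) can also be written with async/await on .NET 4.5 or later. The sketch below is an illustration rather than a drop-in replacement: DeviceReader and its members are hypothetical names, it reads from any Stream so that it can be tried without hardware, and it assumes ASCII text. It raises an event for every chunk that arrives and stops only when a read returns 0, which is the point at which the remote side has closed the connection.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original post.
public sealed class DeviceReader
{
    // Raised once per chunk read from the stream; the argument is the decoded text.
    public event Action<string> DataReceived;

    // Raised once when a read returns 0, i.e. the remote side closed the connection.
    public event Action Closed;

    public async Task RunAsync(Stream stream, CancellationToken token, int bufferSize = 256)
    {
        var buffer = new byte[bufferSize];
        while (!token.IsCancellationRequested)
        {
            // Completes as soon as at least one byte is available (or the stream ends).
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length, token)
                                        .ConfigureAwait(false);
            if (bytesRead == 0)
            {
                var closed = Closed;
                if (closed != null) { closed(); }
                return;
            }

            // Assumption: the device sends ASCII text; bytes above 0x7F decode as '?'.
            var handler = DataReceived;
            if (handler != null) { handler(Encoding.ASCII.GetString(buffer, 0, bytesRead)); }
        }
    }
}
```

With a real device, one instance per device could be fed the stream returned by TcpClient.GetStream() after connecting to that device's IP address and port. The events are raised on a thread-pool thread, so UI updates would still need Invoke as in the form code above.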

Best answer

I had the same problem, and adding an Available check to the code solved it for me. The modified code is below.

private static void ReceiveCallback( IAsyncResult ar ) {
        try {
            StateObject state = (StateObject) ar.AsyncState;
            Socket client = state.workSocket;

            int bytesRead = client.EndReceive(ar);
            if (bytesRead > 0) {
                state.sb.Append(Encoding.ASCII.GetString(state.buffer, 0, bytesRead));
                // Check whether any more data is already waiting on the socket
                if (client.Available > 0) {
                    client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state);
                }
            }

            if (bytesRead == 0 || client.Available == 0) {
                if (state.sb.Length > 1) {
                    response = state.sb.ToString();
                }
                receiveDone.Set();
            }
        } catch (Exception e) {
            Console.WriteLine(e.ToString());
        }
    }

Hope that helps.
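
One caveat with the Available check: Available == 0 only means that nothing further has arrived yet, so a response that is split across several TCP segments can still be cut short. If the device ends each message with a carriage return (an assumption: the workaround code in the question tests for "\r", but the question also says no terminator is guaranteed), the received chunks can be accumulated and split on that character instead. A minimal sketch, with MessageFramer being a hypothetical helper name:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of the original answer.
public sealed class MessageFramer
{
    private readonly StringBuilder _pending = new StringBuilder();
    private readonly char _terminator;

    // Assumption: messages end with a carriage return unless told otherwise.
    public MessageFramer(char terminator = '\r') { _terminator = terminator; }

    // Adds newly received text and returns every message completed by it,
    // without the terminator. Incomplete trailing text is kept for the next call.
    public IList<string> Append(string chunk)
    {
        var complete = new List<string>();
        foreach (char c in chunk)
        {
            if (c == _terminator)
            {
                complete.Add(_pending.ToString());
                _pending.Clear();
            }
            else
            {
                _pending.Append(c);
            }
        }
        return complete;
    }
}
```

In a receive callback, each decoded chunk would be passed to Append, and only the returned complete messages would be handed to the rest of the application, so the connection can stay open between messages.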

For more on sockets - Asynchronous client socket ManualResetEvent holding up execution, see the corresponding question on Stack Overflow: https://stackoverflow.com/questions/11453939/
