C# 通过 SOCKET 读取/写入 JAVA 并存在一些并发/套接字问题

标签 c# java multithreading sockets concurrency

Rrom C#,通过 SOCKET 读取/写入 JAVA,并存在一些并发/套接字问题。

我正在尝试实现一个服务器客户端应用程序,其中服务器是Java,客户端是C#。它们通过 TCP/IP 进行通信并在它们之间交换一些二进制数据。

特别是我有一个用 Java 和 C# 定义的 Packet 类。它有一个 header 、键和值。 Java 和 C# 都以完全相同的方式向 Socket 写入和读取数据包。这样我就可以从 C# 发送请求数据包,在 Java 服务器上处理它,并将响应作为数据包发回。

原来的问题要复杂得多,但我能够将其归结为这个“简单”版本。

我已经实现了服务器和客户端,如下所述。该代码也可以在底部找到。

要让我陈述问题,你必须继续阅读:)

服务器(Java)端

在服务器端,我有一个非常虚拟的 ServerSocket 用法。它读取传入的数据包并发送回几乎相同的数据包作为响应。

客户端(C#)端 客户端有点复杂。客户端启动N(可配置) 个线程(我将其称为用户线程)。一进一出线程。所有用户线程都会创建一个具有虚拟请求数据包和唯一 ID 的 Call 对象。然后将调用添加到本地 BlockingCollection 中。

Out Thread不断读取本地BlockingCollection并将所有请求包发送到服务器

In Thread 还会不断地从服务器读取响应数据包,并将它们与 Call 对象进行匹配(记住唯一的 call id)。

如果特定 Call 对象在 5 秒间隔内没有响应,用户线程将通过打印到控制台来提示。

还有一个以 10 秒为间隔的计时器,用于打印每秒执行的事务数。

如果您到目前为止已经达到了,谢谢您:)。

现在的问题:

下面的代码是我上面描述的代码的实现,在 Mac 上的 Mono 上运行良好。在 Windows 上,当用户线程数量较少(<10)时,它也不会立即失败。当我突然增加线程数量时,客户端收到的响应数据包不知何故被损坏。就该应用程序而言,所有用户线程都会陷入困境,因为未收到对其请求的答复。 问题是为什么它们会被破坏?正如您所看到的,接触套接字的线程是输入和输出线程。但不知何故,用户线程的数量会影响客户端并阻止它。

看起来像是一些并发或套接字问题,但我可以找到它。

我已经放置了服务器(Java)和客户端(C#)的代码。他们没有任何依赖性,只需在两个(第一台服务器)上编译并运行 Main 方法就会显示问题。

如果您读到这里,我将不胜感激。

服务器代码

import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;

public class DummyServer {

public static void main(String[] args) throws IOException {
    ServerSocket server = new ServerSocket(9900);
    System.out.println("Server started");
    for(;;){
        final Socket socket = server.accept();
        System.out.println("Accepting a connection");
        new Thread(new Runnable(){
            public void run() {
                try {
                    System.out.println("Thread started to handle the connection");
                    DataInputStream dis = new DataInputStream(socket.getInputStream());
                    DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
                    for(int i=0; ; i++){
                        Packet packet = new Packet();
                        packet.readFrom(dis);
                        packet.key = null;
                        packet.value = new byte[1000];
                        packet.writeTo(dos);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
public static class Packet {
    byte[] key;
    byte[] value;
    long callId = -1;
    private int valueHash = -1;

    public void writeTo(DataOutputStream outputStream) throws IOException {
        final ByteBuffer writeHeaderBuffer = ByteBuffer.allocate(1 << 10); // 1k
        writeHeaderBuffer.clear();
        writeHeaderBuffer.position(12);
        writeHeaderBuffer.putLong(callId);
        writeHeaderBuffer.putInt(valueHash);
        int size = writeHeaderBuffer.position();
        int headerSize = size - 12;
        writeHeaderBuffer.position(0);
        writeHeaderBuffer.putInt(headerSize);
        writeHeaderBuffer.putInt((key == null) ? 0 : key.length);
        writeHeaderBuffer.putInt((value == null) ? 0 : value.length);
        outputStream.write(writeHeaderBuffer.array(), 0, size);
        if (key != null)outputStream.write(key);
        if (value != null)outputStream.write(value);
    }

    public void readFrom(DataInputStream dis) throws IOException {
        final ByteBuffer readHeaderBuffer = ByteBuffer.allocate(1 << 10);
        final int headerSize = dis.readInt();
        int keySize = dis.readInt();
        int valueSize = dis.readInt();
        readHeaderBuffer.clear();
        readHeaderBuffer.limit(headerSize);
        dis.readFully(readHeaderBuffer.array(), 0, headerSize);
        this.callId = readHeaderBuffer.getLong();
        valueHash = readHeaderBuffer.getInt();
        key = new byte[keySize];
        dis.readFully(key);
        value = new byte[valueSize];
        dis.readFully(value);
    }
}

}

C# 客户端代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Net.Sockets;
using System.Net;
using System.IO;
using System.Collections.Concurrent;
using System.Threading;

namespace Client
{
public class Program
{
    readonly ConcurrentDictionary<long, Call> calls = new ConcurrentDictionary<long, Call>();
    readonly BlockingCollection<Call> outThreadQueue = new BlockingCollection<Call>(1000);
    readonly TcpClient tcpClient = new TcpClient("localhost", 9900);
    readonly private int THREAD_COUNT;
    static int ops;

    public static void Main(string[] args) {
        new Program(args.Length > 0 ? int.Parse(args[0]) : 100).Start();
    }
    public Program(int threadCount) {
        this.THREAD_COUNT = threadCount;
        new Thread(new ThreadStart(this.InThreadRun)).Start();//start the InThread
        new Thread(new ThreadStart(this.OutThreadRun)).Start();//start the OutThread
    }
    public void Start(){
        for (int i = 0; i < THREAD_COUNT; i++)
            new Thread(new ThreadStart(this.Call)).Start();
        Console.WriteLine(THREAD_COUNT + " User Threads started to perform server call");
        System.Timers.Timer aTimer = new System.Timers.Timer(10000);
        aTimer.Elapsed += new System.Timers.ElapsedEventHandler(this.Stats);
        aTimer.Enabled = true;
    }
    public void Stats(object source, System.Timers.ElapsedEventArgs e){
        Console.WriteLine("Ops per second: " + Interlocked.Exchange(ref ops, 0) / 10);
    }
    public void Call() {
        for (; ;){
            Call call = new Call(new Packet());
            call.request.key = new byte[10];
            call.request.value = new byte[1000];
            outThreadQueue.Add(call);
            Packet result = null;
            for (int i = 1;result==null ; i++){
                result = call.getResult(5000);
                if(result==null) Console.WriteLine("Call"  + call.id + " didn't get answer within "+ 5000*i/1000 + " seconds");
            }
            Interlocked.Increment(ref ops);
        }
    }
    public void InThreadRun(){
        for (; ; ){
            Packet packet = new Packet();
            packet.Read(tcpClient.GetStream());
            Call call;
            if (calls.TryGetValue(packet.callId, out call))
                call.inbQ.Add(packet);
            else
                Console.WriteLine("Unkown call result: " + packet.callId);
        }
    }
    public void OutThreadRun() {
        for (; ; ){
            Call call = outThreadQueue.Take();
            calls.TryAdd(call.id, call);
            Packet packet = call.request;
            if (packet != null) packet.write(tcpClient.GetStream());
        }
    }
}
public class Call
{
    readonly public long id;
    readonly public Packet request;
    static long callIdGen = 0;
    readonly public BlockingCollection<Packet> inbQ = new BlockingCollection<Packet>(1);
    public Call(Packet request)
    {
        this.id = incrementCallId();
        this.request = request;
        this.request.callId = id;
    }
    public Packet getResult(int timeout)
    {
        Packet response = null;
        inbQ.TryTake(out response, timeout);
        return response;
    }
    private static long incrementCallId()
    {
        long initialValue, computedValue;
        do
        {
            initialValue = callIdGen;
            computedValue = initialValue + 1;
        } while (initialValue != Interlocked.CompareExchange(ref callIdGen, computedValue, initialValue));
        return computedValue;
    }
}

public class Packet
{
    public byte[] key;
    public byte[] value;
    public long callId = 0;
    public void write(Stream stream)
    {
        MemoryStream header = new MemoryStream();
        using (BinaryWriter writer = new BinaryWriter(header))
        {
            writer.Write(System.Net.IPAddress.HostToNetworkOrder((long)callId));
            writer.Write(System.Net.IPAddress.HostToNetworkOrder((int)-1));
        }
        byte[] headerInBytes = header.ToArray();
        MemoryStream body = new MemoryStream();
        using (BinaryWriter writer = new BinaryWriter(body))
        {
            writer.Write(System.Net.IPAddress.HostToNetworkOrder(headerInBytes.Length));
            writer.Write(System.Net.IPAddress.HostToNetworkOrder(key == null ? 0 : key.Length));
            writer.Write(System.Net.IPAddress.HostToNetworkOrder(value == null ? 0 : value.Length));
            writer.Write(headerInBytes);
            if (key != null) writer.Write(key);
            if (value != null) writer.Write(value);
            byte[] packetInBytes = body.ToArray();
            stream.Write(packetInBytes, 0, packetInBytes.Length);
        }
    }
    public void Read(Stream stream)
    {
        BinaryReader reader = new BinaryReader(stream);
        int headerSize = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        int keySize = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        int valueSize = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        this.callId = IPAddress.NetworkToHostOrder(reader.ReadInt64());
        int valuePartitionHash = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        this.key = new byte[keySize];
        this.value = new byte[valueSize];
        if (keySize > 0) reader.Read(this.key, 0, keySize);
        if (valueSize > 0) reader.Read(this.value, 0, valueSize);
    }
}

}

最佳答案

这是一个非常常见的错误:套接字上的任何 Read 调用实际上可能无法读取您要求的字节数(如果它们当前不可用)。 Read 将返回每次调用读取的字节数。如果你希望读取n个字节的数据,那么你需要多次调用read,直到读取的字节数加起来为n。

关于C# 通过 SOCKET 读取/写入 JAVA 并存在一些并发/套接字问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9302934/

相关文章:

java - Tomcat JavaMelody 奇怪的错误ThreadDeath

java - 以这种方式构造对象是否不符合常规? (关于同一个构造函数的几个问题)

c# - IO绑定(bind)异步任务不异步执行

c# - 如何舍入到最接近的 0.5?

c# - 处理功能键按下

c# - Azure DocumentDB .JSON 反序列化为实体时有多个级别

java - 如何定期调用 asyncTasks

c# - 使用编辑器模板处理可空类型

java - 如何计算字符串中的对数

针对 I/O 密集型和 CPU 密集型操作的 Java 线程同步