java - NIO 套接字 - 分布式系统

标签 java sockets client-server nio

我有三个进程,名为 LWA1、LWA2 和 LWA3。每个都有一台服务器,LWA1 的端口为 55555,LWA2 的端口为 55556,LWA3 的端口为 55557。 此外,每个进程都有一个客户端,以便连接到其他进程。

每个进程都应该能够写入和读取其他进程。所以:

  • LWA1 应该向 LWA2 和 LWA3 写入和读取
  • LWA2 应向 LWA1 和 LWA3 写入数据或从 LWA1 和 LWA3 读取数据
  • LWA3 应该向 LWA1 和 LWA2 写入数据或从 LWA1 和 LWA2 读取数据

目前,每个进程执行两次写入,但只接收一条消息。每个进程的输出如下(选项卡式打印属于客户端,未选项卡式打印属于服务器)。

LWA1:

Setting up server with port: 55555
Server configured.

    Opening sockets to port 55556 and port 55557
    Sending lamport request: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA3', id=3}
Key accepted

LWA2:

Setting up server with port: 55556
Server configured.

    Opening sockets to port 55557 and port 55555
Key accepted
Reading data from server
I read: LamportRequest{clock=0, process='LWA1', id=1}
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA2', id=2}

LWA3:

Setting up server with port: 55557
Server configured.

    Opening sockets to port 55555 and port 55556
Key accepted
Key accepted
    Sending lamport request: LamportRequest{clock=0, process='LWA3', id=3}
Reading data from server
I read: LamportRequest{clock=0, process='LWA2', id=2}

正如您所看到的,每个客户端都向另外两个客户端写入一个 LamportRequest,但其他两个客户端只收到一条消息。为什么其他消息没有被接收?

我怀疑这可能与服务器中的 key 有关,但不知道可能是什么。另外,我也不完全理解他们。如果我错了请纠正我:

与选择器的每个连接都用不同的 (SelectableChannel) 键表示,因此服务器 LWA1 中的迭代器(例如)应该只有(因此,仅监听事件)两个键,一个用于 LWA2,另一个用于 LWA2 LWA3,对吗?我尝试在 keyAccept 方法中将整数附加到每个键来区分它们,效果很好,但是在 keyRead 方法中打印附加的整数时,它显示为 null。该方法的关键是新的吗?第三把 key 突然出现了?

额外问题:我应该在单个线程中实现这个结构。目前我使用两种,一种用于服务器,一种用于客户端。一旦它开始工作,有什么关于如何统一它们的技巧吗?

-----------------代码-----------------

服务器(为了阅读目的而简化)如下:

public TalkToBrotherSocket(int clock, int port) {
    this.port = port;
    this.clock = clock;

    try {
        setServer();
        System.out.println("Server configured.\n");
    } catch (IOException e) {
        e.printStackTrace();
    }
}

@Override
public void run() {
    while (true) {
        try {
            // Wait for an event one of the registered channels
            selector.select();

            // Iterate over the set of keys for which events are available
            Iterator selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = (SelectionKey) selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check if they key is ready to accept a new socket connection
                if (key.isAcceptable()) {
                    keyAccept(key);
                    System.out.println("Key accepted");
                } else if (key.isReadable()){
                    System.out.println("Reading data from server");
                    keyRead(key);
                } else if (key.isWritable()){
                    System.out.println("Writting data from server");
                    keyWrite(key); //unused at the moment
                }

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


private void keyRead(SelectionKey key) throws IOException {
    // Create a SocketChannel to read the request
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    buffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(buffer);
    } catch (IOException e) {
        System.out.println("Closing socket");
        // The remote forcibly closed the connection, cancel
        // the selection key and close the channel.
        key.cancel();
        socketChannel.close();
        return;
    }

    if (numRead == -1) {
        System.out.println("Shutting down socket");
        // Remote entity shut the socket down cleanly. Do the
        // same from our end and cancel the channel.
        key.channel().close();
        key.cancel();
        return;
    }

    System.out.println("I read: " + new String(buffer.array()).trim());
}

private void keyAccept(SelectionKey key) throws IOException {
    // For an accept to be pending the channel must be a server socket channel.
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    // Accept the connection and make it non-blocking
    SocketChannel socketChannel = serverSocketChannel.accept();
    //Socket socket = socketChannel.socket();
    socketChannel.configureBlocking(false);

    // Register the new SocketChannel with our Selector, indicating
    // we'd like to be notified when there's data waiting to be read
    socketChannel.register(selector, SelectionKey.OP_READ);
}

private void setServer() throws IOException {
    // Create a new selector
    selector = Selector.open();

    // Create a new non-blocking server socket channel
    serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    // Bind the server socket to the specified address and port
    serverChannel.bind(new InetSocketAddress("localhost", port));

    // Register the server socket channel, indicating an interest in
    // accepting new connections
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}

客户端(为了阅读目的而简化)如下:

public NIOClient(int clock, int firstPort, int secondPort, int id, String process) {
    this.process = process;
    this.clock = clock;
    this.id = id;

    try {
        System.out.println("\tOpening sockets to port " + firstPort + " and port " + secondPort);
        firstClient = SocketChannel.open(new InetSocketAddress("localhost", firstPort));
        secondClient = SocketChannel.open(new InetSocketAddress("localhost", secondPort));
        firstBuffer = ByteBuffer.allocate(1024);
        secondBuffer = ByteBuffer.allocate(1024);
        sendRequests();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

private void sendRequests() {
    LamportRequest lamportRequest = new LamportRequest(clock, process, id);
    firstBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    secondBuffer = ByteBuffer.wrap(lamportRequest.toString().getBytes());
    String converted = new String(firstBuffer.array(), StandardCharsets.UTF_8);
    System.out.println("\tSending lamport request: " + converted);
    try {
        firstClient.write(firstBuffer);
        secondClient.write(secondBuffer);
        firstBuffer.clear();
}

初始化如下:

System.out.println("Setting up server with port: " + myPort);
TalkToBrotherSocket talkToBrotherSocket = new TalkToBrotherSocket(clock, myPort);
talkToBrotherSocket.start();

new NIOClient(clock, firstPort, secondPort, id, process);

最佳答案

根据@user207421的评论,我添加了一个新的第二个缓冲区。对 sendRequests 方法的更改已编辑到原始帖子中。

关于java - NIO 套接字 - 分布式系统,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61370342/

相关文章:

java - 如何在 Google App Engine Flex 上获取 intelliJ Java 项目

C 语言客户端/服务器程序

java - 我的 java 服务器在代码中间停止

使用 getaddrinfo 编译错误

C、绑定(bind)失败: `Address Already in Use`

java - libgdx - 创建选择框时出现异常

java - 输入不匹配异常错误,

JavaFX:如何编辑现有的 TableView 条目

php - PHP的开放2个TCP套接字

javascript - 如何在 Sails.js 中保护蓝图访问