java - 服务器在用户之间交替而不是广播

标签 java multithreading sockets datagram

我一直在开发一个消息系统,用户在该系统中输入服务器 IP/端口,然后该服务器接收消息并将它们转发给服务器上的所有其他用户。整个程序基于我从头开始重写的回显服务器,它为每个 server.accept() 套接字创建两个线程,一个用于接收消息,一个用于发送消息。这两个线程通过 DatagramPacket 系统连接,因此如果服务器从一个套接字接收到一条消息,它将把它发送回所有其他用户,因为他们的线程正在监听相同的东西,这就是我遇到问题的地方;一切正常,除了接收消息的用户按登录时间交替。

两个客户端连接时的问题示例:

客户端 #1 发送了 10 条消息:

0
1
2
3
4
5
6
7
8
9

服务器接收所有这些。

客户端 #1 接收:

1
3
5
7
9

客户端 #2 接收:

0
2
4
6
8

这是客户端的代码:

import java.io.*;
import java.util.*;
import java.net.*;
import java.awt.*;
import java.awt.event.*;
import javax.swing.*;

public class MessageClient {
    public static void main(String[] args) {
        System.out.println("Starting Message System...");
        Scanner in = new Scanner(System.in);
        MessageClient mc = new MessageClient();
        String input;
        System.out.println(":System Started, type help for help.");
        System.out.print(":");
        while (true) {
            input = in.nextLine();
            if (input.equalsIgnoreCase("HELP")) {
                mc.printHelp();
                System.out.print(":");
            } else if (input.equalsIgnoreCase("QUIT")) {
                System.exit(0);
            } else if (input.equalsIgnoreCase("CONNECT")) {
                mc.connect(in);
                in.nextLine();
                System.out.print(":");
            } else {
                System.out.print("No command found.\n:");
            }
        }
    }
    public static void printHelp() {
        System.out.println("help\tShow this prompt\nconnect\tStarts a new connection\nquit\tQuit the program\nexit\tExit a connection");
    }
    public void connect(Scanner in) {
        Socket soc = null;
        InetAddress addr = null;
        System.out.print("IP_ADDRESS/HOST:");
        String ip = in.nextLine();
        System.out.print("PORT:");
        int port = in.nextInt();
        try {
            System.out.println("Attempting to connect to HOST:\'" + ip + "\' on PORT:\'" + port + "\'");
            addr = InetAddress.getByName(ip);
            soc = new Socket(addr, port);
        } catch(Exception e) {
            System.out.println("Error connecting to server: " + e.getLocalizedMessage());
            return;
        }
        SwingUtilities.invokeLater(new MessageGUI(ip + ":" + port, soc));
    }
}

class MessageGUI implements Runnable {
    public MessageGUI(String windowName, Socket server) {
        JFrame window = new JFrame(windowName);
        window.setDefaultCloseOperation(JFrame.DISPOSE_ON_CLOSE);
        window.setSize(500, 300);
        window.setLayout(new BorderLayout());
        window.setVisible(true);

        MessageReceive mr = new MessageReceive(server);
        mr.setEditable(false);
        mr.setBackground(new Color(0, 0, 0));
        mr.setForeground(new Color(0, 255, 0));
        mr.setVisible(true);
        new Thread(mr).start();
        window.add(mr, BorderLayout.CENTER);

        DataOutputStream dos = null;
        try {
            dos = new DataOutputStream(server.getOutputStream());
        } catch(Exception e) {
            System.out.println("Error creating output stream to server: " + e.getLocalizedMessage());
        }

        JTextField input = new JTextField();
        input.addActionListener(new MessageSend(server, input, dos));
        input.setBackground(new Color(0, 0, 0));
        input.setForeground(new Color(0, 255, 0));
        window.add(input, BorderLayout.PAGE_END);

        System.out.println("Displaying connection.");
    }
    public void run() {}
}

class MessageReceive extends JTextArea implements Runnable {
    protected Socket server;
    public MessageReceive(Socket server) {
        this.server = server;
    }
    public void run() {
        DataInputStream dis = null;
        int bytes;
        try {
            dis = new DataInputStream(server.getInputStream());
        } catch(Exception e) {
            System.out.println("Error connecting server: " + e.getLocalizedMessage());
        }
        this.append("Connected.\n");
        while (true) {
            try {
                while ((bytes = dis.read()) != -1) this.append(String.valueOf((char) bytes));
            } catch(Exception e) {
                System.out.println("Error reading from server: " + e.getLocalizedMessage());
                return;
            }
        }
    }
}

class MessageSend implements ActionListener {
    protected Socket server;
    protected JTextField input;
    protected DataOutputStream dos = null;
    public MessageSend(Socket server, JTextField input, DataOutputStream dos) {
        this.server = server;
        this.input = input;
        this.dos = dos;
    }
    public void actionPerformed(ActionEvent ae) {
        try {
            dos.writeBytes(input.getText() + "\n");
            input.setText("");
        } catch(Exception e) {
            System.out.println("Error writing to server output stream: " + e.getLocalizedMessage());
        }
    }
}

服务器代码如下:

import java.io.*;
import java.net.*;
import java.util.*;

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer ms = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        DatagramSocket ds = null;
        try {
            ds = new DatagramSocket(4);
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ds).start();
            new MessageConnectionOut(client, ds).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionOut(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ds.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public DatagramSocket ds;
    public MessageConnectionIn(Socket client, DatagramSocket ds) {
        this.client = client;
        this.ds = ds;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ds.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

更新:

我尝试用 MulticastSockets 替换所有 DatagramSockets,并在声明它时将其添加到一个组,MessageServer.main()。出现同样的问题。

组播代码:

public class MessageServer {
    public static void main(String[] args) {
        int port = Integer.parseInt(args[0]);
        MessageServer msgsrv = new MessageServer();
        System.out.println("Starting server on port " + port + "...");
        ServerSocket ss = null;
        try {
            ss = new ServerSocket(port);
        } catch(Exception e) {
            System.out.println("Error creating server: " + e.getLocalizedMessage());
            System.exit(0);
        }
        System.out.println("Created server port, now waiting for users...");
        Socket client = null;
        MulticastSocket ms = null;
        try {
            ms = new MulticastSocket(4);
            ms.joinGroup(InetAddress.getByName("225.65.65.65"));
        } catch(Exception e) {
            System.out.println("IN:Error creating Datagram Server: " + e.getLocalizedMessage());
            e.printStackTrace();
            System.exit(0);
        }
        while (true) {
            try {
                client = ss.accept();
                System.out.println("Connecting user: " + client.getInetAddress().toString());
            } catch(Exception e) {
                System.out.println("Error on server: " + e.getLocalizedMessage());
            }
            new MessageConnectionIn(client, ms).start();
            new MessageConnectionOut(client, ms).start();
        }
    }
}

class MessageConnectionOut extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionOut(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":OUT");
        try {
            System.out.println("OUT:User connected.");
            DataOutputStream dos = new DataOutputStream(client.getOutputStream());
            while (true) {
                byte[] outgoing = new byte[4096];
                DatagramPacket dp = new DatagramPacket(outgoing, outgoing.length);
                ms.receive(dp);
                dos.writeChars(new String(outgoing) + "\n");
                System.out.println("SENT_TO:" + this.getName());
            }
        } catch(Exception e) {
            System.out.println("OUT:Error connecting " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

class MessageConnectionIn extends Thread {
    protected Socket client;
    public MulticastSocket ms;
    public MessageConnectionIn(Socket client, MulticastSocket ms) {
        this.client = client;
        this.ms = ms;
    }
    public void run() {
        this.setName(client.getInetAddress().getHostAddress() + ":IN");
        try {
            System.out.println("IN:User connected.");
            BufferedReader br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            while (true) {
                String lineIn = br.readLine();
                byte[] input = lineIn.getBytes();
                System.out.println(lineIn);
                byte[] output = new byte[4096];
                for (int c = 0; c < output.length; c++) output[c] = 0x0;
                for (int i = 0; i < input.length && i < output.length; i++) output[i] = input[i];
                DatagramPacket dp = new DatagramPacket(output, output.length, InetAddress.getLocalHost(), 4);
                ms.send(dp);
            }
        } catch(Exception e) {
            System.out.println("IN:Error connecting to " + this.getName() + ": " + e.getLocalizedMessage());
            return;
        }
    }
}

最佳答案

此示例可能对您有所帮助。

服务器有 2 个线程。

  1. 一个用于读取 UDP 消息。我使用了 2 个不同的端口,因为我只想避免同一进程读取消息。我没有 2 台机器来测试它。在我的本地主机上测试。
  2. 另一个线程将广播读者线程收到的 UDP 消息。

有一个线程安全列表,它在线程之间充当数据同步。接收到的数据添加到列表中。广播线程轮询数据列表,如果有任何广播,则 hibernate 500 微秒。线程是使用执行器创建的。

private final static String INET_ADDR = "224.0.0.3";
private final static int PORT1 = 8888;
private final static int PORT2 = 8889;
private static List<String> threadSafeList = null;

public static void main(String[] args) throws UnknownHostException, InterruptedException {
    threadSafeList = new CopyOnWriteArrayList<String>();
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    executorService.submit(new Sender(InetAddress.getByName(INET_ADDR), PORT1));
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR), PORT2));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            //Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf, buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                threadSafeList.add(msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender (InetAddress inetAddress, int port) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address " + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        while (true) {
            try (DatagramSocket serverSocket = new DatagramSocket()) {
                for (Iterator<String> it = threadSafeList.iterator(); !threadSafeList.isEmpty() && it.hasNext(); ) {

                    String i = it.next();
                    String msg = "Sent message no " + i;

                    // Create a packet that will contain the data
                    // (in the form of bytes) and send it.
                    DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(), msg.getBytes().length, this.addr, this.port);
                    serverSocket.send(msgPacket);

                    threadSafeList.remove(i);
                    System.out.println("Server sent packet with msg: " + msg);
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            try {
                System.out.println("going for sleep"); 
                Thread.currentThread().sleep(500);
                System.out.println("going for sleeping"); 
            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

可以通过更改发送线程的创建来修改设计。每当接收方线程收到消息时,创建一个发送方线程并进行广播并关闭该线程。您可以使用可重用线程池而不是本示例中使用的固定线程池。您可以在创建发件人线程时将消息作为参数传递(因此可能根本不需要列表)并执行提交。我有代码。

    public static void main(String[] args) throws UnknownHostException,
        InterruptedException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.submit(new Receiver(InetAddress.getByName(INET_ADDR),
            PORT2, executorService));
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}

和内部类,

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;
    private ExecutorService executorService;

    public Receiver(InetAddress inetAddress, int port,
            ExecutorService executorService) throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.executorService = executorService;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver " + this.port);
        byte[] buf = new byte[256];

        try {
            MulticastSocket clientSocket = new MulticastSocket(this.port);
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);

            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
                executorService.submit(new Sender(InetAddress
                        .getByName(INET_ADDR), PORT1, msg));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;
    private String message;

    public Sender(InetAddress inetAddress, int port, String message)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
        this.message = message;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        try {
            DatagramSocket serverSocket = new DatagramSocket();
            String msg = "Sent message no " + message;

            // Create a packet that will contain the data
            // (in the form of bytes) and send it.
            DatagramPacket msgPacket = new DatagramPacket(msg.getBytes(),
                    msg.getBytes().length, this.addr, this.port);
            serverSocket.send(msgPacket);

            System.out.println("Server sent packet with msg: " + msg);
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

客户端有2个线程,

  1. 一个用于阅读广播消息。
  2. 另一个用于循环发送 5 条消息。一旦完成,线程将关闭。

这里没有数据交换,所以没有线程安全列表。

    private static class Receiver implements Runnable {

    private InetAddress addr;
    private int port;

    public Receiver(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Receiver ");
        System.out.println(" @ Receiver port " + this.port);
        byte[] buf = new byte[256];

        try (MulticastSocket clientSocket = new MulticastSocket(this.port)) {
            // Joint the Multicast group.
            clientSocket.joinGroup(this.addr);
            while (true) {
                // Receive the information and print it.
                DatagramPacket msgPacket = new DatagramPacket(buf,
                        buf.length);
                clientSocket.receive(msgPacket);

                String msg = new String(buf, 0, buf.length);
                System.out.println("Socket 1 received msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

    private static class Sender implements Runnable {

    private InetAddress addr;
    private int port;

    public Sender(InetAddress inetAddress, int port)
            throws UnknownHostException {
        this.addr = InetAddress.getByName(INET_ADDR);
        this.port = port;
    }

    public void run() {
        System.out.println(" @ Sender Address "
                + new String(this.addr.getAddress()));
        System.out.println(" @ Sender port " + this.port);
        // Open a new DatagramSocket, which will be used to send the data.
        try {
            DatagramSocket serverSocket = new DatagramSocket();

            for (int i = 0; i < 5; i++) {

                System.out.println("inside loop");
                String msg = "Sent message no 2" + i;

                // Create a packet that will contain the data
                // (in the form of bytes) and send it.
                DatagramPacket msgPacket = new DatagramPacket(
                        msg.getBytes(), msg.getBytes().length, this.addr,
                        this.port);
                System.out.println("Before sending to socket");
                serverSocket.send(msgPacket);

                System.out.println("Server sent packet with msg: " + msg);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

This article sample code is extended further.

待微调的代码。

关于java - 服务器在用户之间交替而不是广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30769508/

相关文章:

Java 为新连接重新打开 ServerSocket

java - 如何用 Java 1.4 兼容版本替换 @Resource 注解

java - JSP+JDBC问题

c - Raspberry PI 上的多线程 C 程序

c# - 将 SqlDataReader 传递给多个线程时是否应该通过引用传递 SqlDataReader

python - 当我在 Windows 上使用 os.dup2() 时,出现错误 : OSError: [Errno 9] Bad file descriptor

java - 与 postgres 的 spring JDBC 连接问题

java - 有没有办法从字符串遍历到 int 数组,反之亦然

java - hibernate 线程如何向自身发送中断?

java - 当套接字被接受时,我怎样才能关闭这个对话框?