java - 同时运行两个线程,然后在它们之间进行通信

标签 java multithreading flow-control

我正在尝试使用在同一台机器上通信的发送方和接收方来实现停止并等待 ARQ。我的问题是使两个线程同时运行,然后在两个线程之间进行通信(可能使用 Thread.notify() )。目前,当我在两个类中运行两个单独的主要方法时,我的代码可以工作,而无需实现流量控制协议(protocol)。但是,当我尝试从单独的主方法运行代码时,我只能先运行接收者线程或发送者线程,这两者都会导致代码无限期等待。一般来说,我对线程很陌生,因此非常感谢任何帮助!

发件人类别:

import java.net.DatagramSocket;
import java.net.DatagramPacket;
import java.net.InetSocketAddress;

import java.io.File;
import java.io.FileInputStream;
import tcdIO.*;

/**
 *
 * Sending side of a communication channel.
 *
 * The start method splits an image into a number of packets and sends them to a given receiver.
 * The main method acts as test for the class by filling the destination host and port number and the source port number.
 *
 */
public class Sender implements Runnable{
    static final int DEFAULT_SRC_PORT = 50000;
    static final int DEFAULT_DST_PORT = 50001;
    static final String DEFAULT_DST_HOST = "localhost";

    static final String FILENAME = "input.jpg";

    static final int MTU = 1500;

    static Terminal terminal;

    DatagramSocket socket;
    InetSocketAddress dstAddress;

    /**
     * Constructor
     * 
     */
    Sender() {
        this(DEFAULT_DST_HOST, DEFAULT_DST_PORT, DEFAULT_SRC_PORT);
    }


    /**
     * Constructor
     *   
     * Attempts to create socket at given port and create an InetSocketAddress for the destinations
     */
    Sender(String dstHost, int dstPort, int srcPort) {
        try {
            dstAddress= new InetSocketAddress(dstHost, dstPort);
            socket= new DatagramSocket(srcPort);
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }

    synchronized void sleep() {
        try {this.wait(100);}catch(Exception e){e.printStackTrace();}
    }


    /**
     * Sender Method
     * 
     * Transmits a given image as a collection of packets; the first packet contains the size of the image as string.
     */
    public void run() {
        byte[] data= null;
        DatagramPacket packet= null;

        File file= null;
        FileInputStream fin= null;
        byte[] buffer= null;
        int size;
        int counter;

        try {   
            file= new File(FILENAME);               // Reserve buffer for length of file and read file
            buffer= new byte[(int) file.length()];
            fin= new FileInputStream(file);
            size= fin.read(buffer);
            if (size==-1) throw new Exception("Problem with File Access");
            terminal.println("File size: " + buffer.length + ", read: " + size);

            data= (Integer.toString(size)).getBytes();  // 1st packet contains the length only
            packet= new DatagramPacket(data, data.length, dstAddress);
            terminal.println("Please press any key");
            terminal.readChar();
            socket.send(packet);            

            counter= 0;
            do {
                data= new byte[(counter+MTU<size) ? MTU : size-counter];  // The length of the packet is either MTU or a remainder
                java.lang.System.arraycopy(buffer, counter, data, 0, data.length);
                terminal.println("Counter: " + counter + " - Payload size: " + data.length);

                packet= new DatagramPacket(data, data.length, dstAddress);
                socket.send(packet);
                this.sleep();   
                counter+= data.length;
            } while (counter<size);

        terminal.println("Send complete");
    }
    catch(java.lang.Exception e) {
        e.printStackTrace();
    }       
}



public static void main(String[] args) {
    Sender s;
    try {           
        String dstHost;
        int dstPort;
        int srcPort;

        //dstHost= args[0];
        //dstPort= Integer.parseInt(args[1]);
        //srcPort= Integer.parseInt(args[2]);
        dstHost= DEFAULT_DST_HOST;
        dstPort= DEFAULT_DST_PORT;
        srcPort= DEFAULT_SRC_PORT;

        terminal= new Terminal("Sender");

        s= new Sender(dstHost, dstPort, srcPort);
        s.run();

        terminal.println("Program completed");
    } catch(java.lang.Exception e) {
        e.printStackTrace();
    }
}


}

接收器类:

import java.io.File;
import java.io.FileOutputStream;
import java.net.DatagramSocket;
import java.net.DatagramPacket;

import tcdIO.*;

/**
 * Receiving side of a communication channel.
 *
 * The class provides the basic functionality to receive a datagram from a sender.
 * The main method acts as test for the class by filling the port number at which to receive the datagram.
 */
public class Receiver implements Runnable{
    static final String FILENAME = "output.jpg";
    static final int DEFAULT_PORT = 50001;
    static final int MTU = 1500;
    static Terminal terminal;

    DatagramSocket socket;

    /**
     * Constructor
     * 
     */
    Receiver() {
        this(DEFAULT_PORT);
    }


    /**
     * Constructor
     *   
     * Attempts to create socket at given port
     */
    Receiver(int port) {
        try {
            socket= new DatagramSocket(port);
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }


    /**
     * Receiver Method
     * 
     * Attempts to receive a number of packets that contain an image; the first packet contains the size of the image
     */
    public void run() {
        byte[] data;
        byte[] buffer;
        DatagramPacket packet;
        int counter;
        int size;

        File file;
        FileOutputStream fout;

        try {
            data= new byte[MTU];  // receive first packet with size of image as payload
            packet= new DatagramPacket(data, data.length);
            terminal.println("Waiting for incoming packets");
            socket.receive(packet);         

            data= packet.getData();   // reserve buffer to receive image
            size= (Integer.valueOf(new String(data, 0, packet.getLength()))).intValue();
            terminal.println("Filesize:" + size);
            buffer= new byte[size];

            counter= 0;         
            while(counter<size) {  // receive packet and store payload in array
                data= new byte[MTU];
                packet= new DatagramPacket(data, data.length);
                socket.receive(packet);
                terminal.println("Received packet - Port: " + packet.getPort() + " - Counter: " + counter + " - Payload: "+packet.getLength()); 

                System.arraycopy(data, 0, buffer, counter, packet.getLength());
                counter+= packet.getLength();
            }

            file= new File(FILENAME);               // Create file and write buffer into file
            fout= new FileOutputStream(file);
            fout.write(buffer, 0, buffer.length);
            fout.flush();
            fout.close();
        }
        catch(java.lang.Exception e) {
            e.printStackTrace();
        }       
    }


    /**
     * Test method
     * 
     * Creates an instance of the class Receiver
     * 
     * @param args arg[0] Port number to receive information on
     * /
    public static void main(String[] args) {
        Receiver r;

        try {
            terminal= new Terminal("Receiver");
            int port;

            //port= Integer.parseInt(args[0]);
            port= DEFAULT_PORT;
            r= new Receiver(port);  
            r.run();

            terminal.println("Program completed");
        } catch(java.lang.Exception e) {
            e.printStackTrace();
        }
    }
    */
}

以及 main,它只是实例化并运行它们:

import tcdIO.Terminal;


public class FlowControlMain {

    /**
     * 
     *
     */
    public static void main(String[] args) {

        Sender s;
        Receiver r;
        try{
            String dstHost= "localhost";
            int dstPort= 50001;
            int srcPort= 50000;

            Sender.terminal= new Terminal("Sender");
            Receiver.terminal = new Terminal("Receiver");

            s= new Sender(dstHost, dstPort, srcPort);
            r = new Receiver(dstPort);
            s.run();
            r.run();

        }catch(Exception e){
            e.printStackTrace();

        }

    }


}

对大量代码表示歉意,只是想给出完整的图片

最佳答案

您没有使用线程,而是在主线程中执行 run() 方法。

在其自己的线程中启动Runnable的正确方法是

Thread t = new Thread(myRunnable);
t.start();

或者使用ExecutorService,它的级别更高一些,并且允许诸如线程池之类的事情。

关于java - 同时运行两个线程,然后在它们之间进行通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20092752/

相关文章:

java - 如何打印 ArrayList 中数组中的单个元素

android - MediaPlayer 应该在单独的线程中运行吗?

java - 我如何在 while 循环中重复语句 "enter marks or grade"。在这种情况下,每次控制回到循环开始以再次输入主题?

akka-stream - 应用级背压 VS TCP 原生流控

javascript - 如何使用 vo.js 在 for 循环中产生多个 Promise?

java - 文件 java 的类路径

java - Spring Boot 过滤 : At least one property must be given

java - 托管时 Mongodb 变慢

c# - 为什么这个简单的线程输出是正确的

java - 无法模拟 Executors.newSingleThreadExecutor() 并调用真正的方法