java - fork-join,compute() 方法在一段时间后崩溃

标签 java fork-join

我编写了一个小程序,用于打开 UDP 套接字并接收一些 UDP 数据包。 我尝试的是,每个数据包由自己的线程使用 fork-join 进行处理(字节顺序发生变化),然后通过 udp 将更改后的数据包转发到另一个系统。

这是代码:

package cwstreamswitcher;

import java.io.IOException;
import java.net.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CwStreamSwitcher {

    public class Switch extends RecursiveAction {

        private byte[] mMessage;
        DatagramSocket mSocket;

        public Switch(DatagramSocket serverSocket) throws UnknownHostException, IOException {
            // mMessage = message;
            mSocket = serverSocket;
            int cwPort = 51001;
            String host = "localhost";

            byte[] receiveData = new byte[33];
            DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length);
            serverSocket.receive(receivePacket);

            StringBuilder sb = new StringBuilder();                 //
            for (byte b : receiveData) {                            //
                sb.append(String.format("%02X ", b));               //just for checking the data , will be removed in final version
            }                                                       //
            System.out.println("Received: " + sb.toString());       //


            byte[] newCache = new byte[29];

            newCache[0] = (byte) 0x02;                       
            System.arraycopy(receiveData, 32, newCache, 1, 1);      
            System.arraycopy(receiveData, 1, newCache, 2, 2);       
            byte onid = receiveData[7];
            switch (onid) {
                case (byte) 0x6d:
                    newCache[4] = (byte) 0x03;
                    newCache[5] = (byte) 0x6e;
                    break;
                case (byte) 0x08:
                    newCache[4] = (byte) 0x08;
                    newCache[5] = (byte) 0x00;
                    break;
                case (byte) 0x3f:
                    newCache[4] = (byte) 0x01;
                    newCache[5] = (byte) 0x3e;
                    break;
                case (byte) 0x01:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x01;
                    break;
                case (byte) 0x04:
                    newCache[4] = (byte) 0xfb;
                    newCache[5] = (byte) 0xff;
                    break;
                case (byte) 0xab:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0xab;
                    break;
                case (byte) 0x56:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x56;
                    break;
                case (byte) 0x02:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x02;
                    break;
                case (byte) 0x7e:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x7e;
                    break;
                case (byte) 0x06:
                    newCache[4] = (byte) 0x06;
                    newCache[5] = (byte) 0x00;
                    break;
                case (byte) 0x85:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x85;
                    break;
                case (byte) 0x71:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x71;
                    break;
                case (byte) 0x46:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x46;
                    break;
                default:
                    newCache[4] = (byte) 0x00;
                    newCache[5] = (byte) 0x85;
            }
            newCache[6] = (byte) 0x06;                       
            newCache[7] = (byte) 0x04;                       
            System.arraycopy(receiveData, 12, newCache, 8, 4);      
            newCache[12] = (byte) 0x01;                      
            System.arraycopy(receiveData, 16, newCache, 13, 16);    

            //send the converted data
            DatagramPacket response = new DatagramPacket(newCache, newCache.length, InetAddress.getByName(host), cwPort);
            mSocket.send(response);


            StringBuilder sb2 = new StringBuilder();                //
            for (byte k : newCache) {                               //
                sb2.append(String.format("%02X ", k));              //just for checking the data , will be removed in final version
            }                                                       //
            System.out.println("Sent:     " + sb2.toString());      //
        }

        @Override
        protected void compute() {
            try {
                invokeAll(new Switch(mSocket));
            } catch (UnknownHostException ex) {
                Logger.getLogger(CwStreamSwitcher.class.getName()).log(Level.SEVERE, null, ex);
            } catch (IOException ex) {
                Logger.getLogger(CwStreamSwitcher.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

    public void CwServer(int port) throws SocketException, IOException {
        DatagramSocket serverSocket = new DatagramSocket(port);
        int processors = Runtime.getRuntime().availableProcessors();
        CwStreamSwitcher.Switch fb = new CwStreamSwitcher.Switch(serverSocket);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(fb);
    }

    public static void main(String[] args) throws SocketException, IOException {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 9876;
        }
        new CwStreamSwitcher().CwServer(port);
    }
}

程序编译并运行良好,但大约 5 分钟后它停止并显示以下错误消息:

Exception in thread "main" java.lang.StackOverflowError
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
        at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
        at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
        at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
        at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
        at cwstreamswitcher.CwStreamSwitcher.CwServer(CwStreamSwitcher.java:150)
        at cwstreamswitcher.CwStreamSwitcher.main(CwStreamSwitcher.java:171)
Caused by: java.lang.StackOverflowError
        at java.net.PlainDatagramSocketImpl.receive0(Native Method)
        at java.net.AbstractPlainDatagramSocketImpl.receive(AbstractPlainDatagramSocketImpl.java:135)
        at java.net.DatagramSocket.receive(DatagramSocket.java:775)
        at cwstreamswitcher.CwStreamSwitcher$Switch.<init>(CwStreamSwitcher.java:31)
        at cwstreamswitcher.CwStreamSwitcher$Switch.compute(CwStreamSwitcher.java:126)
        at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:177)
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:377)
        at java.util.concurrent.ForkJoinTask.invokeAll(ForkJoinTask.java:721)

CwStreamSwitcher.java:126 是这个方法调用: protected 无效计算(){

什么可能导致这种情况?

最佳答案

看来你的RecursiveAction没有停止条件。 在compute()中,您应该决定何时生成新的Switch,以及何时生成足够的switch。

您的代码将永远生成 Switch。

关于java - fork-join,compute() 方法在一段时间后崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14124460/

相关文章:

java - 策略模式的鸭子示例

java - @JsonProperty 在枚举中被忽略

java - 带列的 hibernate 公式

angular - RxJS forkJoin 处理路由解析错误

javascript - 您可以返回一个对象数组,其中每个对象都包含来自 forkJoin 的可观察值吗?

java - 如何确定fork-join任务的合适分工阈值

java - 使用 java.util.concurrent 模糊图像,但是,生成的图像完全是黑色的

java - 使用 java 创建 Windows 用户帐户

java - 仅当类实现时如何使用接口(interface)的方法?