java - SocketChannel : Why if I write msgs quickly the latency of each message is low, 但是当我每 30 秒写一个 msg 时延迟很高?

标签 java sockets networking jvm real-time

这个问题的发展现在在这个新问题中明确描述:Why does the JVM show more latency for the same block of code after a busy spin pause?

我在下面包含了一个简单的服务器和客户端的源代码,用于演示和隔离问题。基本上我正在计时乒乓(客户端-服务器-客户端)消息的延迟。我首先每 1 毫秒发送一条消息。我等待发送 200k 条消息,以便 HotSpot 有机会优化代码。然后我将暂停时间从 1 毫秒更改为 30 秒。令我惊讶的是,我的读写操作变得相当慢。

我认为这不是 JIT/HotSpot 问题。我能够确定本地 JNI 调用写入 (write0) 和读取的较慢方法。看起来你暂停的时间越长它变得越慢。

我正在寻找有关如何调试、理解、解释或解决此问题的指示。

服务器.java:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class Server {

    private final ServerSocketChannel serverSocketChannel;
    private final ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024);
    private final int port;
    private final int msgSize;

    public Server(int port, int msgSize) throws IOException {
        this.serverSocketChannel = ServerSocketChannel.open();
        this.port = port;
        this.msgSize = msgSize;
    }

    public void start() throws IOException {
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        final SocketChannel socketChannel = serverSocketChannel.accept(); // blocking mode...
        System.out.println("Client accepted!");
        socketChannel.configureBlocking(false);
        socketChannel.socket().setTcpNoDelay(true);
        Thread t = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while(true) {
                        int bytesRead = socketChannel.read(readBuffer);
                        if (bytesRead == -1) {
                            System.out.println("Client disconnected!");
                            return;
                        } else if (bytesRead > 0) {
                            if (readBuffer.position() == msgSize) {
                                // have a full message there...
                                readBuffer.flip();
                                int bytesSent = socketChannel.write(readBuffer);
                                if (bytesSent != msgSize) throw new RuntimeException("Could not send full message out: " + bytesSent);
                                readBuffer.clear();
                            }
                        }
                    }
                } catch(Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();
        serverSocketChannel.close();
    }

    public static void main(String[] args) throws Exception {

        Server s = new Server(9999, 8);
        s.start();
    }
}

客户端.java:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Client implements Runnable {

    private static final int WARMUP = 200000;

    private final SocketChannel socketChannel;
    private final String host;
    private final int port;
    private final ByteBuffer outBuffer;
    private final ByteBuffer inBuffer = ByteBuffer.allocateDirect(1024);
    private final int msgSize;
    private final StringBuilder sb = new StringBuilder(1024);

    private int interval;
    private int totalMessagesSent;
    private long timeSent;
    private int mod;


    public Client(String host, int port, int msgSize) throws IOException {
        this.socketChannel = SocketChannel.open();
        this.host = host;
        this.port = port;
        this.outBuffer = ByteBuffer.allocateDirect(msgSize);
        this.msgSize = msgSize;
        for(int i = 0; i < msgSize; i++) outBuffer.put((byte) i);
        outBuffer.flip();
        this.interval = 1;
        this.mod = 20000;
    }

    public static long busySleep(long t) {
        long x = 0;
        for(int i = 0; i < t * 20000; i++) {
            x += System.currentTimeMillis() / System.nanoTime();
        }
        return x;
    }

    public void start() throws Exception {
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.connect(new InetSocketAddress(host, port));

        while(!socketChannel.finishConnect()) {
            System.out.println("Waiting to connect");
            Thread.sleep(1000);
        }
        System.out.println("Please wait as output will appear every minute or so. After " + WARMUP + " messages you will see the problem.");
        Thread t = new Thread(this);
        t.start();
    }

    private final void printResults(long latency, long timeToWrite, long timeToRead, long zeroReads, long partialReads, long realRead) {
        sb.setLength(0);
        sb.append(new java.util.Date().toString());
        sb.append(" Results: totalMessagesSent=").append(totalMessagesSent);
        sb.append(" currInterval=").append(interval);
        sb.append(" latency=").append(latency);
        sb.append(" timeToWrite=").append(timeToWrite);
        sb.append(" timeToRead=").append(timeToRead);
        sb.append(" realRead=").append(realRead);
        sb.append(" zeroReads=").append(zeroReads);
        sb.append(" partialReads=").append(partialReads);
        System.out.println(sb);
    }

    @Override
    public void run() {

        try {

            while(true) {

                busySleep(interval);

                outBuffer.position(0);

                timeSent = System.nanoTime();

                int bytesSent = socketChannel.write(outBuffer);
                long timeToWrite = System.nanoTime() - timeSent;
                if (bytesSent != msgSize) throw new IOException("Can't write message: " + bytesSent);

                inBuffer.clear();
                long zeroReads = 0;
                long partialReads = 0;
                long timeToRead = System.nanoTime();
                long realRead = 0;
                while(inBuffer.position() != msgSize) {
                    realRead = System.nanoTime();
                    int bytesRead = socketChannel.read(inBuffer);
                    if (bytesRead == 0) {
                        zeroReads++;
                    } else if (bytesRead == -1) {
                        System.out.println("Other side disconnected!");
                        return;
                    } else if (bytesRead != msgSize) {
                        partialReads++;
                        realRead = -1;
                    } else {
                        realRead = System.nanoTime() - realRead;
                    }
                }

                long now = System.nanoTime();

                timeToRead = now - timeToRead;

                long latency = now - timeSent;

                if (++totalMessagesSent % mod == 0 || totalMessagesSent == 1) {
                    printResults(latency, timeToWrite, timeToRead, zeroReads, partialReads, realRead);
                }

                if (totalMessagesSent == WARMUP) {
                    this.interval = 30000;
                    this.mod = 1;
                }
            }

        } catch(Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {

        Client client = new Client("localhost", 9999, 8);
        client.start();
    }
}

我执行 java -server -cp . Serverjava -server -cp . Client .客户端的输出是:
enter image description here

根据@dunni 请求,将延迟更改为 1 秒而不是 30 秒。同样的问题:
enter image description here

最佳答案

您遇到的一个问题是,当没有数据要读取时,JVM、CPU 和它的缓存都处于 hibernate 状态。一旦发生这种情况,机器必须做更多的事情才能获取数据,而不是当您的问题很严重时。

  • CPU 速度可能已降低以节省电量。例如一半正常。它可以在愚蠢的繁忙循环中执行此操作。
  • 线程没有运行,必须在新的 CPU 上重新启动。 (在你的情况下,这应该很少见)
  • CPU 的缓存可能已断电,必须从 L3 缓存或主内存逐步加载
  • 即使在你的线程返回后,它也会比正常运行慢达 100 微秒,因为缓存会引入更多的数据/代码。
  • 您将获得每秒 100 次以上无法关闭的不可屏蔽中断。

  • 简而言之,如果您需要一致的延迟,您需要
  • 关闭电源管理。
  • 不要放弃 CPU,即忙等待。 (你在做什么)
  • 在独立的 CPU 上运行,将线程与亲和性绑定(bind)。
  • 禁用该内核上的所有可屏蔽中断。
  • 使用用户空间驱动程序而不是内核进行网络连接。

  • 注意:鉴于每个操作似乎都需要大约 2 倍的时间,我会先看看电源管理。

    关于java - SocketChannel : Why if I write msgs quickly the latency of each message is low, 但是当我每 30 秒写一个 msg 时延迟很高?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43377600/

    相关文章:

    C Sockets 客户端/服务器 pthreads 服务器广播

    java - 从Java中的ObjectInputStream连续读取对象

    c++ - C++异步主机名解析

    python - 时间服务器网络

    java - 如何使用Selenium网格通过远程机器节点浏览器直接将文件下载到我的集线器机器(存在我的项目代码工作区的地方)?

    java - 如何使用 Datastax Java 驱动程序的异步/批量写入功能

    java - 是否存在有界无锁阻塞队列?

    java - 通过 Socket 将复杂对象从 Java 客户端发送到 C 服务器

    sockets - 带倍频程的 pkg 负载 socket

    从主线程关闭一个套接字