Linux 上的 Java BlockingQueue 延迟高

标签 java linux multithreading latency

我正在使用 BlockingQueue:s(同时尝试 ArrayBlockingQueue 和 LinkedBlockingQueue)在我目前正在处理的应用程序中的不同线程之间传递对象。性能和延迟在这个应用程序中相对重要,所以我很好奇使用 BlockingQueue 在两个线程之间传递对象需要多少时间。为了衡量这一点,我编写了一个带有两个线程(一个消费者和一个生产者)的简单程序,我让生产者将时间戳(使用 System.nanoTime() 获取)传递给消费者,请参见下面的代码。

我记得在某个论坛上的某个地方读到,尝试此操作的其他人花费了大约 10 微秒(不知道使用的是什么操作系统和硬件),所以当我在我的计算机上花费了大约 30 微秒时,我并不感到惊讶windows 7 box(Intel E7500 core 2 duo CPU, 2.93GHz),同时在后台运行许多其他应用程序。但是,当我在速度更快的 Linux 服务器(两个 Intel X5677 3.46GHz 四核 CPU,运行内核为 2.6.26-2-amd64 的 Debian 5)上进行相同测试时,我感到非常惊讶。我预计延迟会比我的 windows box 低,但相反,它要高得多 - ~75 - 100 微秒!这两个测试都是使用 Sun 的 Hotspot JVM 版本 1.6.0-23 完成的。

有没有其他人在 Linux 上做过类似的测试并得到类似的结果?或者有谁知道为什么它在 Linux 上慢得多(具有更好的硬件),与 Windows 相比,Linux 上的线程切换是否会慢得多?如果是这样的话,似乎 windows 实际上更适合某些类型的应用程序。非常感谢帮助我理解相对较高数字的任何帮助。

编辑:
在 DaveC 发表评论后,我还做了一个测试,我将 JVM(在 Linux 机器上)限制为单个核心(即所有线程在同一核心上运行)。这极大地改变了结果——延迟降低到 20 微秒以下,即比 Windows 机器上的结果要好。我还做了一些测试,将生产者线程限制在一个核心,将消费者线程限制在另一个核心(尝试将它们放在同一个套接字和不同的套接字上),但这似乎没有帮助 - 延迟仍然约为 75微秒。顺便说一句,这个测试应用程序几乎是我在执行测试时在机器上运行的全部内容。

有谁知道这些结果是否有意义?如果生产者和消费者在不同的内核上运行,它真的应该慢得多吗?任何输入都非常感谢。

再次编辑(1 月 6 日):
我对代码和运行环境进行了不同的更改:

  • 我将 Linux 内核升级到 2.6.36.2(从 2.6.26.2)。内核升级后,测量时间从升级前的 75-100 变为 60 微秒,变化非常小。为生产者和消费者线程设置 CPU 亲和性没有任何影响,除非将它们限制在相同的内核上。在同一内核上运行时,测得的延迟为 13 微秒。
  • 在原始代码中,我让生产者在每次迭代之间 hibernate 1 秒,以便给消费者足够的时间来计算耗时并将其打印到控制台。如果我删除对 Thread.sleep() 的调用,而是让生产者和消费者在每次迭代中都调用 barrier.await()(消费者在将耗时打印到控制台后调用它),则测量的延迟从60 微秒到 10 微秒以下。如果在同一内核上运行线程,则延迟会低于 1 微秒。谁能解释为什么这会如此显着地减少延迟?我的第一个猜测是更改的效果是生产者在消费者调用 queue.take() 之前调用了 queue.put(),因此消费者永远不必阻塞,但是在使用了 ArrayBlockingQueue 的修改版本之后,我发现这个猜测是错误的——消费者确实阻止了。如果您有其他猜测,请告诉我。 (顺便说一句,如果我让生产者同时调用 Thread.sleep() 和 barrier.await(),延迟保持在 60 微秒)。
  • 我还尝试了另一种方法——我没有调用 queue.take(),而是调用 queue.poll(),超时时间为 100 微秒。这将平均延迟降低到 10 微秒以下,但当然 CPU 密集度更高(但 CPU 密集度可能低于忙等待?)。

  • 再次编辑(1 月 10 日) - 问题已解决:
    ninjalj 表示,大约 60 微秒的延迟是由于 CPU 必须从更深的 sleep 状态中唤醒——他完全正确!在 BIOS 中禁用 C 状态后,延迟减少到 <10 微秒。这解释了为什么我在上面的第 2 点下获得了更好的延迟 - 当我更频繁地发送对象时,CPU 保持足够忙碌,不会进入更深的 sleep 状态。非常感谢所有花时间阅读我的问题并在这里分享您的想法的人!

    ...
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.CyclicBarrier;
    
    public class QueueTest {
    
        ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
        Thread consumerThread;
        CyclicBarrier barrier = new CyclicBarrier(2);
        static final int RUNS = 500000;
        volatile int sleep = 1000;
    
        public void start() {
            consumerThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        barrier.await();
                        for(int i = 0; i < RUNS; i++) {
                            consume();
    
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } 
                }
            });
            consumerThread.start();
    
            try {
                barrier.await();
            } catch (Exception e) { e.printStackTrace(); }
    
            for(int i = 0; i < RUNS; i++) {
                try {
                    if(sleep > 0)
                        Thread.sleep(sleep);
                    produce();
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void produce() {
            try {
                queue.put(System.nanoTime());
            } catch (InterruptedException e) {
            }
        }
    
        public void consume() {
            try {
                long t = queue.take();
                long now = System.nanoTime();
                long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
                if(sleep > 0) {
                    System.out.println("Time: " + time);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        public static void main(String[] args) {
            QueueTest test = new QueueTest();
            System.out.println("Starting...");
            // Run first once, ignoring results
            test.sleep = 0;
            test.start();
            // Run again, printing the results
            System.out.println("Starting again...");
            test.sleep = 1000;
            test.start();
        }
    }
    

    最佳答案

    您的测试不能很好地衡量队列切换延迟,因为您有一个线程从队列中读取同步写入 System.out (当它在它的时候做一个字符串和长连接)在它再次需要之前。要正确衡量这一点,您需要将此 Activity 移出该线程,并在获取线程中尽可能少做工作。

    您最好只在接受者中进行计算(当时)并将结果添加到其他一些集合中,该集合由另一个输出结果的线程定期排出。我倾向于通过添加到通过 AtomicReference 访问的适当大小的数组支持结构中来做到这一点(因此,报告线程只需使用该存储结构的另一个实例对该引用进行 getAndSet 即可获取最新一批结果;例如 make 2列表,将其中一个设置为主动,每个 xsa 线程都会唤醒并交换主动和被动线程)。然后您可以报告一些分布而不是每个结果(例如十分位数范围),这意味着您不会在每次运行时生成大量日志文件并为您打印有用的信息。

    FWIW 我同意 Peter Lawrey 所说的时间,如果延迟真的很重要,那么您需要考虑以适当的 CPU 关联性进行忙等待(即为该线程专门分配一个核心)

    1 月 6 日之后编辑

    If I remove the call to Thread.sleep () and instead let both the producer and consumer call barrier.await() in every iteration (the consumer calls it after having printed the elapsed time to the console), the measured latency is reduced from 60 microseconds to below 10 microseconds. If running the threads on the same core, the latency gets below 1 microsecond. Can anyone explain why this reduced the latency so significantly?



    您正在查看 java.util.concurrent.locks.LockSupport#park 之间的区别(以及相应的 unpark )和 Thread#sleep .大多数 j.u.c.东西建立在 LockSupport (通常通过 AbstractQueuedSynchronizer 提供或直接通过 ReentrantLock 提供)并且此(在 Hotspot 中)解析为 sun.misc.Unsafe#park (和 unpark ),这往往会落入 pthread(posix 线程)库的手中。通常 pthread_cond_broadcast醒来和pthread_cond_waitpthread_cond_timedwait对于诸如 BlockingQueue#take 之类的事情.

    我不能说我曾经看过 Thread#sleep实际上是实现的(因为我从来没有遇到过不是基于条件的等待的低延迟),但我想它会导致它以比 pthread 信号机制更积极的方式被调度程序降级,那就是延迟差异的原因是什么。

    关于Linux 上的 Java BlockingQueue 延迟高,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/4584240/

    相关文章:

    java - 运行永远不会返回 salt master 的 salt 脚本作业

    java - 如何在Linux中安装Hammurapi来查看Java源代码

    c++ - 编译 C++ 代码时出现 ios::nocreate 错误

    c# - 什么时候应该使用每个线程同步对象?

    c# - 如何使用 ConcurrentQueue<T> 进行线程处理

    java - 如何在 Android 中发送带有 fragment 的 graphql 查询

    Java LibGDX 更新和绘制方法

    java - 将结果集转换为 Java 模型对于一百万条记录来说速度缓慢

    c++ - 我如何从 strace 输出中确定我的程序的哪一部分未能获得互斥锁

    java - 使用线程在数组中搜索元素