java - Apache Ignite Queue 比 LinkedBlockingQueue 慢得多

标签 java ignite

我正在尝试在 Ignite 中复制一个简单的生产者-消费者场景:

public class QueueExample {
    public static void main(String[] args) {
        new QueueExample().start();
    }

    private void start() {
        final AtomicBoolean finishedTest1 = new AtomicBoolean(false);
        final BlockingQueue<Double> queue = new LinkedBlockingQueue<>(5);
        final CountDownLatch latch = new CountDownLatch(2);
        final int MAX = 1000;

        new Thread(() -> {
            System.out.println("test1 before latch");
            latch.countDown();
            try {
                // wait until other runnable is able to poll
                latch.await(20, TimeUnit.SECONDS);
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            System.out.println(new Date().getTime() + " start test1");
            double test = 2;
            Random r = new Random();
            StopWatch sw = new StopWatch();
            sw.start();
            for (int i = 0; i < MAX; i++) {
                try {
                    queue.put(r.nextDouble());
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            sw.stop();
            finishedTest1.set(true);
            //LoggerFactory.getLogger(getClass()).info
            System.out.println(new Date().getTime() + " end test1. " + test + ", took:" + sw.getTime() / 1000f);
        }).start();

        new Thread(() -> {
            System.out.println("test2 before latch");
            latch.countDown();
            try {
                // wait until other runnable is able to poll
                latch.await(10, TimeUnit.SECONDS);
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            System.out.println(new Date().getTime() + " start test2");
            StopWatch sw = new StopWatch();
            sw.start();
            int counter = 0;
            try {
                for (int i = 0; i < MAX ; i++) {
                    Double res = queue.poll(1, TimeUnit.SECONDS);
                    counter++;
                }
            } catch (InterruptedException e) {
                // expected
            }
            sw.stop();

            //LoggerFactory.getLogger(getClass()).info
            System.out.println(new Date().getTime() + " end test2. counter " + counter + ", finished:" + finishedTest1.get() + ", took:" + sw.getTime() / 1000f);
        }).start();
    }
}

与以下 Ignite 代码相比,为什么速度快了 100 倍(0.02 秒 vs. <2 秒)?

public class MyIgnite {
    public static void main(String[] args) {
        new MyIgnite().start();
    }

    private void start() {
        IgniteConfiguration icfg = new IgniteConfiguration();
        icfg.setIgniteInstanceName("test1");
        Ignite ignite1 = Ignition.start(icfg);

        final CountDownLatch latch = new CountDownLatch(2);

        final int queueSize = 5;
        CollectionConfiguration queueCfg = new CollectionConfiguration();

        ignite1.compute().runAsync(new IgniteRunnable() {

            @IgniteInstanceResource
            Ignite ignite;

            @Override
            public void run() {
                IgniteQueue<Double> queue = ignite.queue("test", queueSize, queueCfg);
                System.out.println("test1 fetched queue");
                latch.countDown();
                try {
                    // wait until other runnable is able to poll
                    latch.await(20, TimeUnit.SECONDS);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
                System.out.println("start test1");
                double test = 2;
                Random r = new Random();
                StopWatch sw = new StopWatch();
                sw.start();
                for (int i = 0; i < 1000; i++) {
                    queue.put(r.nextDouble());
                }
                sw.stop();
                //LoggerFactory.getLogger(getClass()).info
                System.out.println("end test1. " + test + " at ignite " + ignite.name() + ", took:" + sw.getTime() / 1000f);
            }
        });

        System.out.println("starting test2");
        icfg = new IgniteConfiguration();
        icfg.setIgniteInstanceName("test2");
        Ignite ignite2 = Ignition.start(icfg);
        ignite2.compute().runAsync(new IgniteRunnable() {
            @IgniteInstanceResource
            Ignite ignite;

            @Override
            public void run() {
                IgniteQueue<Double> queue = ignite.queue("test", queueSize, queueCfg);
                System.out.println("test2 fetched queue");
                latch.countDown();
                try {
                    // wait until other runnable is able to poll
                    latch.await(10, TimeUnit.SECONDS);
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
                System.out.println("start test2");
                StopWatch sw = new StopWatch();
                sw.start();
                int counter = 0;
                try {
                    for (int i = 0; i < 1000; i++) {
                        Double res = queue.poll(5, TimeUnit.SECONDS);
                        counter++;
                    }

                } catch (IgniteException exc) {
                    System.out.println("Somehow cannot poll. " + exc);
                }
                sw.stop();
                //LoggerFactory.getLogger(getClass()).info
                System.out.println("end test2. counter " + counter + " at ignite " + ignite.name() + ", took:" + sw.getTime() / 1000f);
            }
        });

        System.out.println("oldest node: " + ignite1.cluster().forOldest().hostNames());
        System.out.println("nodes: " + ignite1.cluster().nodes().size());

        // does it really gracefully shut the nodes down?
//        Ignition.stop(ignite1.name(), false);
//        Ignition.stop(ignite2.name(), false);
    }
}

我尝试修改 ignite 存储以使其表现得更像内存中,但未能成功更改数字。

最佳答案

您在这里将航空母舰与玩具船进行比较。

LinkedBlockingQueue是一种在单个JVM内存中工作的数据结构。

IgniteQueue是一个基于Ignite键值存储的分布式结构。它可以在数百台机器上运行,具有不同的一致性级别、备份副本和持久性。当然,它背后有很多机器支持,而且它比简单的本地队列慢。

关于java - Apache Ignite Queue 比 LinkedBlockingQueue 慢得多,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53454232/

相关文章:

java - Spring 3 在自定义 bean 中接收 servletContext

java - 多边形内的测试点

java - 确定列表大小的最快方法

java - 是否有一个特殊的字符集将所有可能的字节值映射到有效字符并映射回来?

kubernetes - 使用 TcpDiscoveryKubernetesIpFinder 时,OpenShift/K8s 项目 pod 不加入同一网格的问题,而是创建多个隔离的网格

java - 获取所有行中最近添加的列?

java - 如何修复 Apache Ignite 中的 "Could not initialize class org.apache.ignite.IgniteJdbcThinDriver"错误?

java - Apache Ignite 与 Spring Boot CrudRepo 无法初始化

spring-boot - Ignite CrudRepository 仍然遇到 deleteAll 的名称冲突

ssl - 点燃 pod 之间的 TLS 通信