java - 多线程追加器队列上的慢队列尾部

标签 java multithreading chronicle chronicle-queue

我遇到了多个线程写入同一个队列的情况。

Appender 线程接收来自不同市场的更新(每个线程单个市场)并将这些数据推送到同一队列中:

ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
        final ExcerptTailer tailer = queue.createTailer();
appender.writeDocument(
                        wire -> {

                                wire
                                        .getValueOut().text("buy")
                                        .getValueOut().text(exchange.name())
                                        .getValueOut().text(currencyPair.toString())
                                        .getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
                                        .getValueOut().text(price);
                            });

然后我有完全独立的进程(不同的JVM)来连续从队列中读取:

while (true){
     tailer.readDocument(........

但是,虽然我每秒对队列生成大约 10 次更新,但tailer 大约每 3 秒处理一条记录。我想我在这里遗漏了一些基本的东西:-)

或者持续监听队列更新的正确方法是什么?除了 while (true) then do 之外,我找不到任何其他解决方案...

我正在 18 核机器(36 个线程)上进行开发,并使用 Java Affinity 将每个工作分配给 itc 自己的 CPU。

感谢您的任何提示。

最佳答案

创建队列的成本非常高,如果可以的话,请尝试每个进程只执行一次。

创建 Tailer 的成本也很高,您应该创建一次并不断轮询更新。

创建对象可能会很昂贵,我会避免创建任何对象。例如避免调用 toStringLocalDate.now

这是基准测试的示例

String path = OS.getTarget();
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = appender.writingDocument()) {
            ValueOut valueOut = dc.wire().getValueOut();
            valueOut.text("buy")
                    .getValueOut().asEnum(exchange)
                    .getValueOut().asEnum(currencyPair)
                    .getValueOut().int64(System.currentTimeMillis())
                    .getValueOut().float64(price);
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
    Jvm.pause(100);
}

打印

Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second

对于阅读,你可以这样做

final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
    long start = System.nanoTime();
    int messages = 100000;
    for (int i = 0; i < messages; i++) {
        try (DocumentContext dc = tailer.readingDocument()) {
            if (!dc.isPresent())
                throw new AssertionError("Missing t: " + t + ", i: " + i);
            ValueIn in = dc.wire().getValueIn();
            String buy = in.text();
            Exchange exchange2 = in.asEnum(Exchange.class);
            CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
            long time = in.int64();
            double price2 = in.float64();
        }
    }
    long time = System.nanoTime() - start;
    System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}

注意:它读取的消息数量与写入的消息数量相同。

打印

Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second

关于java - 多线程追加器队列上的慢队列尾部,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48172892/

相关文章:

java - 当值的大小变化很大时,ChronicleMap 会导致 JVM 崩溃

java - VanillaChronicle 的线程安全

java - 高效并行素因数分解

Java FutureTask - 多线程调用 get()

java - 如何将参数传递给 java bash 脚本?

java - 它可能在 url 中发送数组

multithreading - Perl : empty variable while using eval in a thread

c++ - 对 std::lock 的调用是否可以传递已被调用线程锁定的资源?

c# - 用户在网站中触发一个漫长的过程

java - 在 Chronicle map 上执行操作的观察者模式