我遇到了多个线程写入同一个队列的情况。
Appender 线程接收来自不同市场的更新(每个线程单个市场)并将这些数据推送到同一队列中:
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
final ExcerptTailer tailer = queue.createTailer();
appender.writeDocument(
wire -> {
wire
.getValueOut().text("buy")
.getValueOut().text(exchange.name())
.getValueOut().text(currencyPair.toString())
.getValueOut().dateTime(LocalDateTime.now(Clock.systemUTC()))
.getValueOut().text(price);
});
然后我有完全独立的进程(不同的JVM)来连续从队列中读取:
while (true){
tailer.readDocument(........
但是,虽然我每秒对队列生成大约 10 次更新,但tailer 大约每 3 秒处理一条记录。我想我在这里遗漏了一些基本的东西:-)
或者持续监听队列更新的正确方法是什么?除了 while (true) then do 之外,我找不到任何其他解决方案...
我正在 18 核机器(36 个线程)上进行开发,并使用 Java Affinity 将每个工作分配给 itc 自己的 CPU。
感谢您的任何提示。
最佳答案
创建队列的成本非常高,如果可以的话,请尝试每个进程只执行一次。
创建 Tailer 的成本也很高,您应该创建一次并不断轮询更新。
创建对象可能会很昂贵,我会避免创建任何对象。例如避免调用 toString
或 LocalDate.now
这是基准测试的示例
String path = OS.getTarget();
ChronicleQueue queue = SingleChronicleQueueBuilder.binary(path + "/market").build();
ExcerptAppender appender = queue.acquireAppender();
Exchange exchange = Exchange.EBS;
CurrencyPair currencyPair = CurrencyPair.EURUSD;
double price = 1.2345;
for (int t = 0; t < 5; t++) {
long start = System.nanoTime();
int messages = 100000;
for (int i = 0; i < messages; i++) {
try (DocumentContext dc = appender.writingDocument()) {
ValueOut valueOut = dc.wire().getValueOut();
valueOut.text("buy")
.getValueOut().asEnum(exchange)
.getValueOut().asEnum(currencyPair)
.getValueOut().int64(System.currentTimeMillis())
.getValueOut().float64(price);
}
}
long time = System.nanoTime() - start;
System.out.printf("Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
Jvm.pause(100);
}
打印
Throughput was 962,942 messages per second
Throughput was 2,952,433 messages per second
Throughput was 4,776,337 messages per second
Throughput was 3,250,235 messages per second
Throughput was 3,514,863 messages per second
对于阅读,你可以这样做
final ExcerptTailer tailer = queue.createTailer();
for (int t = 0; t < 5; t++) {
long start = System.nanoTime();
int messages = 100000;
for (int i = 0; i < messages; i++) {
try (DocumentContext dc = tailer.readingDocument()) {
if (!dc.isPresent())
throw new AssertionError("Missing t: " + t + ", i: " + i);
ValueIn in = dc.wire().getValueIn();
String buy = in.text();
Exchange exchange2 = in.asEnum(Exchange.class);
CurrencyPair currencyPair2 = in.asEnum(CurrencyPair.class);
long time = in.int64();
double price2 = in.float64();
}
}
long time = System.nanoTime() - start;
System.out.printf("Read Throughput was %,d messages per second%n", (long) (messages * 1e9 / time));
}
注意:它读取的消息数量与写入的消息数量相同。
打印
Read Throughput was 477,849 messages per second
Read Throughput was 3,083,642 messages per second
Read Throughput was 5,100,516 messages per second
Read Throughput was 6,342,525 messages per second
Read Throughput was 6,672,971 messages per second
关于java - 多线程追加器队列上的慢队列尾部,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48172892/