我正在评估 Chronicle Queue 在我们的软件中的使用情况,我一定做错了什么。
我有一个追加器,可以非常快地写入大约 650k 条目。之后,它逐渐停止,此时内存已达到允许的最大值,并最终达到 OutOfMemory。
这是我的代码:
final class LogEntryOutput implements WriteBytesMarshallable
{
private final int maxMessageSize;
private TLogEntry logEntry;
LogEntryOutput(final int maxMessageSize)
{
this.maxMessageSize = maxMessageSize;
}
public void setMarshallable(final TLogEntry logEntry)
{
this.logEntry = logEntry;
}
@Override
@SuppressWarnings({"rawtypes", "No way to provide generic type and override WriteBytesMarshallable."})
public void writeMarshallable(final BytesOut bytes)
{
bytes.writeLong(this.logEntry.getSessionId());
bytes.writeInt(this.logEntry.getLogLevel());
bytes.writeInt(this.logEntry.getSecurityLevel());
bytes.writeLong(this.logEntry.getPosixTimestamp());
// Limit size of string messages.
final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);
// Write message length
bytes.writeStopBit((long)messageSize);
// Write message bytes.
bytes.write(this.logEntry.getMessage(), 0, messageSize);
}
}
final TLogEntry entry = new TLogEntry();
entry.setSessionId(321234L);
entry.setLogLevel(77);
entry.setSecurityLevel(1234);
entry.setPosixTimestamp(6141234321L);
entry.setMessage("This is a test message for the system................................ A");
final LogEntryOutput output = new LogEntryOutput(1024);
output.setMarshallable(entry);
final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(config.getQueueDirectory())
.rollCycle(RollCycles.HOURLY)
.build();
final ExcerptAppender appender = queue.acquireAppender();
for (int j = 0; j < 100; ++j)
{
for (int i = 0; i < 10000; ++i)
{
appender.writeBytes(output);
}
System.out.println((j+1) * 10000);
Jvm.pause(100L);
}
queue.close();
它在带有 64 位 JVM 的 Windows 7 x64 中运行,使用:-Xmx1024m
有什么想法我可能做错了什么吗?
编辑:我有更多信息。我在内存峰值之后拍摄了对象分配的快照。很多对象数组等等。 以及当我收到 OOM 错误时的堆栈跟踪。
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.ConcurrentLinkedQueue.offer(ConcurrentLinkedQueue.java:328)
at java.util.concurrent.ConcurrentLinkedQueue.add(ConcurrentLinkedQueue.java:297)
at net.openhft.chronicle.core.ReferenceCounter.recordRelease(ReferenceCounter.java:88)
at net.openhft.chronicle.core.ReferenceCounter.release(ReferenceCounter.java:79)
at net.openhft.chronicle.bytes.NativeBytesStore.release(NativeBytesStore.java:267)
at net.openhft.chronicle.bytes.MappedBytes.acquireNextByteStore(MappedBytes.java:186)
at net.openhft.chronicle.bytes.MappedBytes.peekVolatileInt(MappedBytes.java:388)
at net.openhft.chronicle.wire.AbstractWire.readMetaDataHeader(AbstractWire.java:222)
at net.openhft.chronicle.queue.impl.single.SCQIndexing.arrayForAddress(SCQIndexing.java:190)
at net.openhft.chronicle.queue.impl.single.SCQIndexing.sequenceForPosition(SCQIndexing.java:492)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore.sequenceForPosition(SingleChronicleQueueStore.java:272)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.checkWritePositionHeaderNumber(SingleChronicleQueueExcerpts.java:339)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writingDocument(SingleChronicleQueueExcerpts.java:267)
at net.openhft.chronicle.wire.MarshallableOut.writingDocument(MarshallableOut.java:55)
at net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts$StoreAppender.writeBytes(SingleChronicleQueueExcerpts.java:117)
at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:78)
at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumers$LogEntryChronicle.accept(LogEntryConsumers.java:45)
at com.selinc.winchester.ledger.writer.harness.queue.LogEntryConsumersTest.test(LogEntryConsumersTest.java:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:86)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:643)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:820)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1128)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:129)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:112)
at org.testng.TestRunner.privateRun(TestRunner.java:782)
at org.testng.TestRunner.run(TestRunner.java:632)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:366)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:361)
最佳答案
Chronicle Queue 有一些额外的检查来检测内存泄漏,这些检查是通过 -ea
打开的。如果您在启用这些额外检查的情况下运行,则在您的情况下,队列确实会减慢大约 90,000 条消息的速度。如果关闭断言,它会运行更长时间。
在具有 8 GB 内存的 Windows 笔记本电脑上,断言关闭下,可在 5.5 秒内运行 10,000,000 个条目
它还能在 66 秒内完成 1 亿条记录。
public class ATest {
static class TLogEntry {
private long sessionId;
private int logLevel;
private int securityLevel;
private long posixTimestamp;
private CharSequence message;
public long getSessionId() {
return sessionId;
}
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
}
public int getLogLevel() {
return logLevel;
}
public void setLogLevel(int logLevel) {
this.logLevel = logLevel;
}
public int getSecurityLevel() {
return securityLevel;
}
public void setSecurityLevel(int securityLevel) {
this.securityLevel = securityLevel;
}
public long getPosixTimestamp() {
return posixTimestamp;
}
public void setPosixTimestamp(long posixTimestamp) {
this.posixTimestamp = posixTimestamp;
}
public CharSequence getMessage() {
return message;
}
public void setMessage(CharSequence message) {
this.message = message;
}
}
static class LogEntryOutput implements WriteBytesMarshallable {
private final int maxMessageSize;
private TLogEntry logEntry;
LogEntryOutput(final int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
public void setMarshallable(final TLogEntry logEntry) {
this.logEntry = logEntry;
}
@Override
@SuppressWarnings({"rawtypes", "No way to provide generic type and override WriteBytesMarshallable."})
public void writeMarshallable(final BytesOut bytes) {
bytes.writeLong(this.logEntry.getSessionId());
bytes.writeInt(this.logEntry.getLogLevel());
bytes.writeInt(this.logEntry.getSecurityLevel());
bytes.writeLong(this.logEntry.getPosixTimestamp());
// Limit size of string messages.
final int messageSize = Math.min(this.logEntry.getMessage().length(), this.maxMessageSize);
// Write message length
bytes.writeStopBit((long) messageSize);
// Write message bytes.
bytes.write(this.logEntry.getMessage(), 0, messageSize);
}
}
@Test
public void test() {
final TLogEntry entry = new TLogEntry();
entry.setSessionId(321234L);
entry.setLogLevel(77);
entry.setSecurityLevel(1234);
entry.setPosixTimestamp(6141234321L);
entry.setMessage("This is a test message for the system................................ A");
final LogEntryOutput output = new LogEntryOutput(1024);
output.setMarshallable(entry);
final ChronicleQueue queue = SingleChronicleQueueBuilder.binary(
OS.TARGET + "/test-" + System.nanoTime())
.rollCycle(RollCycles.HOURLY)
.build();
final ExcerptAppender appender = queue.acquireAppender();
Jvm.setExceptionHandlers(Slf4jExceptionHandler.FATAL, Slf4jExceptionHandler.WARN, Slf4jExceptionHandler.WARN);
for (int j = 0; j < 1000; ++j) {
for (int i = 0; i < 10000; ++i) {
appender.writeBytes(output);
}
System.out.println((j + 1) * 10000);
// Jvm.pause(100L);
}
queue.close();
}
}
关于java - 历史记录队列速度变慢并耗尽内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44058298/