This issue is specific to the fast-serialization library: https://github.com/RuedigerMoeller/fast-serialization
I am using FSTLongOffheapMap (version 2.29) on Windows 7 with JDK 1.7 to store some objects. I measured the latency of storing an object, and the 99.99th percentile was around 100 microseconds. Considering that I am running on Windows, have not written a custom serializer, and have not tuned GC much, that is excellent.
However, if I change the setup as follows, performance degrades dramatically:
a) Create one producer thread and one consumer thread that communicate through an ArrayBlockingQueue.
b) Have the producer create 50,000 objects and put them on the queue as fast as possible.
c) The consumer thread takes each object off the queue, dispatches it to a listener, and then stores it in the off-heap map.
With this setup, the 99.99th-percentile latency jumps to ~135 milliseconds! The biggest surprise is that if I simply comment out the line that does the put, latency drops to about 400 microseconds. That seems illogical to me, because I know from the earlier test that a put into the map takes only about 100 microseconds.
Below is a test that reproduces my findings; I would appreciate any suggestions/hints/ideas. The only external library needed to run the test is Gil Tene's HdrHistogram: https://github.com/HdrHistogram/HdrHistogram
Many thanks.
//---------------------------------------------------------------------------
package com.mine.serialization.perf;

import java.io.*;
import java.util.*;

import org.nustaq.offheap.*;
import org.nustaq.serialization.simpleapi.*;

public final class MyFSTSerializer{

    private final boolean toStore;
    private final String fileName;
    private final long memorySize;
    private final FSTCoder fastCoder;
    private final FSTLongOffheapMap<MktDataEvent> offHeapMap;

    public MyFSTSerializer( boolean toStore, String location, String journalName, FSTCoder fastCoder, long memorySize, int count ) throws Exception{
        this.toStore = toStore;
        this.fileName = location + File.separator + journalName + ".mmf";
        this.memorySize = memorySize;
        this.fastCoder = fastCoder;
        this.offHeapMap = new FSTLongOffheapMap<>( fileName, memorySize, count, fastCoder );
    }

    public final boolean toStore( ){
        return toStore;
    }

    public final String getFilename( ){
        return fileName;
    }

    public final void start( ){
        fastCoder.getConf().setCrossPlatform( false );
        fastCoder.getConf().setPreferSpeed( true );
        fastCoder.getConf().setShareReferences( false );
        fastCoder.getConf().registerClass( Long.class, MktDataEvent.class );
        System.out.println("Journaling started at " + fileName + " with Memory " + memorySize );
    }

    public final void storeEvent( MktDataEvent event ){
        offHeapMap.put( event.getSequenceId(), event );
    }

    public final Collection<MktDataEvent> retrieveAllEvents( ){
        Map<Long, MktDataEvent> retrievedMap = new LinkedHashMap<>();
        for( Iterator<MktDataEvent> iterator = offHeapMap.values(); iterator.hasNext(); ){
            MktDataEvent event = (MktDataEvent) iterator.next();
            retrievedMap.put( event.getSequenceId(), event );
        }
        return retrievedMap.values();
    }

    public final void stop( ){
        try{
            offHeapMap.free( );
            System.out.println("Stopped Journal and freed memory." );
        }catch( Exception e ){
            e.printStackTrace( );
        }
    }
}
//---------------------------------------------------------------------------
package com.mine.serialization.perf;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public final class MktDataEvent implements Serializable{

    private final long sequenceId;
    private final long creationTime;
    private final String symbol;
    private final double bidPrice;
    private final long bidQuantity;
    private final double askPrice;
    private final long askQuantity;

    private final static long serialVersionUID = 1L;
    private final static AtomicLong SEQUENCE = new AtomicLong();

    public MktDataEvent( String symbol, double bidPrice, long bidQuantity, double askPrice, long askQuantity ){
        this.creationTime = System.nanoTime( );
        this.sequenceId = SEQUENCE.incrementAndGet();
        this.symbol = symbol;
        this.bidPrice = bidPrice;
        this.bidQuantity = bidQuantity;
        this.askPrice = askPrice;
        this.askQuantity = askQuantity;
    }

    public final long getSequenceId( ){
        return sequenceId;
    }

    public final long getCreationTime( ){
        return creationTime;
    }

    public final String getSymbol(){
        return symbol;
    }

    public final double getBidPrice( ){
        return bidPrice;
    }

    public final long getBidQuantity( ){
        return bidQuantity;
    }

    public final double getAskPrice( ){
        return askPrice;
    }

    public final long getAskQuantity( ){
        return askQuantity;
    }
}
//---------------------------------------------------------------------------
package com.mine.serialization.perf;

import java.util.*;
import java.util.concurrent.*;

public final class MktDataDispatcher implements Runnable{

    private volatile boolean keepDispatching;
    private final ExecutorService service;
    private final MyFSTSerializer serializer;
    private final MktDataListener listener;
    private final AbstractQueue<MktDataEvent> eventQueue;

    public MktDataDispatcher( int queueSize, MyFSTSerializer serializer, MktDataListener listener ){
        this.serializer = serializer;
        this.listener = listener;
        this.eventQueue = new ArrayBlockingQueue<MktDataEvent>( queueSize );
        this.service = Executors.newFixedThreadPool( 1 );
    }

    public final void start( ){
        serializer.start( );
        keepDispatching = true;
        service.execute( this );
    }

    public final boolean enqueue( final MktDataEvent event ){
        return eventQueue.offer( event );
    }

    @Override
    public final void run( ){
        while( keepDispatching ){
            try{
                MktDataEvent event = eventQueue.poll();
                if( event == null ){
                    Thread.yield();
                    continue;
                }
                if( serializer.toStore() ){
                    serializer.storeEvent( event );
                }
                listener.update( event );
            }catch( Exception e ){
                e.printStackTrace( );
            }
        }
    }

    protected final int getQueueSize( ){
        return eventQueue.size( );
    }

    public final void stop(){
        //stop the dispatch loop and let it exit before freeing the
        //off-heap map, otherwise run() may store into freed memory
        keepDispatching = false;
        service.shutdown();
        try{
            service.awaitTermination( 1, TimeUnit.SECONDS );
        }catch( InterruptedException e ){
            Thread.currentThread().interrupt();
        }
        serializer.stop( );
    }

    public interface MktDataListener{
        public boolean update( MktDataEvent event );
    }
}
//---------------------------------------------------------------------------
package com.mine.serialization.perf;

import java.io.*;
import java.util.concurrent.*;

import org.HdrHistogram.*;
import org.nustaq.serialization.simpleapi.*;

import com.mine.serialization.perf.MktDataDispatcher.*;

public final class TestFSTSerializer{

    protected static void printResult( Histogram histogram ){
        System.out.println( "\nDetail Result (in micros)");
        System.out.println( "------------------------------------------------------------------");
        histogram.outputPercentileDistribution( System.out, 1000.0 );
        double valueAt99Percentile = histogram.getValueAtPercentile( 99.99d );
        System.out.println( "\nValue 99.99th percentile >> " + valueAt99Percentile/1000.0 );
    }

    protected static MyFSTSerializer createFSTSerializer( boolean toStore, int eventCount, int memorySizeOf1Object ) throws Exception{
        long expectedMemory = memorySizeOf1Object * eventCount;
        String fileLocation = "C:\\Temp";
        String journalName = "Test";
        MyFSTSerializer ser = new MyFSTSerializer( toStore, fileLocation, journalName, new DefaultCoder(), expectedMemory, eventCount );
        return ser;
    }

    protected static void destroyFSTSerializer( MyFSTSerializer serializer ){
        if( serializer != null ){
            serializer.stop();
            boolean deleted = new File( serializer.getFilename() ).delete();
            if( deleted ){
                System.out.println( "Deleted file from " + serializer.getFilename());
            }else{
                throw new RuntimeException( "TEST FAILED as we failed to delete file " + serializer.getFilename() );
            }
        }
    }

    public static void testOffHeapPersistence( ){
        MyFSTSerializer serializer = null;
        try{
            int eventCount = 50000;
            int memorySizeOf1Object = 1000;
            Histogram histogram = new Histogram( TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS), 2);
            System.out.println( "Testing off heap persistence performance of FSTLongOffheapMap by storing " + eventCount + " events.");
            serializer = createFSTSerializer( true, eventCount, memorySizeOf1Object );
            serializer.start( );
            for( int i = 0; i < eventCount; i++ ){
                MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
                serializer.storeEvent( event );
                histogram.recordValue( System.nanoTime() - event.getCreationTime() );
            }
            int retrievedEventSize = serializer.retrieveAllEvents().size();
            if( eventCount != retrievedEventSize )
                throw new RuntimeException("Store failed as we stored " + eventCount + " events but retrieved " + retrievedEventSize );
            printResult( histogram );
        }catch( Exception e ){
            throw new RuntimeException("TEST FAILED as ", e);
        }finally{
            destroyFSTSerializer( serializer );
        }
    }

    public static void testDispatchAndPersistence( boolean toStore ) throws Exception{
        int eventCount = 50000;
        int memorySizeOf1Object = 1000;
        DummyListener listener = new DummyListener( );
        MyFSTSerializer serializer = createFSTSerializer( toStore, eventCount, memorySizeOf1Object );
        MktDataDispatcher dispatcher = new MktDataDispatcher( eventCount, serializer, listener );
        if( toStore ){
            System.out.println( "Testing off heap persistence with dispatcher performance of FSTLongOffheapMap by storing " + eventCount + " events.");
        }else{
            System.out.println( "Testing off heap persistence with dispatcher performance of FSTLongOffheapMap WITHOUT storing " + eventCount + " events.");
        }
        dispatcher.start();
        System.gc();
        Thread.sleep( 3000 );
        for( int i = 0; i < eventCount; i++ ){
            MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
            dispatcher.enqueue( event );
        }
        //Let the listener get all the elements
        while( dispatcher.getQueueSize() != 0 ){
            Thread.yield();
        }
        Thread.sleep( 2000 );
        dispatcher.stop();
        listener.generateLatencyStats();
        destroyFSTSerializer( serializer );
    }

    public static class DummyListener implements MktDataListener{

        private final Histogram histogram;

        public DummyListener( ){
            this.histogram = new Histogram( TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS), 2);
        }

        @Override
        public final boolean update( MktDataEvent event ){
            histogram.recordValue( System.nanoTime() - event.getCreationTime() );
            return true;
        }

        public final void generateLatencyStats( ){
            histogram.outputPercentileDistribution( System.out, 1000.0 );
            double valueAt99Percentile = histogram.getValueAtPercentile( 99.99d );
            System.out.println( "\nValue at 99.99th percentile (micros) >> " + valueAt99Percentile/1000.0 );
        }
    }

    public static void main( String ... args ) throws Exception{
        testOffHeapPersistence( );
        System.gc();
        Thread.sleep( 2000 );
        testDispatchAndPersistence( false );
        System.gc();
        Thread.sleep( 2000 );
        testDispatchAndPersistence( true );
    }
}
Best answer
Hi (I am the author of FST):
I think the test is flawed:
When I run a synchronous test (no queue + no thread context switch) in a loop (= proper warm-up), I get a mean of 0.7 microseconds and a max outlier of 14 microseconds for storing a single event (even though the number of elements in the map doubled).
So that is FST's performance; the loss and latency you see are caused by your queuing/thread context switches. The test also has another flaw:
You push a burst of 50k events onto the queue, and creating an event takes some time. Because enqueuing an event is much faster than storing one, a backlog accumulates: the N-th event sees the accumulated latency of all events 0..N-1 ;).
The first run only looks fine because of the missing JVM warm-up: event creation is slow, so events do not pile up in the queue.
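The backlog effect can be illustrated with a back-of-the-envelope model (the rates below are hypothetical, not measured numbers): if the producer enqueues an event every 1 microsecond while the consumer needs 3 microseconds per event, the consumer is always busy with earlier events, and end-to-end latency of the N-th event grows linearly with N:

```java
// Arithmetic model of a producer outpacing a single consumer (hypothetical rates).
public final class BacklogModel {
    /** End-to-end latency (in micros) of the n-th event, 0-based, assuming the
     *  producer is strictly faster than the consumer for the whole burst. */
    static long latencyMicros(long produceIntervalUs, long consumeCostUs, long n) {
        long enqueueTime = n * produceIntervalUs;                 // when event n enters the queue
        long dequeueDone = Math.max(enqueueTime, n * consumeCostUs) + consumeCostUs; // consumer busy with 0..n-1
        return dequeueDone - enqueueTime;
    }

    public static void main(String[] args) {
        System.out.println(latencyMicros(1, 3, 0));      // 3  (no backlog yet)
        System.out.println(latencyMicros(1, 3, 100));    // 203
        System.out.println(latencyMicros(1, 3, 49999));  // 100001 -> ~100 ms at the tail
    }
}
```

So even though a single store is fast, the tail of a 50k burst can easily show latencies in the ~100 ms range, which matches the observed 99.99th percentile.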
Other issues:
1) Major: no warm-up. Put a loop around the test and let it run several times (e.g. 10) before looking at the numbers.
2) Minor: you enqueue via offer() without checking the result.
3) You poll the queue with a yield when no event is available, which can cause indeterministic latency spikes. Yield only after some backoff.
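One common shape for "yield only after some backoff" is to busy-spin first, then yield, then park briefly (a sketch with arbitrary thresholds, not FST's or the question's code):

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport;

// Poll with escalating backoff: spin -> yield -> short park.
public final class BackoffPoller {
    static <T> T pollWithBackoff(Queue<T> queue) {
        int misses = 0;
        for (;;) {
            T item = queue.poll();
            if (item != null) return item;
            misses++;
            if (misses < 100) {
                // busy-spin: cheapest when an item is expected almost immediately
            } else if (misses < 200) {
                Thread.yield();                // give up the time slice
            } else {
                LockSupport.parkNanos(1_000);  // back off ~1 microsecond
            }
        }
    }

    public static void main(String[] args) {
        Queue<Integer> q = new ArrayBlockingQueue<>(16);
        q.offer(42);
        System.out.println(pollWithBackoff(q)); // prints 42
    }
}
```

The idea is that an unconditional yield on every empty poll hands the core to the OS scheduler at exactly the wrong moment, while a short spin keeps the consumer hot when events arrive back-to-back.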
You should put one event onto the queue roughly every 1-2 microseconds to avoid piling up events this way and measuring aggregated time.
Change TestFSTSerializer to:
for( int i = 0; i < eventCount; i++ ){
    MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
    dispatcher.enqueue( event );
    long nanos = System.nanoTime();
    while( System.nanoTime() - nanos < 3000 )
        Thread.yield();
}
and the main method (warm-up; ignore the first runs):
public static void main( String ... args ) throws Exception{
    for (int i = 0; i < 1000; i++) {
        System.gc();
        Thread.sleep( 2000 );
        System.out.println("start test ==>");
        testDispatchAndPersistence( true );
        //testOffHeapPersistence();
    }
}
This yields:
#[Mean    =        5.19, StdDeviation   =       29.67]
#[Max     =      544.77, Total count    =       50000]
#[Buckets =          23, SubBuckets     =         256]
Note that a thread context switch costs 3-8 microseconds (so high-end kernel-bypass networking is almost as fast as queuing between threads!!).
You can try a queue faster than the ones in java.util.concurrent to reduce latency further.
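For illustration, the kind of queue meant here (e.g. JCTools' SpscArrayQueue or the LMAX Disruptor's ring buffer) avoids ArrayBlockingQueue's lock by exploiting the single-producer/single-consumer constraint. A minimal, simplified sketch of the idea (illustrative, not production code; capacity must be a power of two):

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal single-producer/single-consumer ring buffer: each index is written
// by exactly one thread, so plain CAS-free ordered stores are sufficient.
public final class SpscRingBuffer<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // consumer position
    private final AtomicLong tail = new AtomicLong(); // producer position

    public SpscRingBuffer(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    public boolean offer(E e) {                        // producer thread only
        long t = tail.get();
        if (t - head.get() == buffer.length) return false; // full
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1);                           // ordered store, no lock
        return true;
    }

    @SuppressWarnings("unchecked")
    public E poll() {                                  // consumer thread only
        long h = head.get();
        if (h == tail.get()) return null;              // empty
        int idx = (int) (h & mask);
        E e = (E) buffer[idx];
        buffer[idx] = null;                            // allow GC of the slot
        head.lazySet(h + 1);
        return e;
    }
}
```

In practice one would use a battle-tested implementation (JCTools) rather than hand-rolling this; the sketch only shows why the SPSC case needs no lock at all.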
Note from later testing: since persistence depends on the OS's write-back eagerness, you need to tune the OS settings toward very lazy write-back and/or use an SSD.
See https://github.com/RuedigerMoeller/fast-serialization/tree/master/src/test/ser/offheaplatency for the modified source.
Original question: java - Performance degradation with fast-serialization: https://stackoverflow.com/questions/30928650/