java - Performance degradation with fast-serialization

Tags: java performance serialization

This question is specific to the fast-serialization library. https://github.com/RuedigerMoeller/fast-serialization

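I'm using FSTLongOffheapMap (version 2.29) on Windows 7 with JDK 1.7 to store some objects. I measured the latency of storing an object, and the 99.99th percentile latency is about 100 microseconds. Considering that I'm running on Windows, haven't written a custom serializer, and haven't tuned GC much, that is excellent.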

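However, performance drops sharply if I change the setup as follows:

a) Create one producer thread and one consumer thread, connected through an ArrayBlockingQueue.

b) Have the producer create 50,000 objects and put them on the queue as fast as possible.

c) The consumer thread pulls each object off the queue, dispatches it to a listener, and then stores it in the off-heap map.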

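In this setup the 99.99th percentile latency shoots up to ~135 milliseconds! The biggest surprise, though, is that if I simply comment out the line that does the put, latency drops to about 400 microseconds. That seems illogical to me, because I know from the previous test that a put into the map takes only about 100 microseconds.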

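Below is a test that reproduces my findings; I'd appreciate any suggestions/hints/ideas. The only external library needed to run the test is Gil Tene's HdrHistogram. https://github.com/HdrHistogram/HdrHistogram

-Many thanks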

<hr/>
package com.mine.serialization.perf;

import java.io.*;
import java.util.*;
import org.nustaq.offheap.*;
import org.nustaq.serialization.simpleapi.*;

public final class MyFSTSerializer{

    private final boolean toStore;
    private final String fileName;
    private final long memorySize;
    private final FSTCoder fastCoder;
    private final FSTLongOffheapMap<MktDataEvent> offHeapMap;

    public MyFSTSerializer(  boolean toStore, String location, String journalName, FSTCoder fastCoder, long memorySize, int count ) throws Exception{
        this.toStore        = toStore;
        this.fileName       = location + File.separator + journalName + ".mmf";
        this.memorySize     = memorySize;
        this.fastCoder      = fastCoder;
        this.offHeapMap     = new FSTLongOffheapMap<>( fileName, memorySize, count, fastCoder );
    }

    public final boolean toStore( ){
        return toStore;
    }

    public final String getFilename( ){
        return fileName;
    }

    public final void start( ){
        fastCoder.getConf().setCrossPlatform( false );
        fastCoder.getConf().setPreferSpeed( true );
        fastCoder.getConf().setShareReferences( false );
        fastCoder.getConf().registerClass( Long.class, MktDataEvent.class );    
        System.out.println("Journaling started at " + fileName + " with Memory " +  memorySize ) ;
    }

    public final void storeEvent( MktDataEvent event ){
        offHeapMap.put( event.getSequenceId(), event );
    }

    public final Collection<MktDataEvent> retrieveAllEvents( ){
        Map<Long, MktDataEvent> retrievedMap = new LinkedHashMap<>();

        for( Iterator<MktDataEvent> iterator = offHeapMap.values(); iterator.hasNext(); ){
            MktDataEvent event = iterator.next();
            retrievedMap.put( event.getSequenceId(), event );
        }

        return retrievedMap.values();
    }

    public final void stop( ){
        try{
            offHeapMap.free( );
            System.out.println("Stopped Journal and freed memory." );
        }catch( Exception e ){
            e.printStackTrace( );
        }
    }
}
<hr/>
package com.mine.serialization.perf;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public final class MktDataEvent implements Serializable{

    private final long sequenceId;
    private final long creationTime;
    private final String symbol;
    private final double bidPrice;
    private final long bidQuantity;
    private final double askPrice;
    private final long askQuantity;

    private final static long serialVersionUID = 1L;
    private final static AtomicLong SEQUENCE    = new AtomicLong();

    public MktDataEvent( String symbol, double bidPrice, long bidQuantity, double askPrice, long askQuantity ){

        this.creationTime   = System.nanoTime( );
        this.sequenceId     = SEQUENCE.incrementAndGet();
        this.symbol         = symbol;
        this.bidPrice       = bidPrice;
        this.bidQuantity    = bidQuantity;
        this.askPrice       = askPrice;
        this.askQuantity    = askQuantity;
    }       

    public final long getSequenceId( ){
        return sequenceId;
    }

    public final long getCreationTime( ){
        return creationTime;
    }

    public final String getSymbol(){
        return symbol;
    }

    public final double getBidPrice( ){
        return bidPrice;
    }

    public final long getBidQuantity( ){
        return bidQuantity;
    }

    public final double getAskPrice( ){
        return askPrice;
    }

    public final long getAskQuantity( ){
        return askQuantity;
    }
}

//---------------------------------------------------------------------------

package com.mine.serialization.perf;

import java.util.*;
import java.util.concurrent.*;

public final class MktDataDispatcher implements Runnable{

    private volatile boolean keepDispatching;
    private final ExecutorService service;
    private final MyFSTSerializer serializer;
    private final MktDataListener listener;
    private final AbstractQueue<MktDataEvent> eventQueue;

    public MktDataDispatcher( int queueSize, MyFSTSerializer serializer, MktDataListener listener ){
        this.serializer     = serializer;
        this.listener       = listener;
        this.eventQueue = new ArrayBlockingQueue<MktDataEvent>( queueSize );
        this.service        = Executors.newFixedThreadPool(  1 );
    }

    public final void start( ){
        serializer.start( );
        keepDispatching = true;
        service.execute( this );
    }

    public final boolean enqueue( final MktDataEvent event ){
        return eventQueue.offer( event );
    }

    @Override
    public final void run( ){

        while( keepDispatching ){

            try{
                MktDataEvent event  = eventQueue.poll();
                if( event == null ){
                    Thread.yield();
                    continue;
                }

                if( serializer.toStore() ){
                    serializer.storeEvent( event );
                }
                listener.update( event );

            }catch( Exception e ){
                e.printStackTrace( );                   
            }
        }
    }

    protected final int getQueueSize( ){
        return eventQueue.size( );
    }

    public final void stop(){
        serializer.stop( );
        keepDispatching = false;
        service.shutdown();
    }

    public interface MktDataListener{
        public boolean update( MktDataEvent event );
    }

}
<hr/>
package com.mine.serialization.perf;

import java.io.*;
import java.util.concurrent.*;

import org.HdrHistogram.*;
import org.nustaq.serialization.simpleapi.*;

import com.mine.serialization.perf.MktDataDispatcher.*;



public final class TestFSTSerializer{


    protected static void printResult( Histogram histogram ){
        System.out.println( "\nDetail Result (in micros)");
        System.out.println( "------------------------------------------------------------------");

        histogram.outputPercentileDistribution( System.out, 1000.0 );
        double valueAt99Percentile  = histogram.getValueAtPercentile( 99.99d );

        System.out.println( "\nValue 99.99th percentile >> " + valueAt99Percentile/1000.0 );
    }


    protected static MyFSTSerializer createFSTSerializer(  boolean toStore, int eventCount, int memorySizeOf1Object ) throws Exception{

        long expectedMemory     = memorySizeOf1Object * eventCount;
        String fileLocation     = "C:\\Temp";
        String journalName      = "Test";
        MyFSTSerializer ser     = new MyFSTSerializer( toStore, fileLocation, journalName, new DefaultCoder(), expectedMemory, eventCount );

        return ser;
    }


    protected static void destroyFSTSerializer( MyFSTSerializer serializer ){

        if( serializer != null ){
            serializer.stop();
            boolean deleted = new File( serializer.getFilename() ).delete();
            if( deleted ){
                System.out.println( "Deleted file from " +  serializer.getFilename());      
            }else{
                throw new RuntimeException( "TEST FAILED as we failed to delete file " + serializer.getFilename() );
            }
        }

    }


    public static void testOffHeapPersistence( ){

        MyFSTSerializer serializer= null;

        try{

            int eventCount          = 50000;
            int memorySizeOf1Object = 1000;
            Histogram  histogram    = new Histogram( TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS), 2);

            System.out.println( "Testing off heap persistence performance of FSTLongOffheapMap by storing " + eventCount + " events.");
            serializer              = createFSTSerializer( true, eventCount, memorySizeOf1Object );
            serializer.start( );

            for( int i =0; i<eventCount; i++ ){

                MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
                serializer.storeEvent( event );
                histogram.recordValue(System.nanoTime() - event.getCreationTime() );

            }

            int retrievedEventSize  = serializer.retrieveAllEvents().size();
            if( eventCount != retrievedEventSize )
                throw new RuntimeException("Store failed as we stored " + eventCount + " events but retrieved " + retrievedEventSize );


            printResult( histogram );

        }catch( Exception e ){
            throw new RuntimeException("TEST FAILED as ", e);

        }finally{
            destroyFSTSerializer( serializer );
        }

    }



    public static void testDispatchAndPersistence( boolean toStore ) throws Exception{

        int eventCount                  = 50000;
        int memorySizeOf1Object         = 1000;

        DummyListener listener          = new DummyListener( );
        MyFSTSerializer serializer      = createFSTSerializer( toStore, eventCount, memorySizeOf1Object );
        MktDataDispatcher dispatcher    = new MktDataDispatcher( eventCount, serializer, listener );

        if( toStore ){
            System.out.println( "Testing off heap persistence with dispatcher performance of FSTLongOffheapMap by storing " + eventCount + " events.");
        }else{
            System.out.println( "Testing off heap persistence with dispatcher performance of FSTLongOffheapMap WITHOUT storing " + eventCount + " events.");
        }

        dispatcher.start();
        System.gc();
        Thread.sleep( 3000 );

        for( int i = 0; i< eventCount; i++ ){
            MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
            dispatcher.enqueue( event );
        }

        //Let the listener get all the elements
        while( (dispatcher.getQueueSize() != 0) ){
            Thread.yield();
        }

        Thread.sleep( 2000 );
        dispatcher.stop();
        listener.generateLatencyStats();
        destroyFSTSerializer( serializer );

    }


    public static class DummyListener implements MktDataListener{

        private final Histogram histogram;

        public DummyListener( ){
            this.histogram  = new Histogram( TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS), 2);
        }


        @Override
        public final boolean update( MktDataEvent event ){
            histogram.recordValue( (System.nanoTime() - event.getCreationTime()) );
            return true;
        }


        public final void generateLatencyStats( ){

            histogram.outputPercentileDistribution( System.out, 1000.0 );
            double valueAt99Percentile  = histogram.getValueAtPercentile( 99.99d );
            System.out.println( "\nValue at 99.99th percentile (micros) >> " + valueAt99Percentile/1000.0 );

        }

    }

    public static void main( String ... args ) throws Exception{

        testOffHeapPersistence( );

        System.gc();
        Thread.sleep( 2000 );
        testDispatchAndPersistence( false );

        System.gc();
        Thread.sleep( 2000 );
        testDispatchAndPersistence( true );

    }

}

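Best answer

Hi (I'm the author of FST):

I think the test is flawed:

When I run the synchronous test (no queue and no thread context switch) in a loop (= proper warm-up), I get an average of 0.7 microseconds and a maximum outlier of 14 microseconds for storing a single event (even with twice as many elements in the map).
That is FST's performance; the loss and latency you are seeing are caused by your queuing and thread context switching. The test also has another flaw: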

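You put a burst of 50k events into the queue, and the latency clock starts when each event is created. Because enqueuing events is much faster than storing them, a backlog builds up: the N-th event is charged with the latency of all 0..N-1 events queued ahead of it ;).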
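To make the accumulation effect concrete, here is a minimal sketch that models a single consumer with a fixed per-event service time. The class name `QueueBacklogDemo`, the method `latencies`, and the 700 ns service time (borrowed from the 0.7 microsecond average above) are assumptions chosen for illustration; it is a simplified model, not a measurement of FST.

```java
// Hypothetical model for illustration only; not part of the original test code.
final class QueueBacklogDemo {

    /**
     * Latency in nanoseconds seen by each event when one event is created every
     * arrivalGapNanos and a single consumer needs serviceNanos per event.
     */
    static long[] latencies(int events, long arrivalGapNanos, long serviceNanos) {
        long[] result = new long[events];
        long consumerFreeAt = 0;                        // when the consumer finishes its current event
        for (int n = 0; n < events; n++) {
            long createdAt = n * arrivalGapNanos;       // timestamp taken at event creation
            long startedAt = Math.max(createdAt, consumerFreeAt);
            consumerFreeAt = startedAt + serviceNanos;  // event n is fully processed here
            result[n] = consumerFreeAt - createdAt;     // includes waiting behind events 0..n-1
        }
        return result;
    }

    public static void main(String[] args) {
        long[] burst = latencies(50_000, 0, 700);       // all events created at once
        long[] paced = latencies(50_000, 3_000, 700);   // one event every 3 microseconds
        System.out.println("burst, last event (ns): " + burst[burst.length - 1]);
        System.out.println("paced, last event (ns): " + paced[paced.length - 1]);
    }
}
```

In this model the last event of the burst waits behind every earlier event, while a paced producer that leaves the consumer enough time per event never builds a backlog.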

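Because the JVM has not warmed up yet, the first run looks fine: event creation is slow, so events do not pile up in the queue.

Other issues:

1) Major: no warm-up. Wrap the test in a loop and let it run several times (e.g. 10) before looking at the numbers.

2) (Minor) Enqueuing is done via offer without checking the result.

3) When no event is available, you poll the queue and yield, which can cause nondeterministic latency spikes. Only yield after some backoff.

You should put an event into the queue roughly every 1-2 microseconds, so that events do not queue up and you do not end up measuring accumulated time.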
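Issues 2) and 3) could be addressed along the following lines. This is only a sketch under assumptions: the class `BackoffQueueDemo`, its method names, and the spin/yield thresholds are hypothetical, and suitable backoff limits depend on the machine and workload.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper for illustration; names and thresholds are assumptions.
final class BackoffQueueDemo {

    /** Retries offer() until the queue accepts the item, so no event is silently dropped. */
    static <T> int enqueueChecked(Queue<T> queue, T item) {
        int retries = 0;
        while (!queue.offer(item)) {    // a bounded queue returns false when it is full
            retries++;
            Thread.yield();
        }
        return retries;
    }

    /**
     * Polls with staged backoff: busy-spin first, then yield, then park briefly.
     * Returns null if the calling thread is interrupted while waiting.
     */
    static <T> T pollWithBackoff(Queue<T> queue, int spins, int yields) {
        int idle = 0;
        while (!Thread.currentThread().isInterrupted()) {
            T item = queue.poll();
            if (item != null) {
                return item;
            }
            idle++;
            if (idle <= spins) {
                continue;                        // stage 1: spin without giving up the CPU
            } else if (idle <= spins + yields) {
                Thread.yield();                  // stage 2: yield only after spinning failed
            } else {
                LockSupport.parkNanos(1_000L);   // stage 3: sleep briefly when the queue stays empty
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayBlockingQueue<>(4);
        for (int i = 0; i < 3; i++) {
            enqueueChecked(queue, i);
        }
        for (int i = 0; i < 3; i++) {
            System.out.println("polled " + pollWithBackoff(queue, 100, 100));
        }
    }
}
```

Spinning first keeps the consumer on its core while events arrive back to back; yielding and parking only kick in once the queue has been empty for a while.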

Change TestFSTSerializer to:

    for( int i = 0; i< eventCount; i++ ){
        MktDataEvent event = new MktDataEvent( "EDM6", 99.0, (100 + i), 99.50, (200 + i) );
        dispatcher.enqueue( event );
        long nanos = System.nanoTime();
        while( System.nanoTime() - nanos < 3000 )
            Thread.yield();

    }

and the main method (warm-up; ignore the first runs):

public static void main( String ... args ) throws Exception{

    for (int i = 0; i < 1000; i++) {
        System.gc();
        Thread.sleep( 2000 );
        System.out.println("start test ==>");
        testDispatchAndPersistence( true );
        //testOffHeapPersistence();
    }
}

This yields:

[Mean = 5.19, StdDeviation = 29.67]

[Max = 544.77, Total count = 50000]

[Buckets = 23, SubBuckets = 256]

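Note that a thread context switch costs about 3-8 microseconds (so high-end kernel-bypass networking is nearly as fast as queuing between threads!!).
You can try a queue that is faster than the ones in java.util.concurrent to reduce latency further.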
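As one possible direction, the sketch below shows a minimal single-producer/single-consumer ring buffer that avoids locks. `SpscRingBuffer` is a hypothetical class written for this illustration; it is not part of FST or the JDK, and it is not the queue used for the numbers above. Whether it actually beats ArrayBlockingQueue in this test would have to be measured.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free queue for exactly one producer thread and one consumer thread.
final class SpscRingBuffer<T> {
    private final Object[] slots;
    private final int mask;
    private final AtomicLong head = new AtomicLong();  // next index to read; written only by the consumer
    private final AtomicLong tail = new AtomicLong();  // next index to write; written only by the producer

    SpscRingBuffer(int capacityPowerOfTwo) {
        if (capacityPowerOfTwo <= 0 || Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        slots = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    /** Producer thread only. Returns false when the buffer is full. */
    boolean offer(T item) {
        if (item == null) {
            throw new NullPointerException("null items are not supported");
        }
        long t = tail.get();
        if (t - head.get() == slots.length) {
            return false;                    // full: the consumer has not freed a slot yet
        }
        slots[(int) (t & mask)] = item;
        tail.set(t + 1);                     // volatile write publishes the slot to the consumer
        return true;
    }

    /** Consumer thread only. Returns null when the buffer is empty. */
    @SuppressWarnings("unchecked")
    T poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;                     // empty
        }
        int index = (int) (h & mask);
        T item = (T) slots[index];
        slots[index] = null;                 // release the reference for the garbage collector
        head.set(h + 1);                     // volatile write hands the slot back to the producer
        return item;
    }
}
```

The design relies on each index having a single writer, so no compare-and-swap or lock is needed; it is only safe with exactly one producer thread and one consumer thread.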

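Note on the persistence test: because persistence depends on how eagerly the OS writes back, you would need to tune the OS settings to be very non-eager with write-back and/or use an SSD.

See https://github.com/RuedigerMoeller/fast-serialization/tree/master/src/test/ser/offheaplatency for the modified source code.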

Regarding java - Performance degradation with fast-serialization, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/30928650/
