java - 分配处理时通常有多少开销?

标签 java performance apache-spark apache-flink hazelcast-jet

For impatient readers: this is a work in progress, where I ask for help, during the process. Please do not judge the tools by my temporary data, as they can change while I try to get better results.



我们正处于架构决策过程的中间,该工具用于分析协同仿真的输出。

作为该过程的一部分,我被要求编写一个基准测试工具,并获取有关多个分布式处理框架速度的数据。

我测试的框架是:Apache Spark、Apache Flink、Hazelcast Jet。并作为比较基准纯 Java。

我使用的测试用例是一个简单的“这是一个 Pojo 列表,pojo 中的一个字段是 double 值。找到最小的(最小)值”。

简单,直接,希望具有高度可比性。

四分之三的测试使用一个简单的比较器,第四个(flink)使用与比较器基本相同的 reducer 。分析函数如下所示:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();

我对此进行了广泛的测试,改变了测试列表的大小以及分配的资源。结果让我大吃一惊。最佳结果如下(所有数字以毫秒为单位,1 个 mio pojo,每个 10 个测试):
  • 实例:声明和启动实例需要多长时间
    框架
  • 列表:解析/传输需要多长时间
    列出框架“列表”
  • 过程:需要多长时间
    处理数据以检索最小值
  • 总体:从开始到结束
    每次测试

  • 结果:
    java:
    Instances: 
    List: 
    Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
    Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
    
    spark:
    Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52, 
    List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0, 
    Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882, 
    Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943, 
    
    hazel:
    Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799, 
    List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427, 
    Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769, 
    Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007
    
    flink:
    Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
    List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24, 
    Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586, 
    Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610, 
    

    最有趣的部分是:
  • 最好的结果都来自纯粹的本地测试(一个实例)
  • 任何使用分布式机制(附加节点等)的测试,仍然慢了一个数量级(例如,如果分布式,spark 会慢 2.5)。

  • 现在不要误会我的意思,分布式处理必须比单线程处理慢,这是每个核心的基本逻辑。

    但是如果在单线程上使用,即使是 2 个数量级?如果分布,则为 3 个数量级?有人能看到我在所有 3 个分布式进程中显然犯的错误吗?我预计某个因素 < 10,因此可以选择使用更多硬件来杀死它。

    那么有什么方法可以将这些框架的开销减少到,嗯,也许是 x9 而不是 x999?

    我知道我知道,我使用的测试数据非常小,但即使扩大规模,我也没有看到开销与性能的任何减少。它大约是我们需要分析的批量数据的大小(每次模拟 0.1M - 1M 对象/秒)。因此,欢迎您帮助查找我的错误。 :D

    更新 Spark :

    在对 Spark 进行了一些更彻底的测试之后,我仍然没有留下深刻的印象。设置如下:

    一台机器上的 java 客户端 64 核,480 GB RAM 作业
    单独机架上的主站和 7 个从站,32 个 cors,每个 20 GB
        1 mio objects, 256 tasks, 64 cpus local[*]
        java:
          Instances: 
          List: 
          Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29, 
        spark:
          Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106, 
          List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1, 
          Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051, 
    
        10 mio objects, 256 tasks, 64 cpus local[*]
        java:
          Instances: 
          List: 
          Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66, 
        spark:
          Instances: 20345, 
          List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1, 
          Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516
    
        1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
        java:
          Instances: 
          List: 
          Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53, 
        spark:
          Instances: 4631, 
          List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1, 
          Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972
    
        1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
        java:
          Instances: 
          List: 
          Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21, 
        spark:
          Instances: 4513, 
          List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1, 
          Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335
    
        10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
        java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64, 
        spark:
          Instances: 20181, 
          List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1, 
          Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320
    
        1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
        java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4, 
        spark:
          Instances: 4612, 
          List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 
          Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422
    
        2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
        java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34, 
        spark:
          Instances: 6316, 
          List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1, 
          Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095
    
        10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
        java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89, 
        spark:
          Instances: 21192, 
          List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 
          Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845
    

    结果:无论抛出多少硬件,以及任务如何集群,使用 spark 列表中每百万个 pojo 需要 5-6 秒。

    另一方面,Java 处理相同的数量需要 5-30 毫秒。所以基本上是 200-1,000 的系数。

    有没有人建议如何为如此简单的工作“加速”Spark?

    更新淡褐色:

    现在我开始印象深刻。虽然我仍然在与一些奇怪的问题作斗争,但至少 Hazelcast Jet 似乎明白如果可能的话,可以在本地处理本地数据。只有 100%(因子 x2)的开销,这是完全可以接受的。

    10个mio对象
    java:
       Instances: 
       List: 68987, 
       Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88, 
    hazel:
      Instances: 6136, 
      List: 97225, 
      Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140
    

    更新弗林克:

    暂时将其从基准测试中删除,因为它造成了太多麻烦而没有给出很好的结果。

    编辑:整个基准可以在以下位置找到:https://github.com/anderschbe/clusterbench

    spark 的集群设置使用 spark-2.1.0-bin-hadoop2.7,因为它是开箱即用的。 spark_env.sh 有一个小改动:SPARK_NO_DAEMONIZE=true

    使其在集群上运行所需的唯一更改是将 SparcProc 第 25 行中的“localhost”替换为“spark://I_cant_give_you_my_cluster_IP.doo”

    最佳答案

    当你在集群框架中计算某些东西时,比如 Spark 或 Flink,框架:

  • 序列化您的代码
  • 发送资源请求
  • 通过网络发送您的代码
  • 调度执行
  • 等待结果

  • 如您所见,执行了许多步骤 - 不仅仅是您的计算!如果您:
  • 可以将您的计算拆分为可以并行完成的小任务
  • 在一台机器上处理太多数据或在一台机器上处理可能太慢 - 磁盘 I/O,项目或计算中的其他一些特定因素非常具体,需要许多 CPU,通常不止一台机器拥有 - 但随后一部分数据的计算一定很长

  • 尝试计算 10 GB 文本文件中单词的出现次数 - 那么 Spark 和 Flink 将击败单节点 Java

    有时用户代码可能会导致分布式计算的缓慢。典型错误:
  • 用户在具有许多引用的类中编写 lambdas - 所有其他类都被序列化,序列化需要很多时间
  • 任务并不是真正并行的——它们必须相互等待或必须处理大部分数据
  • 数据偏斜 - 对象可能不正确 hashCode实现和 HashPartitioner导致所有数据都去了一个分区=一个节点
  • 分区数不正确 - 您可以再添加 1000 台机器,但如果您仍然有 4 个分区,那么您一次最多可以存档 4 个并行任务
  • 太多的网络通信 - 在你的情况下这不是问题,但有时用户会做很多 joinreduce

  • 编辑 问题编辑后:
    在您的示例中,Spark 在 local 上运行- 这意味着只有 1 个线程!至少使用 local[*]或其他集群管理器。您在此答案中列出了开销,并且只有一个线程

    关于java - 分配处理时通常有多少开销?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42759255/

    相关文章:

    java - 读取大型 txt 文件时模拟器上出现 OutOfMemoryError

    java - 在Java中使用String.format而不是字符串连接是更好的做法吗?

    javascript - (Angular)有很多订阅者

    scala - 如果 SparkSession 没有关闭会发生什么?

    java - SpringFramework服务单元测试

    java - InDesign Server 5.5 一个文档中的多个页面大小

    performance - NodeJs 性能问题

    sql - 为了获得最佳性能,Where 子句中 ANDS 的顺序

    apache-spark - 为什么spark执行器会收到SIGTERM?

    scala - Spark (Scala) 将具有重复项的列表转换为 (list_entry, count) 的映射