java - Spark MapwithState stateSnapshots 不缩放(Java)

标签 java apache-spark apache-kafka spark-streaming

我正在使用 spark 从 Kafka Stream 接收数据,以接收有关发送定期健康更新的 IOT 设备的状态以及有关设备中存在的各种传感器的状态。我的 Spark 应用程序监听单个主题以使用 Spark 直接流从 Kafka 流接收更新消息。我需要根据每个设备的传感器状态触发不同的警报。但是,当我添加更多使用 Kakfa 将数据发送到 spark 的 IOT 设备时,尽管添加了更多的机器数量并且执行程序的数量增加了,但 Spark 并没有扩展。下面我给出了我的 Spark 应用程序的精简版本,其中删除了具有相同性能问题的通知触发部分。

   // Method for update the Device state , it just a in memory object which tracks the device state  .
private static Optional<DeviceState> trackDeviceState(Time time, String key, Optional<ProtoBufEventUpdate> updateOpt,
            State<DeviceState> state) {
            int batchTime = toSeconds(time);
            ProtoBufEventUpdate eventUpdate = (updateOpt == null)?null:updateOpt.orNull();
            if(eventUpdate!=null)
                eventUpdate.setBatchTime(ProximityUtil.toSeconds(time));
            if (state!=null && state.exists()) {
                DeviceState deviceState = state.get();
                if (state.isTimingOut()) {
                    deviceState.markEnd(batchTime);
                }
                if (updateOpt.isPresent()) {
                        deviceState = DeviceState.updatedDeviceState(deviceState, eventUpdate);
                        state.update(deviceState);
                }
            } else if (updateOpt.isPresent()) {
                DeviceState deviceState = DeviceState.newDeviceState(eventUpdate);
                state.update(deviceState);              
                return Optional.of(deviceState);
            } 

        return Optional.absent();
}
    SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    .set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(Runtime.getRuntime().availableProcessors()))
     JavaStreamingContext context= new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put( “zookeeper.connect”, “192.168.60.20:2181,192.168.60.21:2181,192.168.60.22:2181”);
        kafkaParams.put("metadata.broker.list", “192.168.60.20:9092,192.168.60.21:9092,192.168.60.22:9092”);
        kafkaParams.put(“group.id”, “spark_iot”);
        HashSet<String> topics=new HashSet<>();
        topics.add(“iottopic”);

JavaPairInputDStream<String, ProtoBufEventUpdate> inputStream = KafkaUtils.
            createDirectStream(context, String.class, ProtoBufEventUpdate.class,  KafkaKryoCodec.class, ProtoBufEventUpdateCodec.class, kafkaParams, topics);

JavaPairDStream<String, ProtoBufEventUpdate> updatesStream = inputStream.mapPartitionsToPair(t -> {
            List<Tuple2<String, ProtoBufEventUpdate>> eventupdateList=new ArrayList<>();
            t.forEachRemaining(tuple->{
                    String key=tuple._1;
                    ProtoBufEventUpdate eventUpdate =tuple._2;                  
                    Util.mergeStateFromStats(eventUpdate);
                    eventupdateList.add(new Tuple2<String, ProtoBufEventUpdate>(key,eventUpdate));

            });
            return eventupdateList.iterator();
});

JavaMapWithStateDStream<String, ProtoBufEventUpdate, DeviceState, DeviceState> devceMapStream = null;

devceMapStream=updatesStream.mapWithState(StateSpec.function(Engine::trackDeviceState)
                             .numPartitions(20)
                             .timeout(Durations.seconds(1800)));
devceMapStream.checkpoint(new Duration(batchDuration*1000));


JavaPairDStream<String, DeviceState> deviceStateStream = devceMapStream
                .stateSnapshots()
                .cache();

deviceStateStream.foreachRDD(rdd->{
                if(rdd != null && !rdd.isEmpty()){
                    rdd.foreachPartition(tuple->{
                    tuple.forEachRemaining(t->{
                        SparkExecutorLog.error("Engine::getUpdates Tuple data  "+ t._2);
                    });
                });
                }
});

即使负载增加,我也看不到 Executor 实例的 CPU 使用率增加。大多数时候 Executor 实例 CPU 处于空闲状态。我尝试增加 kakfa 分区(目前 Kafka 有 72 个分区。我也尝试将其降低到 36 个)。我也尝试增加 devceMapStream partitions 。但我看不到任何性能改进。代码没有在 IO 上花费任何时间。

我在 Amazon EMR(Yarn) 上使用 6 个执行程序实例运行我们的 Spark 应用程序,每台机器都有 4 个内核和 32 gb Ram。它尝试将执行程序实例的数量增加到 9,然后增加到 15,但没有看到任何性能改进。还通过将 spark.default.parallelism 值设置为 20, 36, 72, 100 对它进行了一些调整,但我可以看到 20 是给我更好性能的那个(也许每个执行程序的内核数量对此有一些影响)。

spark-submit --deploy-mode cluster --class com.ajay.Engine --supervise --driver-memory 5G --driver-cores 8 --executor-memory 4G --executor-cores 4 --conf spark.default.parallelism=20 --num-executors 36 --conf spark.dynamicAllocation.enabled=false --conf spark.streaming.unpersist=false --conf spark.eventLog.enabled=false --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties --conf spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError --conf spark.executor.extraJavaOptions=-XX:HeapDumpPath=/tmp --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.driver.extraJavaOptions=-XX:+UseG1GC --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties s3://test/engine.jar

目前 Spark 很难在 10 秒内完成处理(我什至尝试过不同的批处理持续时间,如 5、10、15 等)。完成一个批处理需要 15-23 秒,输入速率为每秒 1600 条记录,每批处理有 17000 条记录。我需要使用 statesteam 定期检查设备的状态,以查看设备是否发出任何警报或任何传感器是否已停止响应。我不确定如何提高 spark 应用程序的性能?

最佳答案

mapWithState 执行以下操作:

applying a function to every key-value element of this stream, while maintaining some state data for each unique key

根据其文档:PairDStreamFunctions#mapWithState

这也意味着对于每个批处理,所有具有相同键的元素都按顺序处理,并且,因为 StateSpec 中的函数是任意的并且由我们提供,在没有定义状态组合器的情况下,无论您如何在 mapWithState 之前对数据进行分区,它都无法进一步并行化。 IE。当键不同时,并行化会很好,但如果所有 RDD 元素之间只有几个唯一键,那么整个批处理将主要由与唯一键数量相等的核心数处理。

在您的例子中, key 来自 Kafka:

            t.forEachRemaining(tuple->{
                String key=tuple._1;

并且您的代码片段没有显示它们是如何生成的。

根据我的经验,这可能会发生这种情况:您的批处理的某些部分正在被多个核心快速处理,而另一部分对整体的大部分具有相同的 key ,需要更多时间并延迟批处理,这就是为什么您看到大部分时间只有少数任务在运行,而执行程序负载不足。

要看是否属实,请检查您的键分布,每个键有多少个元素,难道只有几个键有所有元素的 20%?如果这是真的,您有以下选择:

  • 更改 key 生成算法
  • mapWithState 之前人为地拆分有问题的键,然后合并状态快照以对整体有意义
  • 限制每批处理的具有相同键的元素的数量,要么忽略每批中前N个之后的元素,要么将它们发送到其他地方,进入一些“无法及时处理”的Kafka流并处理它们分别

关于java - Spark MapwithState stateSnapshots 不缩放(Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45709451/

相关文章:

scala - 如何将 Option 与 Spark UDF 结合使用

java - Windows 10 中的 Kafka 设置

kotlin - Spring Cloud Kafka限制单位时间消息消耗

websocket - 是否可以将 websocket 消息发送到 kafka 主题?

java - 倒计时器显示 toast 一半时间

java - Jar可执行文件不是播放wav,而是播放midi并加载其他资源

java - 如何使用时间进行计算(半小时)

python - PySpark HDFS 数据流读/写

apache-spark - Spark 作业卡在方法收集上

java - 从类路径中动态删除 jar