我正在使用 spark 从 Kafka Stream 接收数据,以接收有关发送定期健康更新的 IOT 设备的状态以及有关设备中存在的各种传感器的状态。我的 Spark 应用程序监听单个主题以使用 Spark 直接流从 Kafka 流接收更新消息。我需要根据每个设备的传感器状态触发不同的警报。但是,当我添加更多使用 Kakfa 将数据发送到 spark 的 IOT 设备时,尽管添加了更多的机器数量并且执行程序的数量增加了,但 Spark 并没有扩展。下面我给出了我的 Spark 应用程序的精简版本,其中删除了具有相同性能问题的通知触发部分。
// Method for update the Device state , it just a in memory object which tracks the device state .
private static Optional<DeviceState> trackDeviceState(Time time, String key, Optional<ProtoBufEventUpdate> updateOpt,
State<DeviceState> state) {
int batchTime = toSeconds(time);
ProtoBufEventUpdate eventUpdate = (updateOpt == null)?null:updateOpt.orNull();
if(eventUpdate!=null)
eventUpdate.setBatchTime(ProximityUtil.toSeconds(time));
if (state!=null && state.exists()) {
DeviceState deviceState = state.get();
if (state.isTimingOut()) {
deviceState.markEnd(batchTime);
}
if (updateOpt.isPresent()) {
deviceState = DeviceState.updatedDeviceState(deviceState, eventUpdate);
state.update(deviceState);
}
} else if (updateOpt.isPresent()) {
DeviceState deviceState = DeviceState.newDeviceState(eventUpdate);
state.update(deviceState);
return Optional.of(deviceState);
}
return Optional.absent();
}
SparkConf conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.rpc.netty.dispatcher.numThreads", String.valueOf(Runtime.getRuntime().availableProcessors()))
JavaStreamingContext context= new JavaStreamingContext(conf, Durations.seconds(10));
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put( “zookeeper.connect”, “192.168.60.20:2181,192.168.60.21:2181,192.168.60.22:2181”);
kafkaParams.put("metadata.broker.list", “192.168.60.20:9092,192.168.60.21:9092,192.168.60.22:9092”);
kafkaParams.put(“group.id”, “spark_iot”);
HashSet<String> topics=new HashSet<>();
topics.add(“iottopic”);
JavaPairInputDStream<String, ProtoBufEventUpdate> inputStream = KafkaUtils.
createDirectStream(context, String.class, ProtoBufEventUpdate.class, KafkaKryoCodec.class, ProtoBufEventUpdateCodec.class, kafkaParams, topics);
JavaPairDStream<String, ProtoBufEventUpdate> updatesStream = inputStream.mapPartitionsToPair(t -> {
List<Tuple2<String, ProtoBufEventUpdate>> eventupdateList=new ArrayList<>();
t.forEachRemaining(tuple->{
String key=tuple._1;
ProtoBufEventUpdate eventUpdate =tuple._2;
Util.mergeStateFromStats(eventUpdate);
eventupdateList.add(new Tuple2<String, ProtoBufEventUpdate>(key,eventUpdate));
});
return eventupdateList.iterator();
});
JavaMapWithStateDStream<String, ProtoBufEventUpdate, DeviceState, DeviceState> devceMapStream = null;
devceMapStream=updatesStream.mapWithState(StateSpec.function(Engine::trackDeviceState)
.numPartitions(20)
.timeout(Durations.seconds(1800)));
devceMapStream.checkpoint(new Duration(batchDuration*1000));
JavaPairDStream<String, DeviceState> deviceStateStream = devceMapStream
.stateSnapshots()
.cache();
deviceStateStream.foreachRDD(rdd->{
if(rdd != null && !rdd.isEmpty()){
rdd.foreachPartition(tuple->{
tuple.forEachRemaining(t->{
SparkExecutorLog.error("Engine::getUpdates Tuple data "+ t._2);
});
});
}
});
即使负载增加,我也看不到 Executor 实例的 CPU 使用率增加。大多数时候 Executor 实例 CPU 处于空闲状态。我尝试增加 kakfa 分区(目前 Kafka 有 72 个分区。我也尝试将其降低到 36 个)。我也尝试增加 devceMapStream partitions 。但我看不到任何性能改进。代码没有在 IO 上花费任何时间。
我在 Amazon EMR(Yarn) 上使用 6 个执行程序实例运行我们的 Spark 应用程序,每台机器都有 4 个内核和 32 gb Ram。它尝试将执行程序实例的数量增加到 9,然后增加到 15,但没有看到任何性能改进。还通过将 spark.default.parallelism 值设置为 20, 36, 72, 100 对它进行了一些调整,但我可以看到 20 是给我更好性能的那个(也许每个执行程序的内核数量对此有一些影响)。
spark-submit --deploy-mode cluster --class com.ajay.Engine --supervise --driver-memory 5G --driver-cores 8 --executor-memory 4G --executor-cores 4 --conf spark.default.parallelism=20 --num-executors 36 --conf spark.dynamicAllocation.enabled=false --conf spark.streaming.unpersist=false --conf spark.eventLog.enabled=false --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties --conf spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError --conf spark.executor.extraJavaOptions=-XX:HeapDumpPath=/tmp --conf spark.executor.extraJavaOptions=-XX:+UseG1GC --conf spark.driver.extraJavaOptions=-XX:+UseG1GC --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties s3://test/engine.jar
目前 Spark 很难在 10 秒内完成处理(我什至尝试过不同的批处理持续时间,如 5、10、15 等)。完成一个批处理需要 15-23 秒,输入速率为每秒 1600 条记录,每批处理有 17000 条记录。我需要使用 statesteam 定期检查设备的状态,以查看设备是否发出任何警报或任何传感器是否已停止响应。我不确定如何提高 spark 应用程序的性能?
最佳答案
mapWithState
执行以下操作:
applying a function to every key-value element of this stream, while maintaining some state data for each unique key
根据其文档:PairDStreamFunctions#mapWithState
这也意味着对于每个批处理,所有具有相同键的元素都按顺序处理,并且,因为 StateSpec
中的函数是任意的并且由我们提供,在没有定义状态组合器的情况下,无论您如何在 mapWithState
之前对数据进行分区,它都无法进一步并行化。 IE。当键不同时,并行化会很好,但如果所有 RDD 元素之间只有几个唯一键,那么整个批处理将主要由与唯一键数量相等的核心数处理。
在您的例子中, key 来自 Kafka:
t.forEachRemaining(tuple->{
String key=tuple._1;
并且您的代码片段没有显示它们是如何生成的。
根据我的经验,这可能会发生这种情况:您的批处理的某些部分正在被多个核心快速处理,而另一部分对整体的大部分具有相同的 key ,需要更多时间并延迟批处理,这就是为什么您看到大部分时间只有少数任务在运行,而执行程序负载不足。
要看是否属实,请检查您的键分布,每个键有多少个元素,难道只有几个键有所有元素的 20%?如果这是真的,您有以下选择:
- 更改 key 生成算法
- 在
mapWithState
之前人为地拆分有问题的键,然后合并状态快照以对整体有意义 - 限制每批处理的具有相同键的元素的数量,要么忽略每批中前N个之后的元素,要么将它们发送到其他地方,进入一些“无法及时处理”的Kafka流并处理它们分别
关于java - Spark MapwithState stateSnapshots 不缩放(Java),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45709451/