我有一份工作,每 10 秒从 Kafka 接收一次数据,然后我格式化数据并插入到 cassandra 中,但我的工作越来越慢,这让我很困惑。
根据我的统计,每 10 秒有不到 100 条消息,第一次处理最多只需要 1 秒,但几天后处理速度变慢,处理 10 秒需要 14 秒现在的数据。
我很困惑是否有某些因素会使工作变慢。
而且我注意到处理 python -m pyspark.daemon
也会消耗越来越多的内存,有没有一些方法可以降低内存成本。
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
24527 yao.yu 20 0 10.334g 9.823g 3580 R 96.8 66.9 3424:56 python
代码如下:
if __name__ == "__main__":
conf = SparkConf().setAppName("Kafka_To_Cassandra").set("spark.streaming.kafka.maxRatePerPartition", "1000")
sc = SparkContext(conf = conf)
ssc = StreamingContext(sc, 10)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
lines = kvs.map(lambda x: x[1]) \
.filter(lambda s: 'identifier' in s) \
.filter(lambda s: 'app_name' in s) \
.filter(lambda s: 'app_version' in s)
map_lines = lines.map(mapper).filter(lambda s: 'JsonLoadException' not in s)
#map_lines.pprint()
map_lines.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_cassandra))
ssc.start()
ssc.awaitTermination()
最佳答案
此配置可能对您有所帮助。
spark.cleaner.ttl
Spark 将记住任何元数据(生成的阶段、生成的任务等)的持续时间(秒)。定期清理将确保忘记超过此持续时间的元数据。这对于运行 Spark 数小时/数天非常有用(例如,在 Spark Streaming 应用程序的情况下全天候运行)。请注意,任何在内存中持续存在超过此持续时间的 RDD 也将被清除。
关于python - 为什么我的 Spark Streaming 工作变慢了,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32730638/