python - 为什么我的 Spark Streaming 工作变慢了

标签 python cassandra apache-spark spark-streaming pyspark

我有一份工作,每 10 秒从 Kafka 接收一次数据,然后我格式化数据并插入到 cassandra 中,但我的工作越来越慢,这让我很困惑。

根据我的统计,每 10 秒有不到 100 条消息,第一次处理最多只需要 1 秒,但几天后处理速度变慢,处理 10 秒需要 14 秒现在的数据。

我很困惑是否有某些因素会使工作变慢。

而且我注意到处理 python -m pyspark.daemon 也会消耗越来越多的内存,有没有一些方法可以降低内存成本。

PID   USER      PR   NI VIRT    RES     SHR  S  %CPU %MEM   TIME+ COMMAND 

24527 yao.yu    20   0 10.334g 9.823g   3580 R  96.8 66.9   3424:56 python                                                                                                                                                     

代码如下:

if __name__ == "__main__":
    conf = SparkConf().setAppName("Kafka_To_Cassandra").set("spark.streaming.kafka.maxRatePerPartition", "1000")
    sc = SparkContext(conf = conf)
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:] 

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers, "auto.offset.reset": "smallest"})
    lines = kvs.map(lambda x: x[1]) \
           .filter(lambda s: 'identifier' in s) \
           .filter(lambda s: 'app_name' in s) \
           .filter(lambda s: 'app_version' in s)
    map_lines = lines.map(mapper).filter(lambda s: 'JsonLoadException' not in s)
    #map_lines.pprint()
    map_lines.foreachRDD(lambda rdd: rdd.foreachPartition(save_to_cassandra))

    ssc.start()
    ssc.awaitTermination()

最佳答案

此配置可能对您有所帮助。

spark.cleaner.ttl

Spark 将记住任何元数据(生成的阶段、生成的任务等)的持续时间(秒)。定期清理将确保忘记超过此持续时间的元数据。这对于运行 Spark 数小时/数天非常有用(例如,在 Spark Streaming 应用程序的情况下全天候运行)。请注意,任何在内存中持续存在超过此持续时间的 RDD 也将被清除。

关于python - 为什么我的 Spark Streaming 工作变慢了,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32730638/

相关文章:

scala - Scala 中的 Spark : How to avoid linear scan for searching a key in each partition?

apache-spark - Spark CountVectorizer 返回一个 TinyInt

python - 任何 Python ORM(SQLAlchemy?)都可以与 Google App Engine 一起使用吗?

python - 在 requests.Session 中计算请求并更新请求 header

java - 使用 Datastax Cassandra 驱动程序时重新使用 PreparedStatement?

cassandra - 无法在 cassandra 上创建索引

python - 将辅助变量传递给数据类 __init__ 而不分配它们

python - 防止科学记数法

cassandra - 卡夫卡连接 : Error in sink cassandra connector

apache-spark - 如何在pyspark中将行转换为字典列表?