apache-spark - 使用 HDFS 存储的 Spark 作业

标签 apache-spark hadoop pyspark hdfs google-cloud-dataproc

我在 Google Cloud Dataproc 上运行了一个长期运行的 Spark Structured Streaming Job,它使用 Kafka 作为源和接收器。我还将检查点保存在 Google Cloud Storage 中。

运行一周后,我注意到它一直在消耗所有 100 GB 磁盘存储空间,将文件保存到 /hadoop/dfs/data/current/BP-315396706-10.128.0.26-1568586969675/current/finalized/...。 .

我的理解是我的 Spark 作业不应该对本地磁盘存储有任何依赖。

我在这里完全误解了吗?

我像这样提交了我的工作:

(cd  app/src/packages/ &&  zip -r mypkg.zip mypkg/ ) && mv app/src/packages/mypkg.zip build
gcloud dataproc jobs submit pyspark \
    --cluster cluster-26aa \
    --region us-central1 \
    --properties ^#^spark.jars.packages=org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.3,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
    --py-files build/mypkg.zip \
    --max-failures-per-hour 10 \
    --verbosity info \
    app/src/explode_rmq.py

这些是我工作的相关部分:

来源:
 spark = SparkSession \
        .builder \
        .appName("MyApp") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.addPyFile('mypkg.zip')

    df = spark \
        .readStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("subscribe", "lsport-rmq-12") \
        .option("startingOffsets", "earliest") \
        .load() \
        .select(f.col('key').cast(t.StringType()), f.col('value').cast(t.StringType()))

下沉:
    sink_kafka_q = sink_df \
        .writeStream \
        .format("kafka") \
        .options(**config.KAFKA_PARAMS) \
        .option("topic", "my_topic") \
        .option("checkpointLocation", "gs://my-bucket-data/checkpoints/my_topic") \
        .start()

最佳答案

如果内存不够,Spark 会将信息持久化到本地磁盘。您可以像这样禁用磁盘上的持久性:

df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

或者你可以尝试像这样序列化信息来占用更少的内存
df.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

读取序列化数据将更加占用 CPU。

每个数据帧都有其独特的序列化级别。

更多信息:https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence

关于apache-spark - 使用 HDFS 存储的 Spark 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58184273/

相关文章:

function - 如何在 Spark Java 中使用分析/窗口函数?

hadoop - 在Hive中的许多表上执行有效的联接

c++ - 在CDH4中获取mapreduce.task.partition

apache-spark - 当系列到系列(PandasUDFType.SCALAR)可用时,为什么系列迭代器到系列 pandasUDF(PandasUDFType.SCALAR_ITER)的迭代器?

pandas - 将带有日期列的 pyspark DataFrame 转换为 Pandas 会导致 AttributeError

hadoop - 通过 Oozie 运行 Spark 作业

apache-spark - 如何避免连接中键列名称重复?

python-3.x - 如何在 Spark 数据帧中存储 Python 字节串

elasticsearch - Spark 应用程序无法写入在 docker 中运行的 elasticsearch 集群

hadoop - 是否必须将主服务器和从服务器的配置文件放到所有从服务器和辅助名称节点上?