apache-spark - 理解 Spark 结构化流并行

标签 apache-spark apache-spark-sql spark-structured-streaming

我是 Spark 世界的新手,并且在一些概念上苦苦挣扎。

使用来自 Kafka 的 Spark Structured Streaming 时,并行性如何发生?

让我们考虑以下片段代码:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();   

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
      ...
      /* process each key - values */
      loop values
        if value is valid - save key/value result in the HDFS
      ... 
)

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

//await
query.awaitTermination();

我读过并行度与数据分区的数量有关,数据集的分区数量基于 spark.sql.shuffle.partitions范围。
  • 对于每一批次(从 Kafka 拉取),拉取的项目是否会被分配到 spark.sql.shuffle.partitions 个 | 中?例如,spark.sql.shuffle.partitions=5Batch1=100行,我们最终会得到 5 个分区,每个分区 20 行吗?
  • 考虑到提供的片段代码,由于 groupByKey,我们是否仍然利用 Spark 并行性?后跟一个 mapGroups/mapGroupsWithState职能 ?

  • 更新:

    gDataset.mapGroupsWithState是我处理每个键/值并将结果存储在 HDFS 的地方。因此,输出接收器仅用于在控制台中输出一些统计信息。

    最佳答案

    For every Batch (pull from the Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions?



    一旦到达groupByKey就会被分割这是一个洗牌边界。第一次检索数据时,分区数将等于 Kafka 分区数

    Considering the snippet code provided, do we still leverage in the Spark parallelism due to the groupByKey followed by a mapGroups/mapGroupsWithState functions



    通常是的,但这也取决于您如何设置 Kafka 主题。尽管您从代码中看不到,但 Spark 会在内部将不同阶段的数据拆分为更小的任务,并将它们分配到集群中可用的执行程序中。如果您的 Kafka 主题只有 1 个分区,则意味着在 groupByKey 之前,您的内部流将包含一个分区,该分区不会被并行化,而是在单个执行程序上执行。只要您的 Kafka 分区计数大于 1,您的处理就会并行。在shuffle边界之后,Spark将重新分区数据以包含spark.sql.shuffle.partitions指定的分区数量。 .

    关于apache-spark - 理解 Spark 结构化流并行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48239970/

    相关文章:

    apache-spark - 如何在pyspark中使用foreach sink?

    scala - 如何使用FileFormat格式的更新输出模式?

    apache-spark - 使用ALS进行训练时,Spark会给出StackOverflowError

    apache-spark - 如何在本地模式下运行 Spark SQL Thrift Server 并使用 JDBC 连接到 Delta

    apache-spark - Spark SQL-Hive “Cannot overwrite table”解决方法

    scala - 如何迭代scalawrappedArray? ( Spark )

    scala - 为什么在流数据集上使用缓存会失败并显示 "AnalysisException: Queries with streaming sources must be executed with writeStream.start()"?

    apache-spark - Spark : Removing rows which occur less than N times

    apache-spark - 根据数据帧条件在 Spark 中创建自定义计数器

    apache-spark - Jupyter 和 PySpark : How to run multiple notebooks