java - 我可以在 Spark 中动态控制 map 函数的并发吗?

标签 java multithreading apache-spark

我正在应用一个 map 函数来对我的数据执行一些 ETL。此功能通常非常快速,并且由于数据分布良好,因此创建了足够多的任务,因此我得到了良好且统一的利用率。

问题是 map 函数在某些数据组合上会成为 I/O 绑定(bind)。发生的情况是,通常触发数据将出现在单个 block 上(它们按顺序到达),因此被单个节点/任务拾取。然后发生的事情是,处理 100GB 需要 5-6 秒,而处理单个 block (在 MapR 中为 256mb)需要 20 分钟,因为它是由单个线程执行的。

有没有办法只为这个 block 增加并行化? 在这种情况下,人们通常会做什么?

到目前为止我已经确定的选项(我将其描述为解决方法)是:

  1. spark.default.parallelism :这将影响全局执行并导致整体时间不理想。尽管文档声明这是混洗操作的并行性,但我观察到它也会影响 map 并行性。您能否详细说明内部发生的情况?这会覆盖 block 的处理方式吗?
  2. spark.task.cpus :这太粗粒度了,它会再次影响全局执行特性。
  3. map 函数内使用 fork/join 并在检测到 I/O 绑定(bind)延迟时委托(delegate)给 ExecutorService:这会使事情变得复杂并且从框架中获取资源控制,该框架将在难以解决的恶劣情况下实现。
  4. sc.textFile("theFile.txt", 100) :这将影响我的主要 RDD(即 100GB)和整个集合的后续转换/操作。 tahn 1 好一点,但仍然不理想(从 pzecevic 的回答更新)

最佳答案

您可以在应用映射转换的 RDD 上设置并行度。

rdd.repartition(100)

我不知道您是如何创建 RDD 的,但有时您可以在创建 RDD 时指定并行度:

sc.textFile("theFile.txt", 100)

这将直接影响映射任务的数量(在本例中为 100)。

关于java - 我可以在 Spark 中动态控制 map 函数的并发吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28294385/

相关文章:

java - 同步(this)和同步(其他对象)之间有什么区别

apache-spark - [Pyspark SQL : Multi-column sessionization

scala - 无法启动 spark-shell,因为它会在 hadoop 集群配置上产生错误,但是,在没有 hadoop 集群的情况下工作正常

scala - spark-shell 依赖项,从 sbt 翻译

java - hibernate 性能问题,坚持一个接一个还是批量?

java - SimpleDateFormat 仅限于预定义日期

java - 如何避免从 Java Web 服务器向自身发送 HTTP 请求?

java - 解决 Spark Core 和 azure Key Vault 依赖项之间的 guava 冲突

mysql - MySQL 可以支持每个连接的并发查询吗?

python - 在调用者线程中捕获线程的异常?