我正在应用一个 map
函数来对我的数据执行一些 ETL。此功能通常非常快速,并且由于数据分布良好,因此创建了足够多的任务,因此我得到了良好且统一的利用率。
问题是 map
函数在某些数据组合上会成为 I/O 绑定(bind)。发生的情况是,通常触发数据将出现在单个 block 上(它们按顺序到达),因此被单个节点/任务拾取。然后发生的事情是,处理 100GB 需要 5-6 秒,而处理单个 block (在 MapR 中为 256mb)需要 20 分钟,因为它是由单个线程执行的。
有没有办法只为这个 block 增加并行化? 在这种情况下,人们通常会做什么?
到目前为止我已经确定的选项(我将其描述为解决方法)是:
spark.default.parallelism
:这将影响全局执行并导致整体时间不理想。尽管文档声明这是混洗操作的并行性,但我观察到它也会影响map
并行性。您能否详细说明内部发生的情况?这会覆盖 block 的处理方式吗?spark.task.cpus
:这太粗粒度了,它会再次影响全局执行特性。- 在
map
函数内使用fork/join
并在检测到 I/O 绑定(bind)延迟时委托(delegate)给ExecutorService
:这会使事情变得复杂并且从框架中获取资源控制,该框架将在难以解决的恶劣情况下实现。 sc.textFile("theFile.txt", 100)
:这将影响我的主要 RDD(即 100GB)和整个集合的后续转换/操作。 tahn 1 好一点,但仍然不理想(从 pzecevic 的回答更新)
最佳答案
您可以在应用映射转换的 RDD 上设置并行度。
rdd.repartition(100)
我不知道您是如何创建 RDD 的,但有时您可以在创建 RDD 时指定并行度:
sc.textFile("theFile.txt", 100)
这将直接影响映射任务的数量(在本例中为 100)。
关于java - 我可以在 Spark 中动态控制 map 函数的并发吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28294385/