apache-spark - Apache Spark中如何实现任务的动态负载均衡

标签 apache-spark spark-streaming load-balancing job-scheduling

我知道在 Spark 中我可以通过使用多个分区来分割计算。如果说我可以将输入 RDD 拆分为 1000 个分区,并且我的机器数量为 100,Spark 会将计算拆分为 1000 个任务,并以某种智能方式将它们动态分配到我的 100 台机器中。

现在假设我最初只能将数据分成 2 个分区,但我仍然有 100 台机器。自然我的98机器就闲置了。但当我处理每个任务时,我可能会将其拆分为可能在不同机器上执行的子任务。它可以通过简单的 Java 队列轻松实现,但我不确定在 Apache Spark 中攻击它的最佳方法是什么。

考虑以下 Java 伪代码:

BlockingQueue<Task> q = new LinkedBlockingQueue<Task>();
q.push(myInitialTask);
...
//On each thread:
while (!queue.isEmpty()) {
    Task nextTask = queue.take();
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.pushAll(newTasks);
} 

假设方法“process_task_and_split_to_sub_tasks()”可以将任何大型任务拆分为多个较小的任务,则上述 Java 代码将使我的所有 100 个线程保持忙碌。

有没有办法在 Spark 中实现相同的效果,可以与其他工具结合使用?


更新:已经正确指出,攻击它的方法之一就是

  1. 生成更细粒度的 key 并
  2. 然后使用智能分区程序将这些键分配给分区。

我想这是解决这个问题的“经典”方法,但它要求我能够正确估计每个键的工作量以正确对其进行分区。如果我没有好的方法提前知道每个键的工作量怎么办?当我的大多数机器保持空闲状态等待一些不幸的分区时,我可能会遇到非常不幸的分区。

示例:让我们以简化的频繁项集挖掘为例。
假设我的文件包含从 a 到 j 的字母(10 个字母),每行中的所有字母均按字母顺序排序且不重复,例如'abcf',任务是找到所有行中 50% 中存在的所有字母组合。例如。如果许多行与模式 'ab.*f' 匹配,则输出将包含 {'a', 'b', 'f', 'ab', 'af', 'bf', 'abf'}。
实现它的方法之一是将所有以“a”开头的行发送到一台映射器(机器),将所有以“b”开头的行发送到另一台映射器(机器),等等。顺便说一下,这就是frequent pattern mining is implemented in Spark的方式。 。现在假设我有 100 台机器(但只有 10 个字母)。那么我的 90 台机器将保持闲置状态。
通过更细粒度的 key 解决方案,我可以生成 10,000 个 4 字母前缀,然后根据每个前缀的估计工作量以某种方式对它们进行分区。但我的分区可能是非常错误的:如果大多数行以“abcd”开头,那么所有工作将由负责该前缀(可能还有除此之外的其他前缀)的机器完成,再次产生当我的大多数机器处于闲置状态等待某些不幸的机器时,就会出现这种情况。

在这种情况下,动态负载平衡将是这样的:收到以“a”开头的行的映射器可能希望进一步拆分其行 - 到以“ab”开头的行'、'ac'、'ad'...,然后将它们发送到其他 10 台机器,这些机器可能会决定将其工作进一步拆分为更多任务。
我知道标准 Apache Spark 没有现成的答案,但我想知道是否有办法实现这一点。

Kafka(即队列,如上所述)+ Spark Streaming 看起来很有前途,您认为我能够以相对简单的方式使用这些工具来实现动态负载均衡吗?您能推荐其他工具吗?

最佳答案

Now suppose I have 100 machines (but only 10 letters). The mapper that has received the lines starting with 'a' might want to further split its lines - to those starting with 'ab', 'ac', 'ad' etc. and then send them to 10 other machines.

这并不是 Spark 的工作原理。 “Mapper”(任务)大多不了解所有分布式上下文。在此级别,无法访问 SparkContext,我们不再拥有 RDD,只需作为本地迭代器输入以及要在其上执行的代码。它无法启动,也无法创建新任务。

同时,你的问题定义是人为的。为了找到频繁的模式,你必须聚合数据,因此你需要随机播放。此时,对应于给定模式的记录必须被洗牌到同一台机器。确保数据正确分布是 Partitioner 的工作,这里确实没有“ split ”的空间。

关于apache-spark - Apache Spark中如何实现任务的动态负载均衡,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48139541/

相关文章:

apache-spark - Spark SQL - 使用 SQL 语句而不是表名通过 JDBC 加载数据

scala - 计算 Spark(结构化)流应用程序的数据处理速率

spark-streaming - Databricks 无法保存流检查点

scala - 如何访问 jar 中的静态资源(对应于 src/main/resources 文件夹)?

asp.net - ASP.NET 站点的负载共享

amazon-ec2 - EC2 用于处理需求高峰

apache-spark - 为什么 SparkSQL 在 SQL 查询中需要两个文字转义反斜杠?

hadoop - Apache 星火RDD

apache-spark - DStream 和 Map 中的 Spark Transform 之间的确切区别是什么?

apache - 如何在 tomcat 负载均衡器中启用粘性 session ?