python - Spark 连接呈指数级缓慢

标签 python scala apache-spark

我正在尝试连接两个 Spark RDD。我有一个链接到类别的交易日志。我已将我的事务 RDD 格式化为将类别 ID 作为键。

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]

事务日志约为 20 GB(3.5 亿行)。 类别列表小于 1KB。

当我运行时

transactions_cat.join(categories).count()

Spark 开始变得非常慢。我有一个有 643 个任务的阶段。前 10 个任务大约需要 1 分钟。然后每个任务越来越慢(第 60 个任务大约需要 15 分钟)。我不确定出了什么问题。

请查看这些屏幕截图以获得更好的想法。 enter image description here enter image description here enter image description here

我正在运行 Spark 1.1.0,4 个工作人员使用 python shell,总内存为 50 GB。 仅用 RDD 计算事务是相当快的(30 分钟)

最佳答案

问题可能在于 Spark 没有注意到您有一个简单的连接问题案例。当您要加入的两个 RDD 中的一个太小时,您最好不要成为 RDD。然后你可以推出自己的 hash join 实现,这实际上比听起来简单得多。基本上,您需要:

  • 使用 collect() 将您的类别列表从 RDD 中拉出来——由此产生的通信很容易收回成本(或者,如果可能,不要这样做)首先是一个 RDD)
  • 将它变成一个哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的)
  • 对于大型 RDD 中的每一对,在哈希表中查找键并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)

我有一个 implementation in Scala -- 请随时提出有关翻译的问题,但这应该很容易。

另一个有趣的可能性是尝试使用 Spark SQL .我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已经实现了。

关于python - Spark 连接呈指数级缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26238794/

相关文章:

java - 如何处理 Spark rdd 生成上的 CSV 文件列?

hadoop - 如何在 spark newAPIHadoopRDD 中获取 hbase 单元的所有版本?

python - 推特 API : How to search tweets based on query words and predetermined time span + tweets characteristics

Python 'for word in word' 循环

scala - Spark如何从多个Elastic Search集群读取

scala - map <字符串,列表<?在 Scala 中扩展 T>>

scala - Akka:坚持到 Cassandra 并将多个事件发布到 Kafka

python - 使用 tensorflow.data.Dataset.from_generator 时出现 InvalidArgumentError

python - 如何在 Windows 中使用 Python 删除只读属性目录?

hadoop - java.io.IOException : Not a data file 异常