我正在尝试连接两个 Spark RDD。我有一个链接到类别的交易日志。我已将我的事务 RDD 格式化为将类别 ID 作为键。
transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']),
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']),
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]
categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]
事务日志约为 20 GB(3.5 亿行)。 类别列表小于 1KB。
当我运行时
transactions_cat.join(categories).count()
Spark 开始变得非常慢。我有一个有 643 个任务的阶段。前 10 个任务大约需要 1 分钟。然后每个任务越来越慢(第 60 个任务大约需要 15 分钟)。我不确定出了什么问题。
请查看这些屏幕截图以获得更好的想法。
我正在运行 Spark 1.1.0,4 个工作人员使用 python shell,总内存为 50 GB。 仅用 RDD 计算事务是相当快的(30 分钟)
最佳答案
问题可能在于 Spark 没有注意到您有一个简单的连接问题案例。当您要加入的两个 RDD
中的一个太小时,您最好不要成为 RDD
。然后你可以推出自己的 hash join 实现,这实际上比听起来简单得多。基本上,您需要:
- 使用
collect()
将您的类别列表从RDD
中拉出来——由此产生的通信很容易收回成本(或者,如果可能,不要这样做)首先是一个RDD
) - 将它变成一个哈希表,其中一个条目包含一个键的所有值(假设您的键不是唯一的)
- 对于大型
RDD
中的每一对,在哈希表中查找键并为列表中的每个值生成一对(如果未找到,则该特定对不会产生任何结果)
我有一个 implementation in Scala -- 请随时提出有关翻译的问题,但这应该很容易。
另一个有趣的可能性是尝试使用 Spark SQL .我很确定该项目的长期目标将包括自动为您执行此操作,但我不知道他们是否已经实现了。
关于python - Spark 连接呈指数级缓慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26238794/