apache-spark - Spark Multiple Join 内存不足错误

标签 apache-spark join

我需要在 apache spark 中执行 300 多个连接。我已将我的问题简化为以下示例,这些示例仍然会在 Spark 1.6 和 2.0 平台中产生内存不足错误:

有没有更好的方法来执行过度连接?我尝试了 union/ 的新颖组合groupByKey 哪些工作但不保留理解数据所必需的列。我还尝试了广播连接、cogroup、persist 和更多变体。

var data = Seq((1,2))
var data2 = Seq((1,3))
var rdd1 = sc.parallelize(data)
var rdd2 = sc.parallelize(data2)
var d2 = rdd1.toDF()
var d5 = rdd2.toDF()
var  d3   = d2.join(d5, Seq("_1"), "left_outer")
var  d4   = d3.join(d5, Seq("_1"), "left_outer")
var  d5a   = d4.join(d5, Seq("_1"), "left_outer")
var  d6   = d5a.join(d5, Seq("_1"), "left_outer")
var  d7   = d6.join(d5, Seq("_1"), "left_outer")
var  d8   = d7.join(d5, Seq("_1"), "left_outer")
var  d9   = d8.join(d5, Seq("_1"), "left_outer")
var  d10   = d9.join(d5, Seq("_1"), "left_outer")
var  d11   = d10.join(d5, Seq("_1"), "left_outer")
var  d12   = d11.join(d5, Seq("_1"), "left_outer")
var  d13   = d12.join(d5, Seq("_1"), "left_outer")
var  d14   = d13.join(d5, Seq("_1"), "left_outer")
var  d15   = d14.join(d5, Seq("_1"), "left_outer")
var  d16   = d15.join(d5, Seq("_1"), "left_outer")
var  d17   = d16.join(d5, Seq("_1"), "left_outer")
var  d18   = d17.join(d5, Seq("_1"), "left_outer")
var  d19   = d18.join(d5, Seq("_1"), "left_outer")
var  d20   = d19.join(d5, Seq("_1"), "left_outer")
var  d21   = d20.join(d5, Seq("_1"), "left_outer")
var  d22   = d21.join(d5, Seq("_1"), "left_outer")
var  d23   = d22.join(d5, Seq("_1"), "left_outer")
var  d24   = d23.join(d5, Seq("_1"), "left_outer")
var  d25   = d24.join(d5, Seq("_1"), "left_outer")
var  d26   = d25.join(d5, Seq("_1"), "left_outer")
var  d27   = d26.join(d5, Seq("_1"), "left_outer")
var  d28   = d27.join(d5, Seq("_1"), "left_outer")
var  d29   = d28.join(d5, Seq("_1"), "left_outer")
var  d30   = d29.join(d5, Seq("_1"), "left_outer")
var  d31   = d30.join(d5, Seq("_1"), "left_outer")
var  d32   = d31.join(d5, Seq("_1"), "left_outer")
var  d33   = d32.join(d5, Seq("_1"), "left_outer")
var  d34   = d33.join(d5, Seq("_1"), "left_outer")
var  d35   = d34.join(d5, Seq("_1"), "left_outer")
var  d36   = d35.join(d5, Seq("_1"), "left_outer")
var  d37   = d36.join(d5, Seq("_1"), "left_outer")
var  d38   = d37.join(d5, Seq("_1"), "left_outer")
var  d39   = d38.join(d5, Seq("_1"), "left_outer")
var  d40   = d39.join(d5, Seq("_1"), "left_outer")
var  d41   = d40.join(d5, Seq("_1"), "left_outer")
var  d42   = d41.join(d5, Seq("_1"), "left_outer")
var  d43   = d42.join(d5, Seq("_1"), "left_outer")
var  d44   = d43.join(d5, Seq("_1"), "left_outer")
var  d45   = d44.join(d5, Seq("_1"), "left_outer")
var  d46   = d45.join(d5, Seq("_1"), "left_outer")
var  d47   = d46.join(d5, Seq("_1"), "left_outer")
var  d48   = d47.join(d5, Seq("_1"), "left_outer")
var  d49   = d48.join(d5, Seq("_1"), "left_outer")
var  d50   = d49.join(d5, Seq("_1"), "left_outer")
var  d51   = d50.join(d5, Seq("_1"), "left_outer")
var  d52   = d51.join(d5, Seq("_1"), "left_outer")
var  d53   = d52.join(d5, Seq("_1"), "left_outer")
var  d54   = d53.join(d5, Seq("_1"), "left_outer")
var  d55   = d54.join(d5, Seq("_1"), "left_outer")
var  d56   = d55.join(d5, Seq("_1"), "left_outer")
var  d57   = d56.join(d5, Seq("_1"), "left_outer")
var  d58   = d57.join(d5, Seq("_1"), "left_outer")
var  d59   = d58.join(d5, Seq("_1"), "left_outer")
var  d60   = d59.join(d5, Seq("_1"), "left_outer")
var  d61   = d60.join(d5, Seq("_1"), "left_outer")
var  d62   = d61.join(d5, Seq("_1"), "left_outer")
var  d63   = d62.join(d5, Seq("_1"), "left_outer")
var  d64   = d63.join(d5, Seq("_1"), "left_outer")
var  d65   = d64.join(d5, Seq("_1"), "left_outer")
var  d66   = d65.join(d5, Seq("_1"), "left_outer")
var  d67   = d66.join(d5, Seq("_1"), "left_outer")
var  d68   = d67.join(d5, Seq("_1"), "left_outer")
var  d69   = d68.join(d5, Seq("_1"), "left_outer")
var  d70   = d69.join(d5, Seq("_1"), "left_outer")
var  d71   = d70.join(d5, Seq("_1"), "left_outer")
var  d72   = d71.join(d5, Seq("_1"), "left_outer")
var  d73   = d72.join(d5, Seq("_1"), "left_outer")
var  d74   = d73.join(d5, Seq("_1"), "left_outer")
var  d75   = d74.join(d5, Seq("_1"), "left_outer")
var  d76   = d75.join(d5, Seq("_1"), "left_outer")
var  d77   = d76.join(d5, Seq("_1"), "left_outer")
var  d78   = d77.join(d5, Seq("_1"), "left_outer")
var  d79   = d78.join(d5, Seq("_1"), "left_outer")
var  d80   = d79.join(d5, Seq("_1"), "left_outer")
var  d81   = d80.join(d5, Seq("_1"), "left_outer")
var  d82   = d81.join(d5, Seq("_1"), "left_outer")
var  d83   = d82.join(d5, Seq("_1"), "left_outer")
var  d84   = d83.join(d5, Seq("_1"), "left_outer")
var  d85   = d84.join(d5, Seq("_1"), "left_outer")
var  d86   = d85.join(d5, Seq("_1"), "left_outer")
var  d87   = d86.join(d5, Seq("_1"), "left_outer")
var  d88   = d87.join(d5, Seq("_1"), "left_outer")
var  d89   = d88.join(d5, Seq("_1"), "left_outer")
var  d90   = d89.join(d5, Seq("_1"), "left_outer")
var  d91   = d90.join(d5, Seq("_1"), "left_outer")
var  d92   = d91.join(d5, Seq("_1"), "left_outer")
var  d93   = d92.join(d5, Seq("_1"), "left_outer")
var  d94   = d93.join(d5, Seq("_1"), "left_outer")
var  d95   = d94.join(d5, Seq("_1"), "left_outer")
var  d96   = d95.join(d5, Seq("_1"), "left_outer")
var  d97   = d96.join(d5, Seq("_1"), "left_outer")
var  d98   = d97.join(d5, Seq("_1"), "left_outer")
var  d99   = d98.join(d5, Seq("_1"), "left_outer")
var  d100   = d99.join(d5, Seq("_1"), "left_outer")
var  d101   = d100.join(d5, Seq("_1"), "left_outer")
var  d102   = d101.join(d5, Seq("_1"), "left_outer")
var  d103   = d102.join(d5, Seq("_1"), "left_outer")
var  d104   = d103.join(d5, Seq("_1"), "left_outer")
var  d105   = d104.join(d5, Seq("_1"), "left_outer")
var  d106   = d105.join(d5, Seq("_1"), "left_outer")
var  d107   = d106.join(d5, Seq("_1"), "left_outer")
var  d108   = d107.join(d5, Seq("_1"), "left_outer")
var  d109   = d108.join(d5, Seq("_1"), "left_outer")
var  d110   = d109.join(d5, Seq("_1"), "left_outer")
var  d111   = d110.join(d5, Seq("_1"), "left_outer")
var  d112   = d111.join(d5, Seq("_1"), "left_outer")
var  d113   = d112.join(d5, Seq("_1"), "left_outer")
var  d114   = d113.join(d5, Seq("_1"), "left_outer")
var  d115   = d114.join(d5, Seq("_1"), "left_outer")
var  d116   = d115.join(d5, Seq("_1"), "left_outer")
var  d117   = d116.join(d5, Seq("_1"), "left_outer")
var  d118   = d117.join(d5, Seq("_1"), "left_outer")
var  d119   = d118.join(d5, Seq("_1"), "left_outer")
var  d120   = d119.join(d5, Seq("_1"), "left_outer")
var  d121   = d120.join(d5, Seq("_1"), "left_outer")
var  d122   = d121.join(d5, Seq("_1"), "left_outer")
var  d123   = d122.join(d5, Seq("_1"), "left_outer")
var  d124   = d123.join(d5, Seq("_1"), "left_outer")
var  d125   = d124.join(d5, Seq("_1"), "left_outer")
var  d126   = d125.join(d5, Seq("_1"), "left_outer")
var  d127   = d126.join(d5, Seq("_1"), "left_outer")
var  d128   = d127.join(d5, Seq("_1"), "left_outer")
var  d129   = d128.join(d5, Seq("_1"), "left_outer")
var  d130   = d129.join(d5, Seq("_1"), "left_outer")
var  d131   = d130.join(d5, Seq("_1"), "left_outer")
var  d132   = d131.join(d5, Seq("_1"), "left_outer")
var  d133   = d132.join(d5, Seq("_1"), "left_outer")
var  d134   = d133.join(d5, Seq("_1"), "left_outer")
var  d135   = d134.join(d5, Seq("_1"), "left_outer")
var  d136   = d135.join(d5, Seq("_1"), "left_outer")
var  d137   = d136.join(d5, Seq("_1"), "left_outer")
var  d138   = d137.join(d5, Seq("_1"), "left_outer")
var  d139   = d138.join(d5, Seq("_1"), "left_outer")
var  d140   = d139.join(d5, Seq("_1"), "left_outer")
var  d141   = d140.join(d5, Seq("_1"), "left_outer")
var  d142   = d141.join(d5, Seq("_1"), "left_outer")
var  d143   = d142.join(d5, Seq("_1"), "left_outer")
var  d144   = d143.join(d5, Seq("_1"), "left_outer")
var  d145   = d144.join(d5, Seq("_1"), "left_outer")
var  d146   = d145.join(d5, Seq("_1"), "left_outer")
var  d147   = d146.join(d5, Seq("_1"), "left_outer")
var  d148   = d147.join(d5, Seq("_1"), "left_outer")
var  d149   = d148.join(d5, Seq("_1"), "left_outer")
var  d150   = d149.join(d5, Seq("_1"), "left_outer")
var  d151   = d150.join(d5, Seq("_1"), "left_outer")
var  d152   = d151.join(d5, Seq("_1"), "left_outer")
var  d153   = d152.join(d5, Seq("_1"), "left_outer")
var  d154   = d153.join(d5, Seq("_1"), "left_outer")
var  d155   = d154.join(d5, Seq("_1"), "left_outer")
var  d156   = d155.join(d5, Seq("_1"), "left_outer")
var  d157   = d156.join(d5, Seq("_1"), "left_outer")
var  d158   = d157.join(d5, Seq("_1"), "left_outer")
var  d159   = d158.join(d5, Seq("_1"), "left_outer")
var  d160   = d159.join(d5, Seq("_1"), "left_outer")
var  d161   = d160.join(d5, Seq("_1"), "left_outer")
var  d162   = d161.join(d5, Seq("_1"), "left_outer")
var  d163   = d162.join(d5, Seq("_1"), "left_outer")
var  d164   = d163.join(d5, Seq("_1"), "left_outer")
var  d165   = d164.join(d5, Seq("_1"), "left_outer")
var  d166   = d165.join(d5, Seq("_1"), "left_outer")
var  d167   = d166.join(d5, Seq("_1"), "left_outer")
var  d168   = d167.join(d5, Seq("_1"), "left_outer")
var  d169   = d168.join(d5, Seq("_1"), "left_outer")
var  d170   = d169.join(d5, Seq("_1"), "left_outer")
var  d171   = d170.join(d5, Seq("_1"), "left_outer")
var  d172   = d171.join(d5, Seq("_1"), "left_outer") 
var  d173   = d172.join(d5, Seq("_1"), "left_outer")
var  d174   = d173.join(d5, Seq("_1"), "left_outer")
var  d175   = d174.join(d5, Seq("_1"), "left_outer")
var  d176   = d175.join(d5, Seq("_1"), "left_outer")
var  d177   = d176.join(d5, Seq("_1"), "left_outer")
var  d178   = d177.join(d5, Seq("_1"), "left_outer")
var  d179   = d178.join(d5, Seq("_1"), "left_outer")
var  d180   = d179.join(d5, Seq("_1"), "left_outer")
var  d181   = d180.join(d5, Seq("_1"), "left_outer")
var  d182   = d181.join(d5, Seq("_1"), "left_outer")
var  d183   = d182.join(d5, Seq("_1"), "left_outer")
var  d184   = d183.join(d5, Seq("_1"), "left_outer")
var  d185   = d184.join(d5, Seq("_1"), "left_outer")
var  d186   = d185.join(d5, Seq("_1"), "left_outer")
var  d187   = d186.join(d5, Seq("_1"), "left_outer")
var  d188   = d187.join(d5, Seq("_1"), "left_outer")
var  d189   = d188.join(d5, Seq("_1"), "left_outer")
var  d190   = d189.join(d5, Seq("_1"), "left_outer")
var  d191   = d190.join(d5, Seq("_1"), "left_outer")
var  d192   = d191.join(d5, Seq("_1"), "left_outer")
var  d193   = d192.join(d5, Seq("_1"), "left_outer")
var  d194   = d193.join(d5, Seq("_1"), "left_outer")
var  d195   = d194.join(d5, Seq("_1"), "left_outer")
var  d196   = d195.join(d5, Seq("_1"), "left_outer")
var  d197   = d196.join(d5, Seq("_1"), "left_outer")
var  d198   = d197.join(d5, Seq("_1"), "left_outer")
var  d199   = d198.join(d5, Seq("_1"), "left_outer")
var  d200   = d199.join(d5, Seq("_1"), "left_outer")
var  d201   = d200.join(d5, Seq("_1"), "left_outer")
var  d202   = d201.join(d5, Seq("_1"), "left_outer")
var  d203   = d202.join(d5, Seq("_1"), "left_outer")
var  d204   = d203.join(d5, Seq("_1"), "left_outer")
var  d205   = d204.join(d5, Seq("_1"), "left_outer")
var  d206   = d205.join(d5, Seq("_1"), "left_outer")
var  d207   = d206.join(d5, Seq("_1"), "left_outer")
var  d208   = d207.join(d5, Seq("_1"), "left_outer")
var  d209   = d208.join(d5, Seq("_1"), "left_outer")
var  d210   = d209.join(d5, Seq("_1"), "left_outer")
var  d211   = d210.join(d5, Seq("_1"), "left_outer")
var  d212   = d211.join(d5, Seq("_1"), "left_outer")
var  d213   = d212.join(d5, Seq("_1"), "left_outer")
var  d214   = d213.join(d5, Seq("_1"), "left_outer")
var  d215   = d214.join(d5, Seq("_1"), "left_outer")  
var  d216   = d215.join(d5, Seq("_1"), "left_outer")
var  d217   = d216.join(d5, Seq("_1"), "left_outer")  
var  d218   = d217.join(d5, Seq("_1"), "left_outer")
var  d219   = d218.join(d5, Seq("_1"), "left_outer")
var  d220   = d219.join(d5, Seq("_1"), "left_outer")
var  d221   = d220.join(d5, Seq("_1"), "left_outer")
var  d222   = d221.join(d5, Seq("_1"), "left_outer")
var  d223   = d222.join(d5, Seq("_1"), "left_outer")
var  d224   = d223.join(d5, Seq("_1"), "left_outer")
var  d225   = d224.join(d5, Seq("_1"), "left_outer")
var  d226   = d225.join(d5, Seq("_1"), "left_outer")
var  d227   = d226.join(d5, Seq("_1"), "left_outer")
var  d228   = d227.join(d5, Seq("_1"), "left_outer")
var  d229   = d228.join(d5, Seq("_1"), "left_outer")
var  d230   = d229.join(d5, Seq("_1"), "left_outer")
var  d231   = d230.join(d5, Seq("_1"), "left_outer")
var  d232   = d231.join(d5, Seq("_1"), "left_outer")
var  d233   = d232.join(d5, Seq("_1"), "left_outer")
var  d234   = d233.join(d5, Seq("_1"), "left_outer")
var  d235   = d234.join(d5, Seq("_1"), "left_outer")
var  d236   = d235.join(d5, Seq("_1"), "left_outer")
var  d237   = d236.join(d5, Seq("_1"), "left_outer")
var  d238   = d237.join(d5, Seq("_1"), "left_outer")
var  d239   = d238.join(d5, Seq("_1"), "left_outer")
var  d240   = d239.join(d5, Seq("_1"), "left_outer")
var  d241   = d240.join(d5, Seq("_1"), "left_outer")
var  d242   = d241.join(d5, Seq("_1"), "left_outer")
var  d243   = d242.join(d5, Seq("_1"), "left_outer")
var  d244   = d243.join(d5, Seq("_1"), "left_outer")
var  d245   = d244.join(d5, Seq("_1"), "left_outer")
var  d246   = d245.join(d5, Seq("_1"), "left_outer")
var  d247   = d246.join(d5, Seq("_1"), "left_outer")
var  d248   = d247.join(d5, Seq("_1"), "left_outer")
var  d249   = d248.join(d5, Seq("_1"), "left_outer")
var  d250   = d249.join(d5, Seq("_1"), "left_outer")
var  d251   = d250.join(d5, Seq("_1"), "left_outer")
var  d252   = d251.join(d5, Seq("_1"), "left_outer")
var  d253   = d252.join(d5, Seq("_1"), "left_outer")
var  d254   = d253.join(d5, Seq("_1"), "left_outer")
var  d255   = d254.join(d5, Seq("_1"), "left_outer")
var  d256   = d255.join(d5, Seq("_1"), "left_outer")
var  d257   = d256.join(d5, Seq("_1"), "left_outer")
var  d258   = d257.join(d5, Seq("_1"), "left_outer")
var  d259   = d258.join(d5, Seq("_1"), "left_outer")
var  d260   = d259.join(d5, Seq("_1"), "left_outer")
var  d261   = d260.join(d5, Seq("_1"), "left_outer")
var  d262   = d261.join(d5, Seq("_1"), "left_outer")
var  d263   = d262.join(d5, Seq("_1"), "left_outer")
var  d264   = d263.join(d5, Seq("_1"), "left_outer")
var  d265   = d264.join(d5, Seq("_1"), "left_outer")
var  d266   = d265.join(d5, Seq("_1"), "left_outer")
var  d267   = d266.join(d5, Seq("_1"), "left_outer")
var  d268   = d267.join(d5, Seq("_1"), "left_outer")
var  d269   = d268.join(d5, Seq("_1"), "left_outer")
var  d270   = d269.join(d5, Seq("_1"), "left_outer")
var  d271   = d270.join(d5, Seq("_1"), "left_outer")
var  d272   = d271.join(d5, Seq("_1"), "left_outer")
var  d273   = d272.join(d5, Seq("_1"), "left_outer")
var  d274   = d273.join(d5, Seq("_1"), "left_outer")
var  d275   = d274.join(d5, Seq("_1"), "left_outer")
var  d276   = d275.join(d5, Seq("_1"), "left_outer")
var  d277   = d276.join(d5, Seq("_1"), "left_outer")
var  d278   = d277.join(d5, Seq("_1"), "left_outer")
var  d279   = d278.join(d5, Seq("_1"), "left_outer")
var  d280   = d279.join(d5, Seq("_1"), "left_outer")
var  d281   = d280.join(d5, Seq("_1"), "left_outer")
var  d282   = d281.join(d5, Seq("_1"), "left_outer")
var  d283   = d282.join(d5, Seq("_1"), "left_outer")
var  d284   = d283.join(d5, Seq("_1"), "left_outer")
var  d285   = d284.join(d5, Seq("_1"), "left_outer")
var  d286   = d285.join(d5, Seq("_1"), "left_outer")
var  d287   = d286.join(d5, Seq("_1"), "left_outer")
var  d288   = d287.join(d5, Seq("_1"), "left_outer")
var  d289   = d288.join(d5, Seq("_1"), "left_outer")
var  d290   = d289.join(d5, Seq("_1"), "left_outer")
var  d291   = d290.join(d5, Seq("_1"), "left_outer")
var  d292   = d291.join(d5, Seq("_1"), "left_outer")
var  d293   = d292.join(d5, Seq("_1"), "left_outer")
var  d294   = d293.join(d5, Seq("_1"), "left_outer")
var  d295   = d294.join(d5, Seq("_1"), "left_outer")
var  d296   = d295.join(d5, Seq("_1"), "left_outer")
var  d297   = d296.join(d5, Seq("_1"), "left_outer")
var  d298   = d297.join(d5, Seq("_1"), "left_outer")
var  d299   = d298.join(d5, Seq("_1"), "left_outer")
var  d300   = d299.join(d5, Seq("_1"), "left_outer")

不确定这是代码设计问题还是配置问题,我已经尝试了许多配置,包括以下内容:
  spark-shell 
  --master yarn-client  
  --executor-memory 8g  
  --executor-cores 2 
  --driver-java-options 
  "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -Xms1g  -XX:MaxNewSize=2g"  
  --conf spark.network.timeout=10000000  
  --conf spark.executor.heartbeatInterval=10000000 
  --conf spark.dynamicAllocation.enabled=true -i <program name> 

最佳答案

您可以使用 foldleft 进行连接,而不是手动进行。
你不需要在这里使用 var 。只需使用 val。
如果连接很重,最好在每次连接后坚持以防止失败。

  val joinFunction: (DataFrame, DataFrame) => DataFrame = (accumulator, tableToJoin) => {
  val result = accumulator.join(tableToJoin, Seq("_1"), "left_outer").persist()
  accumulator.unpersist()
  Logger.getLogger(getClass).info(s"Joined table count: ${result.count()}")
  result
}

val sqlCtx = sqlContext
import sqlCtx.implicits._
val data = Seq((1,2))
val data2 = Seq((1,3))
val rdd1 = sc.parallelize(data)
val rdd2 = sc.parallelize(data2)
val d2 = rdd1.toDF()
val d5 = rdd2.toDF()
val d3: DataFrame = d2.join(d5, Seq("_1"), "left_outer")
val listDF  = Range(1, 10).map(x => d3).toList
val result: DataFrame =  listDF.reduce{joinFunction}

关于apache-spark - Spark Multiple Join 内存不足错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47994961/

相关文章:

java - Spark /Java : NoClassDefFoundError in GSON dependency

apache-spark - 我可以在 pyspark 中提取 Logistic 回归系数的有效值吗

python - 如何使用 pyspark 读取 Parquet 文件、更改数据类型并写入 Hadoop 中的另一个 Parquet 文件

mysql - 从两个不同的表中查询同一列

c# - Linq:连接中的 == 和 equals 有什么区别?

linq - Linq Join 中的大于条件

python - reduceByKey 在 Spark 中有两列

scala - Spark RDD 或 SQL 操作来计算条件计数

python - 基于内部连接连接字符串

python - 如何在一列和一个索引上连接两个 Pandas 数据框