python - spark中的高效对称计算

标签 python apache-spark pyspark


for (int i = 0; i < n ; i++) {
    for (int j = i+1; j < n ; j++) {
        [compute x]
        objects[i][j] += x;
        objects[j][i] -= x;
这(虽然仍然具有 O(n^2) 复杂性)减少了利用对称性所需的计算量。您能告诉我在 pyspark 代码中引入这种优化的方法是什么吗?
例如,我编写了代码,根据公式(其中 r 是位置)计算作用在系统中每个粒子上的每单位质量的力:
         N    m_j*(r_i - r_j)
F = -G * Σ   -----------------
        i!=j   |r_i - r_j|^3
在其中,我首先将我的数据帧与自身进行叉积以获得每个成对的相互作用,然后通过 id 将它们全部聚合以获得作用在每个粒子上的总力:
def calc_F(df_clust, G=1):

    # cartesian product of the dataframe with itself
    renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
    df_cart = df_clust.crossJoin(df_clust.selectExpr(renameCols))
    df_clust_cartesian = df_cart.filter("id != id_other")

    df_F_cartesian = df_clust_cartesian.selectExpr("id", "id_other", "m_other",
                                                   "`x` - `x_other` as `diff(x)`",
                                                   "`y` - `y_other` as `diff(y)`",
                                                   "`z` - `z_other` as `diff(z)`"
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`diff(x)` * `m_other` as `num(x)`",
                                               "`diff(y)` * `m_other` as `num(y)`",
                                               "`diff(z)` * `m_other` as `num(z)`",
                                               "sqrt(`diff(x)` * `diff(x)` + `diff(y)`"
                                               "* `diff(y)` + `diff(z)` * `diff(z)`) as `denom`",
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`num(x)` / pow(`denom`, 3) as `Fx`",
                                               "`num(y)` / pow(`denom`, 3) as `Fy`",
                                               "`num(z)` / pow(`denom`, 3) as `Fz`",
    # squish back to inital particles
    sumCols = ["Fx", "Fy", "Fz"]
    df_agg = df_F_cartesian.groupBy("id").sum(*sumCols)
    renameCols = [f"`sum({col})` as `{col}`" for col in sumCols]
    df_F = df_agg.selectExpr("id", *renameCols)

    df_F = df_F.selectExpr("id",
                           f"`Fx` * {-G} as Fx",
                           f"`Fy` * {-G} as Fy",
                           f"`Fz` * {-G} as Fz")

    return df_F
但我知道两个粒子之间的力是对称的 - F_ij = -F_ji (我假设所有质量都相等) - 所以在这里我计算了两次力,而不是重复使用它们。所以在这种特殊情况下,我想转 df_clust_cartesian = df_cart.filter("id != id_other")进入 df_clust_cartesian = df_cart.filter("id < id_other")例如,在计算函数的第二部分中的总力时以某种方式重用这些力。 (当然,理想情况下,我想学习一般情况下的做法)
a = sc.parallelize([

from pyspark.sql.types import * 
clust_input = StructType([ 
    StructField('x',  DoubleType(), False), 
    StructField('y',  DoubleType(), False), 
    StructField('z',  DoubleType(), False), 
    StructField('vx', DoubleType(), False), 
    StructField('vy', DoubleType(), False), 
    StructField('vz', DoubleType(), False), 
    StructField('m',  DoubleType(), False), 
    StructField('id', IntegerType(), False) 

df_clust = a.toDF(schema=clust_input) 


基本上你只想在 id < other_id 时计算你的公式,并使用该结果通过对称生成 id > other_id 的所有元素.

df_clust_cartesian = df_cart.filter("id < id_other")
然后,一旦你有了数据框 df_F_cartesian每个 (id, id_other) 您只有一行一对。您可以使用该行生成对应于 (id_other, id) 的行。并为 Fx, Fy 添加一个减号和 Fz .
from pyspark.sql import functions as F

sumCols = ["Fx", "Fy", "Fz"]
oppositeSums = [ (-F.col(c)).alias(c) for c in sumCols]
df_F_cartesian =
    F.struct(F.col("id"), *sumCols),
    F.struct(F.col("id_other").alias("id"), *oppositeSums)

