python - spark中的高效对称计算

标签 python apache-spark pyspark

我在包含对称性的算法中看到的一个常见结构是

for (int i = 0; i < n ; i++) {
    for (int j = i+1; j < n ; j++) {
        [compute x]
        objects[i][j] += x;
        objects[j][i] -= x;
    }
}
这(虽然仍然具有 O(n^2) 复杂性)减少了利用对称性所需的计算量。您能告诉我在 pyspark 代码中引入这种优化的方法是什么吗?
例如,我编写了代码,根据公式(其中 r 是位置)计算作用在系统中每个粒子上的每单位质量的力:
         N    m_j*(r_i - r_j)
F = -G * Σ   -----------------
        i!=j   |r_i - r_j|^3
在其中,我首先将我的数据帧与自身进行叉积以获得每个成对的相互作用,然后通过 id 将它们全部聚合以获得作用在每个粒子上的总力:
def calc_F(df_clust, G=1):

    # cartesian product of the dataframe with itself
    renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
    df_cart = df_clust.crossJoin(df_clust.selectExpr(renameCols))
    df_clust_cartesian = df_cart.filter("id != id_other")

    df_F_cartesian = df_clust_cartesian.selectExpr("id", "id_other", "m_other",
                                                   "`x` - `x_other` as `diff(x)`",
                                                   "`y` - `y_other` as `diff(y)`",
                                                   "`z` - `z_other` as `diff(z)`"
                                                   )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`diff(x)` * `m_other` as `num(x)`",
                                               "`diff(y)` * `m_other` as `num(y)`",
                                               "`diff(z)` * `m_other` as `num(z)`",
                                               "sqrt(`diff(x)` * `diff(x)` + `diff(y)`"
                                               "* `diff(y)` + `diff(z)` * `diff(z)`) as `denom`",
                                               )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`num(x)` / pow(`denom`, 3) as `Fx`",
                                               "`num(y)` / pow(`denom`, 3) as `Fy`",
                                               "`num(z)` / pow(`denom`, 3) as `Fz`",
                                               )
    # squish back to inital particles
    sumCols = ["Fx", "Fy", "Fz"]
    df_agg = df_F_cartesian.groupBy("id").sum(*sumCols)
    renameCols = [f"`sum({col})` as `{col}`" for col in sumCols]
    df_F = df_agg.selectExpr("id", *renameCols)

    df_F = df_F.selectExpr("id",
                           f"`Fx` * {-G} as Fx",
                           f"`Fy` * {-G} as Fy",
                           f"`Fz` * {-G} as Fz")

    return df_F
但我知道两个粒子之间的力是对称的 - F_ij = -F_ji (我假设所有质量都相等) - 所以在这里我计算了两次力,而不是重复使用它们。所以在这种特殊情况下,我想转 df_clust_cartesian = df_cart.filter("id != id_other")进入 df_clust_cartesian = df_cart.filter("id < id_other")例如,在计算函数的第二部分中的总力时以某种方式重用这些力。 (当然,理想情况下,我想学习一般情况下的做法)
这种情况下的示例输入是
a = sc.parallelize([
    [0.48593906,-0.52435857,-0.53198230,0.46153894,-0.33775792E-01,-0.32276499,0.15625001E-04,1],
    [-0.65960690E-01,0.80844238E-01,-0.27603051,-0.57578009,1.1078150,-0.29340765,0.15625001E-04,2],
    [-0.34809157E-01,0.76795481E-01,-0.39087987,-0.55399138,-0.17386098,0.59250806E-01,0.15625001E-04,3]
])                                                           

from pyspark.sql.types import * 
 
clust_input = StructType([ 
    StructField('x',  DoubleType(), False), 
    StructField('y',  DoubleType(), False), 
    StructField('z',  DoubleType(), False), 
    StructField('vx', DoubleType(), False), 
    StructField('vy', DoubleType(), False), 
    StructField('vz', DoubleType(), False), 
    StructField('m',  DoubleType(), False), 
    StructField('id', IntegerType(), False) 
])    

df_clust = a.toDF(schema=clust_input) 

最佳答案

基本上你只想在 id < other_id 时计算你的公式,并使用该结果通过对称生成 id > other_id 的所有元素.
你只需要通过这个修改你的过滤器

df_clust_cartesian = df_cart.filter("id < id_other")
然后,一旦你有了数据框 df_F_cartesian每个 (id, id_other) 您只有一行一对。您可以使用该行生成对应于 (id_other, id) 的行。并为 Fx, Fy 添加一个减号和 Fz .
这可以通过在聚合之前添加以下步骤来完成:
from pyspark.sql import functions as F

sumCols = ["Fx", "Fy", "Fz"]
oppositeSums = [ (-F.col(c)).alias(c) for c in sumCols]
df_F_cartesian = df_F_cartesian.select(F.explode(F.array(
    F.struct(F.col("id"), *sumCols),
    F.struct(F.col("id_other").alias("id"), *oppositeSums)
)).alias("s")).select("s.*")

关于python - spark中的高效对称计算,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64308251/

相关文章:

python - Django MEDIA_URL 在模板中不起作用

python - 有没有办法在我自己的python项目中自动检测所需的模块和包

scala - 如何从 HDFS 检索 Avro 数据?

java - 如何在 Hadoop 文件系统中获取绝对路径?

java - PySpark:无法创建 SparkSession。(Java 网关错误)

python - open()函数python默认目录

python - 使用 Flask 显示 stackOverflow API JSON 数据

apache-spark - Spark 作业继续显示TaskCommitDenied(驱动程序拒绝任务提交)

python - PySpark 无法正确读取 CSV

python - 迭代目录中的文件并使用文件名作为变量,并将文件路径分配给变量