apache-spark - PySpark - 获取具有相同值的数组元素的数量

标签 apache-spark pyspark

我正在学习 Spark,但遇到了无法克服的问题。
我想要实现的是为相同位置的 2 个数组获取具有相同值的元素数量。我可以通过 Python UDF 获得我想要的东西,但我想知道是否有一种方法只使用 Spark 函数。

df_bits = sqlContext.createDataFrame([[[0, 1, 1, 0, 0, ],
                                       [1, 1, 1, 0, 1, ],
                                     ]],['bits1', 'bits2'])
df_bits_with_result = df_bits.select('bits1', 'bits2', some_magic('bits1', 'bits2').show()


+--------------------+--------------------+---------------------------------+
|bits1                  |bits2                  |some_magic(bits1, bits2)|
+--------------------+--------------------+---------------------------------+
|[0, 1, 1, 0, 1, ]    |[1, 1, 1, 0, 0, ]   |3                                      |
+--------------------+--------------------+---------------------------------+
为什么是 3? bits1[1] == bits2[1] AND bits1[2] == bits2[2] AND bits1[3] == bits2[3]
我试图玩 rdd.reduce 但没有运气。

最佳答案

也许这有帮助-
spark>=2.4
使用 aggregate zip_with

 val df = spark.sql("select array(0, 1, 1, 0, 0, null) as bits1, array(1, 1, 1, 0, 1, null) as bits2")
    df.show(false)
    df.printSchema()

    /**
      * +----------------+----------------+
      * |bits1           |bits2           |
      * +----------------+----------------+
      * |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|
      * +----------------+----------------+
      *
      * root
      * |-- bits1: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      * |-- bits2: array (nullable = false)
      * |    |-- element: integer (containsNull = true)
      */

    df.withColumn("x", expr("aggregate(zip_with(bits1, bits2, (x, y) -> if(x=y, 1, 0)), 0, (acc, x) -> acc + x)"))
      .show(false)

    /**
      * +----------------+----------------+---+
      * |bits1           |bits2           |x  |
      * +----------------+----------------+---+
      * |[0, 1, 1, 0, 0,]|[1, 1, 1, 0, 1,]|3  |
      * +----------------+----------------+---+
      */

关于apache-spark - PySpark - 获取具有相同值的数组元素的数量,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63156999/

相关文章:

apache-spark - 如何将 PySpark(本地机器)连接到我的 EMR 集群?

apache-spark - Sparkconf 和 Sparkcontext 有什么区别?

apache-spark - 什么是 "Pre-build with user-provided Hadoop"包

python - 如何在 Spark 中使用 Dataframes 的相关性?

apache-spark - 加载 Hive 表时 Spark 创建了多少个分区

azure - 如何使 HDInsight/Spark 群集在空闲时收缩?

java - JSONLD : How to convert a json into JsonLD?

pyspark - 如何在 PySparkcollect_list 中维护排序顺序并收集多个列表

python - pyspark 中的等价概率函数

hadoop - 在Apache Spark中使用spark-submit运行应用程序时,显示警告消息