python - pyspark:Spark 2.3 中的 arrays_zip 等效项

标签 python arrays apache-spark pyspark

arrays_zip的等价函数怎么写在 Spark 2.3 中?

来自 Spark 2.4 的源代码

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

如何在 PySpark 中实现类似的目标?

最佳答案

您可以通过创建用户定义函数来实现这一点

import pyspark.sql.functions as f
import pyspark.sql.types as t

arrays_zip_ = f.udf(lambda x, y: list(zip(x, y)),  
      t.ArrayType(t.StructType([
          # Choose Datatype according to requirement
          t.StructField("first", t.IntegerType()),
          t.StructField("second", t.StringType())
  ])))

df = spark.createDataFrame([(([1, 2, 3], ['2', '3', '4']))], ['first', 'second'])

现在结果为 Spark <=2.3
df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

结果是 Spark 2.4 版
df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

关于python - pyspark:Spark 2.3 中的 arrays_zip 等效项,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61503929/

相关文章:

scala - 如何使用 Scala 聚合 Spark 数据帧以获得稀疏向量?

python - 如何在kivy窗口中放置两个按钮?

javascript - 将关联数组添加到 jQuery 对象作为属性

javascript - 如何使用 JavaScript 从多维对象数组构建 URL?

javascript - 2 个阵列作为 1 个

apache-spark - 带有 HiveContext 的 Apache Spark 查询不起作用

python - Pyspark 爆炸列表创建列表中带有索引的列

python - 检查多个文件之间重复数据的最有效方法是什么?

python - 间歇性 "getrandom() initialization failed"使用 scrapy spider

python - Python mmap 和 multiprocessing.semaphore 的竞争条件