python - 如何在 Pyspark 中使用 Scala 类

标签 python scala apache-spark pyspark apache-spark-sql

我一直在寻找是否有任何方法可以在 Pyspark 中使用 Scala 类,但我没有找到任何相关文档或指南主题。

假设我在 Scala 中创建了一个简单的类,它使用了一些 apache-spark 库,例如:

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._

    df.select(col(column))
  }
}
  • 有没有办法在 Pyspark 中使用这个类?
  • 是不是太难了?
  • 我必须创建一个 .py 文件吗?
  • 是否有任何指南说明如何做到这一点?

顺便说一句,我还查看了 spark 代码,感觉有点失落,我无法为自己的目的复制它们的功能。

最佳答案

是的,它是可能的,尽管它可能远非微不足道。通常,您需要一个 Java(友好的)包装器,这样您就不必处理使用普通 Java 无法轻松表达的 Scala 功能,因此无法很好地与 Py4J 网关配合使用。

假设您的类在包 com.example 中,并且 Python DataFrame 称为 df

df = ... # Python DataFrame

你必须:

  1. 使用 your favorite build tool 构建一个 jar .

  2. 将其包含在驱动程序类路径中,例如使用 PySpark shell/spark-submit--driver-class-path 参数。根据确切的代码,您可能还必须使用 --jars 传递它

  3. 从 Python SparkContext 实例中提取 JVM 实例:

    jvm = sc._jvm
    
  4. SQLContext 实例中提取 Scala SQLContext:

    ssqlContext = sqlContext._ssql_ctx
    
  5. df 中提取 Java DataFrame:

    jdf = df._jdf
    
  6. 创建 SimpleClass 的新实例:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. 调用exe 方法并使用Python DataFrame 包装结果:

    from pyspark.sql import DataFrame
    
    DataFrame(simpleObject.exe(), ssqlContext)
    

结果应该是一个有效的 PySpark DataFrame。您当然可以将所有步骤合并到一个调用中。

重要提示:这种方法只有在 Python 代码仅在驱动程序上执行时才可行。它不能在 Python Action 或转换中使用。见 How to use Java/Scala function from an action or a transformation?了解详情。

关于python - 如何在 Pyspark 中使用 Scala 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36023860/

相关文章:

python - 任何好的 Python HTTP 代理?

scala - 在 scala 中的 Play 和控制台应用程序之间共享代码

scala - 通过交错值合并两个集合

scala - 我们可以通过保存 parquet 文件来复制 Spark 的 .cache() 行为吗?

python - 在 PySpark 中读取文本文件时有没有办法控制分区数

python - 将打开的文件从一个函数传递到另一个函数

Python:将索引作为新列添加到二维数组

python - 如何在 Python 中调试 MemoryError?跟踪内存使用的工具?

java - Rectangle2D.contains() 无法正确检测边界中的坐标

python - PySpark:在窗口上加盐并倾斜的 CumSum