python - 使用 JEP 将数据帧从 scala 传递到 python

标签 python scala apache-spark jep

这是我正在尝试做的事情:

  1. 我将数据读入 scala
  2. 提取几列
  3. 使用 JEP 将创建的数据帧传递给 Python 脚本
  4. Python 脚本将数据帧转换为 pandas 执行一些操作并将其返回

但是我不确定如何将数据帧传递给 python 脚本。 这是 python 脚本(这只是示例脚本,而不是实际脚本):

import findspark
findspark.init()
import pandas as pd
#from pyspark.sql import types.*
from pyspark.sql import DataFrame as dataframe

def tes(df: dataframe):
    df = df.toPandas()
    df['concatenate'] = df['country'] + df['datasourceProvidedCountry']
    return dataframe(df)

并且它不断失败并出现以下错误:

jep.JepException: <class 'ImportError'>: py4j.protocol
  at /usr/local/lib64/python3.6/site-packages/jep/java_import_hook.__getattr__(java_import_hook.py:57)
  at /home/hadoop/testpy.<module>(testpy.py:5)
  at jep.Jep.run(Native Method)
  at jep.Jep.runScript(Jep.java:359)
  at jep.Jep.runScript(Jep.java:335)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: py4j.protocol
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 52 more
spark-shell --conf spark.driver.extraLibraryPath=:/usr/local/lib64/python3.6/site-packages/jep:/usr/local/lib/python3.6/site-packages/py4j/ --jars /home/hadoop/jep-3.8.2.jar

任何人都可以建议我如何使用 Jep 将数据帧从 scala 传递到 pyspark (如果这是重复的,请指出我正确的线程,因为我找不到一个)?

最佳答案

我有同样的要求,也尝试过 Jep 。不幸的是,Jep 不适用于此用例。

找不到py4j.protocol是由Jep ClassEnquirer引起的,当python和jave都有同名的库时,Jep会考虑java库。您可以通过从 java 应用程序中排除 Spark 包中的 py4j 来解决此问题,或者创建自定义的 ClassEnquirer 以考虑 python py4j。

您还需要更新 Jep 构造函数,将 useSubInterpreter 值设置为 false 并重建它。

public Jep(JepConfig config) throws JepException {
    this(config, false);
}

现在错误应该已解决。但是,传递给 python 函数的对象是包含 java 引用的 PyObject,它不是 pyspark dataframe 对象,因此它没有 toPandas() 函数。

替代方法可能是使用 gRPC 或 Apache thrift,您可以查看文档了解更多详细信息。

关于python - 使用 JEP 将数据帧从 scala 传递到 python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56097529/

相关文章:

apache-spark - 使用并行化创建键/值对 RDD?

scala - 如何在 ML Pipeline 中访问底层模型的参数?

apache-spark - Spark中密集等级和行数的差异

python - 如何在不将值保存到磁盘的情况下将张量恢复到过去的值?

Scala Spark - 任务不可序列化

scala - Spark Dataframe 更改列值

scala - 使用 Play Framework 作为组件

python - 使用 scipy 计算卷积时出现 ValueError

python - 在 Python 中使用多处理创建超时函数

python - 多页 scrapy 生成我的项目太快而无法完成 - 功能未链接并等待完成