apache-spark - 在 EMR 上的 PySpark 中运行自定义 Java 类

标签 apache-spark pyspark amazon-emr livy

我正在尝试利用 Cerner Bunsen 包在 AWS EMR 上的 PySpark 中进行 FHIR 处理,特别是 Bundles 类及其方法。我正在使用 Apache Livy API 创建 Spark session ,

def create_spark_session(master_dns, kind, jars):
    # 8998 is the port on which the Livy server runs
    host = 'http://' + master_dns + ':8998'
    data = {'kind': kind, 'jars': jars}
    headers = {'Content-Type': 'application/json'}
    response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
    logging.info(response.json())
    return response.headers

其中 kind = pyspark3 并且 jars 是存放 jar 的 S3 位置 (bunsen-shaded-1.4.7.jar)

数据转换正在尝试导入 jar 并通过以下方式调用方法:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen.Bundles")
func = sc._gateway.jvm.Bundles()

我收到的错误是

"py4j.protocol.Py4JError: An error occurred while calling None.com.cerner.bunsen.Bundles. Trace:\npy4j.Py4JException: Constructor com.cerner.bunsen.Bundles([]) does not exist"

这是我第一次尝试使用 java_import,因此我们将不胜感激。

编辑:我稍微更改了转换脚本,现在看到了不同的错误。我可以看到 jar 被添加到日志中,所以我确信它在那里,并且 jars: jars 功能正在按预期工作。新的转变是:

# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()

# Manage logging
#sc.setLogLevel("INFO")

# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen")
func_main = sc._gateway.jvm.Bundles
func_deep = sc._gateway.jvm.Bundles.BundleContainer

fhir_data_frame = func_deep.loadFromDirectory(spark,"s3://<bucket>/source_database/Patient",1)
fhir_data_frame_fromJson = func_deep.fromJson(fhir_data_frame)
fhir_data_frame_clean = func_main.extract_entry(spark,fhir_data_frame_fromJson,'patient')
fhir_data_frame_clean.show(20, False)

新的错误是:

'JavaPackage' object is not callable

搜索这个错误有点徒劳,但同样,如果有人有想法,我会很乐意接受。

最佳答案

如果你想在Pyspark中使用Scala/Java函数,你还必须在类路径中添加jar包。您可以通过两种不同的方式来完成:

选项1: 在 Spark 中使用标记 --jars 提交

 spark-submit example.py --jars /path/to/bunsen-shaded-1.4.7.jar

选项2:将其添加到属性中的spark-defaults.conf文件中:

在:path/to/spark/conf/spark-defaults.conf

中添加以下代码
# Comma-separated list of jars include on the driver and executor classpaths. 
spark.jars /path/to/bunsen-shaded-1.4.7.jar

关于apache-spark - 在 EMR 上的 PySpark 中运行自定义 Java 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59864241/

相关文章:

amazon-web-services - 如何在使用 EMR/Hive 将数据从 S3 导入 DynamoDB 时处理引号 (CSV) 中的字段

hadoop - 使用s3 dist cp将数据从非emr群集复制到s3时出现权限问题

hadoop - AWS EMR:是否为每个插槽或每个节点设置了 “mapred.child.java.opts”选项?

Maven 似乎忽略了 MAVEN_OPTS

azure - 无法删除 Azure Synapse AutoML 需求预测错误 : An invalid value for argument [y] was provided

scala - 如何将 Spark SQL DataFrame 与 flatMap 结合使用?

apache-spark - 我有一个很大的 hql 查询,我正在使用 pyspark sql 调用它。但是我收到错误,例如 Bad connect ack with firstBadLink error

python - Dataframe Spark 2.2.1 上的可调用列对象

apache-spark - 使用 Airflow 运行 Spark 流作业吗?

数据框 API 与 Spark.sql