python - Unable to initialize main class org.apache.spark.deploy.SparkSubmit when trying to run pyspark

Tags: python apache-spark pyspark conda

I have Python 3.7 installed via conda:

$python3 --version
Python 3.7.6

pyspark was installed via pip3 install (conda has no native package for it).

$conda list | grep pyspark
pyspark                   2.4.5                    pypi_0    pypi

Here is what pip3 tells me:

$pip3 install pyspark
Requirement already satisfied: pyspark in ./miniconda3/lib/python3.7/site-packages (2.4.5)
Requirement already satisfied: py4j==0.10.7 in ./miniconda3/lib/python3.7/site-packages (from pyspark) (0.10.7)

JDK 11 is installed:

$java -version
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

Things do not go well when trying to import pyspark. Here is a mini test program:

from pyspark.sql import SparkSession
import os, sys
def setupSpark():
    os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"
    spark = SparkSession.builder.appName("myapp").master("local").getOrCreate()
    return spark

sp = setupSpark()
df = sp.createDataFrame({'a':[1,2,3],'b':[4,5,6]})
df.show()

The result is:

Error: Unable to initialize main class org.apache.spark.deploy.SparkSubmit
Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter

Here are the full details:

$python3 sparktest.py 
Error: Unable to initialize main class org.apache.spark.deploy.SparkSubmit
Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter
Traceback (most recent call last):
  File "sparktest.py", line 9, in <module>
    sp = setupSpark()
  File "sparktest.py", line 6, in setupSpark
    spark = SparkSession.builder.appName("myapp").master("local").getOrCreate()
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/context.py", line 133, in __init__
    SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/context.py", line 316, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/java_gateway.py", line 46, in launch_gateway
    return _launch_gateway(conf)
  File "/Users/steve/miniconda3/lib/python3.7/site-packages/pyspark/java_gateway.py", line 108, in _launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
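
For what it's worth, a quick way to see which java binary and which pyspark the gateway gets launched with, as a minimal diagnostic sketch:

import os, shutil, subprocess
import pyspark

print("JAVA_HOME =", os.environ.get("JAVA_HOME"))  # None means java comes from PATH
print("java on PATH:", shutil.which("java"))
subprocess.run(["java", "-version"])  # the version banner prints to stderr
print("pyspark", pyspark.__version__)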

Any pointers or info on getting a working conda environment would be greatly appreciated.

Update: pyspark appears to be available only from conda-forge; I just tried installing it that way with conda install. But it does not change the outcome:

conda install -c conda-forge conda-forge::pyspark

Collecting package metadata (current_repodata.json): done
Solving environment: done


# All requested packages already installed.

Re-running the code above still gives us:

Error: Unable to initialize main class org.apache.spark.deploy.SparkSubmit
Caused by: java.lang.NoClassDefFoundError: org/apache/log4j/spi/Filter

Best Answer

The following steps get your mini test program running in a Conda environment:

Step 1: Create and activate a new Conda environment

conda create -n test python=3.7 -y
conda activate test
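
A quick check that the new environment's interpreter is the one now on PATH (it should report a 3.7.x build):

python -V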

Step 2: Install the latest pyspark and pandas

pip install -U pyspark pandas   # Note: I also tested pyspark version 2.4.7
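
To confirm which pyspark actually landed in the environment:

python -c "import pyspark; print(pyspark.__version__)"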

Step 3: Run the mini test. (I made a small change to create the DataFrame from a pandas DataFrame instead of a dict.)

from pyspark.sql import SparkSession
import os, sys
import pandas as pd

def setupSpark():
    os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"
    spark = SparkSession.builder.appName("myapp").master("local").getOrCreate()
    return spark

sp = setupSpark()
df = sp.createDataFrame(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}))
df.show()

Step 4: Enjoy the output

+---+---+
|  a|  b|
+---+---+
|  1|  4|
|  2|  5|
|  3|  6|
+---+---+
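
As a side note, if you would rather not pull in pandas just for the test, createDataFrame also accepts a list of rows plus column names. A minimal equivalent sketch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myapp").master("local").getOrCreate()
# rows as tuples, schema given as a list of column names
df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["a", "b"])
df.show()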

The Java version I used when installing pyspark:

$ java -version
java version "15.0.2" 2021-01-19
Java(TM) SE Runtime Environment (build 15.0.2+7-27)
Java HotSpot(TM) 64-Bit Server VM (build 15.0.2+7-27, mixed mode, sharing)
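
The version jump is the likely fix: Spark 2.4.x officially supports only Java 8, while the Spark 3.x that pip install -U pyspark pulls in supports Java 11 and later, which is why the log4j NoClassDefFoundError disappears. To verify which Spark and Java versions a live session actually runs on, here is a small sketch (note that _jvm is py4j's internal bridge into the driver JVM):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").getOrCreate()
print("Spark:", spark.version)
# _jvm exposes the driver JVM via py4j; internal API, but handy for diagnostics
print("Java :", spark.sparkContext._jvm.java.lang.System.getProperty("java.version"))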

Regarding python - Unable to initialize main class org.apache.spark.deploy.SparkSubmit when trying to run pyspark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/64287893/
