我正在尝试做一些相当简单的事情。我有一个日期时间对象作为我的数据框的一部分,在制作 map 时,我想以特定的方式格式化日期。我创建了一个自定义函数:
def format_date(dt):
"""Set this for date formatting. dt is datetime."""
return dt.strftime("%Y/%m/%d %H:%M:%S")
然后,我在 map 调用中使用它(x.t 是一个日期时间对象):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
当作为作业提交时,这会导致以下异常:
An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
command = pickleSer._read_with_length(infile)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
ImportError: No module named analysis
请注意,我的脚本名称是“run_analysiss.py”,并从“analysis.py”导入所有函数。我使用
提交作业/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py
最奇怪的是,如果我将代码复制到交互式 pyspark session (或者如果我删除 format_date 调用),它就可以正常工作。我可以通过创建一个新列并在 format_date 函数上使用 UDF 来创建新列来解决此问题,但我想知道为什么这种方法会失败。
我在下面粘贴了更完整的代码。
编辑:如果我直接从analysis.py运行代码,它似乎会成功,但如果我从run_analysis.py运行它,则会失败。我更改了下面的代码以更准确地显示这一点。
run_analysis.py
import datetime, json, math, subprocess
from os.path import expanduser
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from analysis import *
sc = SparkContext()
sqlCtx = HiveContext(sc)
ids = {}
...
my_func(sqlCtx,ids)
分析.py
def my_func(sqlCtx,ids):
df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val")
df = df.filter((df.t > last_week)&(df.t < now))
df = df.filter(df.val > 0)
write_vals(df)
...
def write_vals(df):
unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
.reduceByKey(lambda x,y: x+y)\
.collectAsMap()
...
return
最佳答案
关键在于回溯:
ImportError: No module named analysis
PySpark 告诉您工作进程无权访问analysis.py。初始化 SparkContext 时,您可以传递应复制到工作线程的文件列表:
sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
更多信息: https://spark.apache.org/docs/0.9.0/python-programming-guide.html#standalone-use
关于python - Pyspark - 在 lambda 中调用函数会导致导入错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36481137/