python - Pyspark - 在 lambda 中调用函数会导致导入错误

标签 python cassandra pyspark

我正在尝试做一些相当简单的事情。我有一个日期时间对象作为我的数据框的一部分,在制作 map 时,我想以特定的方式格式化日期。我创建了一个自定义函数:

def format_date(dt):
    """Set this for date formatting. dt is datetime."""
    return dt.strftime("%Y/%m/%d %H:%M:%S")

然后,我在 map 调用中使用它(x.t 是一个日期时间对象):

unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
      .reduceByKey(lambda x,y: x+y)\
      .collectAsMap()

当作为作业提交时,这会导致以下异常:

An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 9, preteckt1.softlayer.com): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/opt/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named analysis

请注意,我的脚本名称是“run_analysiss.py”,并从“analysis.py”导入所有函数。我使用

提交作业
/opt/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --total-executor-cores 12 run_analyses.py

最奇怪的是,如果我将代码复制到交互式 pyspark session (或者如果我删除 format_date 调用),它就可以正常工作。我可以通过创建一个新列并在 format_date 函数上使用 UDF 来创建新列来解决此问题,但我想知道为什么这种方法会失败。

我在下面粘贴了更完整的代码。

编辑:如果我直接从analysis.py运行代码,它似乎会成功,但如果我从run_analysis.py运行它,则会失败。我更改了下面的代码以更准确地显示这一点。

run_analysis.py

import datetime, json, math, subprocess
from os.path import expanduser
from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext
from analysis import *

sc = SparkContext()
sqlCtx = HiveContext(sc)
ids = {}
...
my_func(sqlCtx,ids)

分析.py

def my_func(sqlCtx,ids):
    df = sqlCtx.read.format("org.apache.spark.sql.cassandra").load(table="table_name", keyspace="keyspace_name").select("id","t","val")
    df = df.filter((df.t > last_week)&(df.t < now))
    df = df.filter(df.val > 0)
    write_vals(df)
    ...

def write_vals(df):
    unique = df.map(lambda x: (x.id,[[format_date(x.t),x.val]]))\
            .reduceByKey(lambda x,y: x+y)\
            .collectAsMap()
    ...
    return

最佳答案

关键在于回溯:

ImportError: No module named analysis

PySpark 告诉您工作进程无权访问analysis.py。初始化 SparkContext 时,您可以传递应复制到工作线程的文件列表:

sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])

更多信息: https://spark.apache.org/docs/0.9.0/python-programming-guide.html#standalone-use

关于python - Pyspark - 在 lambda 中调用函数会导致导入错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36481137/

相关文章:

python - 查找 Pandas 数据每两行的字符串之间的差异

python - Pyspark 中的宽数据帧操作太慢

python - LightGBM - sklearnAPI 与训练和数据结构 API 以及 lgb.cv 与 gridsearchcv/randomisedsearchcv

python - Jupyter 笔记本错误。没有这样的文件或目录 C :. ../HOME

python - Python 在哪里寻找库二进制文件?

cassandra - 创建键空间时出现错误 "Unable to find replication strategy class ' org.apache.cassandra.locator.simplestrategy"

cassandra - C * Cassandra群集和普通Cassandra群集之间有什么区别?

c# - 当 cassandra 作为服务运行时,Cassandra 添加键空间

apache-spark - Spark fillNa 不替换空值

apache-spark - 何时使用 mapParitions 和 mapPartitionsWithIndex?