python - Pig Python UDF fails in MapReduce mode

Tags: python hadoop apache-pig

I have a small Pig script that uses the recently introduced StreamingUDF functionality to call a Python UDF:

REGISTER 'process_tweet.py' USING streaming_python AS process_tweet;
REGISTER /usr/lib/hbase/lib/*.jar;

tweets = LOAD 'hbase://brand_tweets' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('data:json') AS (json:chararray);
tweets = LIMIT tweets 100;

tweets = FOREACH tweets GENERATE FLATTEN(process_tweet.extract(json)) AS (userid:long, text:chararray);

DUMP tweets;

The Python function process_tweet.extract simply deserializes the JSON object (a tweet) and returns a few of its values:
from pig_util import outputSchema
import json


@outputSchema("(userid:long, text:chararray)")
def extract(tweet):
    content = json.loads(tweet)
    return long(content['user']['id']), content['text']
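Before chasing the cluster error, it can help to confirm the UDF logic is sound outside of Pig. Below is a minimal sketch that exercises the same extract logic standalone; the outputSchema stub is an assumption (pig_util is normally only available where Pig runs), and int replaces long so the snippet also runs under Python 3 (the original script targets Python 2).

```python
import json

# Stub of pig_util.outputSchema so the UDF can be run outside Pig.
# Assumption: pig_util only exists on the cluster, so we fake the
# decorator and just attach the schema string to the function.
def outputSchema(schema):
    def wrapper(func):
        func.output_schema = schema
        return func
    return wrapper

@outputSchema("(userid:long, text:chararray)")
def extract(tweet):
    # Deserialize the tweet JSON and pull out the user id and text,
    # mirroring what the UDF does inside Pig.
    content = json.loads(tweet)
    return int(content['user']['id']), content['text']

# Hypothetical sample tweet for a quick sanity check.
sample = json.dumps({"user": {"id": 12345}, "text": "hello"})
print(extract(sample))  # -> (12345, 'hello')
```

If this runs cleanly, the failure is in how Pig launches the UDF controller on the cluster, not in the Python code itself.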

When executed in local mode (pig -x local), the script runs without errors and returns the expected output. In MapReduce mode, however, the job runs for a long time and then fails with the following error message:
Backend error message
---------------------
AttemptID:attempt_1394221905204_0172_r_000000_1 Info:Error: org.apache.pig.backend.executionengine.ExecException: ERROR 0: Exception while executing [POUserFunc (Name: POUserFunc(org.apache.pig.impl.builtin.StreamingUDF)[tuple] - scope-7 Operator Key: scope-7) children: null at []]: java.lang.NullPointerException
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:338)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:378)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNextTuple(POForEach.java:298)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:464)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:432)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:412)
    at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:256)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:645)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:405)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)
Caused by: java.lang.NullPointerException
    at org.apache.pig.impl.builtin.StreamingUDF.getControllerPath(StreamingUDF.java:268)
    at org.apache.pig.impl.builtin.StreamingUDF.constructCommand(StreamingUDF.java:199)
    at org.apache.pig.impl.builtin.StreamingUDF.startUdfController(StreamingUDF.java:163)
    at org.apache.pig.impl.builtin.StreamingUDF.initialize(StreamingUDF.java:156)
    at org.apache.pig.impl.builtin.StreamingUDF.exec(StreamingUDF.java:146)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNext(POUserFunc.java:330)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc.getNextTuple(POUserFunc.java:369)
    at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:333)
    ... 14 more

Accepted answer

It turns out that StreamingUDF is not supported on Hadoop 2:

https://issues.apache.org/jira/browse/PIG-3478
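Until PIG-3478 is resolved, one possible workaround (a sketch, not a confirmed fix) is to register the script with Pig's older Jython UDF mechanism, which does work on Hadoop 2. The trade-off is that the UDF then runs under Jython rather than CPython, so C extensions are unavailable; the stdlib json module used here requires a reasonably recent Jython (2.7+), and on older versions you would need a pure-Python JSON parser instead.

```pig
-- Workaround sketch: register the same script as a classic Jython UDF.
-- Assumes process_tweet.py contains only Jython-compatible code.
REGISTER 'process_tweet.py' USING jython AS process_tweet;
REGISTER /usr/lib/hbase/lib/*.jar;

tweets = LOAD 'hbase://brand_tweets'
         USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('data:json')
         AS (json:chararray);
tweets = LIMIT tweets 100;
tweets = FOREACH tweets GENERATE FLATTEN(process_tweet.extract(json))
         AS (userid:long, text:chararray);
DUMP tweets;
```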

The original question can be found on Stack Overflow: https://stackoverflow.com/questions/23394956/
