hadoop - 在 PIG 中传递一个包作为 UDF 的输入

标签 hadoop mapreduce apache-pig

我正在尝试将数据包(最终)作为输入传递。

 dump final;

给出:-

(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkansasdest1,Destination)
(4,john,john,David,Banking ,4,M,20-01-1994,78.65,345000,Arkanssdest2,Destination)
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansasSrc1,source)
(4,johns,johns,David,Banking ,4,M,20-01-1994,78.65,345000,ArkansaSrc2,source)

我将要编写一个 UDF 来处理上述数据包并发现源和目标之间的不匹配,为此我必须检查我的 UDF 是否接受数据包。所以我在下面写了一个示例 UDF:

package PigUDFpck;

import java.io.IOException;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;


public class databag extends EvalFunc<DataBag> {
TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();

public DataBag exec(Tuple input) throws IOException { // different return type

    DataBag result = mBagFactory.newDefaultBag(); // change here
    DataBag values = (DataBag)input.get(0);
    for (Iterator<Tuple> iterator = values.iterator(); iterator.hasNext();) {
        Tuple tuple = iterator.next();

        //logic
        Tuple t = mTupleFactory.getInstance().newTuple();


        t.append(tuple);

        result.add(t);
    }
    return result; // change here
}

}

之后我用

注册了路径
REGISTER /usr/local/pig/UDF/UDFBAG.jar;
DEFINE Databag Databag(); // not sure how to define it 

2017-02-16 19:07:05,875 [main] WARN org.apache.pig.newplan.BaseOperatorPlan - 遇到警告 IMPLICIT_CAST_TO_INT 2 次。//定义后得到这个警告。

final1 = FOREACH final GENERATE(Databag(final));

错误 1200:Pig 脚本解析失败: 无效的标量投影:最终:需要从关系中投影列才能将其用作标量

请帮助我定义 UDF 以及如何将 DataBag 传递给 UDF

谢谢

最佳答案

尝试

final1 = FOREACH final GENERATE(Databag(*));

尽管据我所知,您的 final 包含元组,而不是元组包,因此您可能需要先按某个键对其进行分组。在那种情况下它会像

final1 = FOREACH (group final [by key or all]) GENERATE(Databag(final));

关于hadoop - 在 PIG 中传递一个包作为 UDF 的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42275735/

相关文章:

java - Hadoop 序列文件 : process key/value only up to a certain size?

performance - 估计伪分布式节点上的 Hadoop 可扩展性性能?

java - 借助log4j输出hadoop程序的变量

java - 由于 ConnectException,Hadoop 映射失败

hive - Pig 脚本失败,出现 java.io.EOFException : Unexpected end of input stream

regex - 使用 csv 格式的非结构化 GPS 数据包创建结构化 hive 表

hadoop - hadoop 0.20.2 中不支持通用选项 -D 吗?

java - 带有 RDF/XML 文件的 Hadoop MapReduce

hadoop - 按多个字段分组并输出元组

hadoop - pig 的GROUP和COUNT后减少