hadoop - 按 pig 中的相同值对数据包进行分组

标签 hadoop mapreduce bigdata apache-pig

我创建了以下 Pig 脚本来过滤提到电影标题的网络文档集合 (Common Crawl) 中的句子(来自预定义的电影标题数据文件),对这些句子应用情绪分析并将这些情绪分组电影。

register ../commoncrawl-examples/lib/*.jar; 
set mapred.task.timeout= 1000;
register ../commoncrawl-examples/dist/lib/commoncrawl-examples-1.0.1-HM.jar;
register ../dist/lib/movierankings-1.jar
register ../lib/piggybank.jar;
register ../lib/stanford-corenlp-full-2014-01-04/stanford-corenlp-3.3.1.jar;
register ../lib/stanford-corenlp-full-2014-01-04/stanford-corenlp-3.3.1-models.jar;
register ../lib/stanford-corenlp-full-2014-01-04/ejml-0.23.jar;
register ../lib/stanford-corenlp-full-2014-01-04/joda-time.jar;
register ../lib/stanford-corenlp-full-2014-01-04/jollyday.jar;
register ../lib/stanford-corenlp-full-2014-01-04/xom.jar;

DEFINE IsNotWord com.moviereviewsentimentrankings.IsNotWord;
DEFINE IsMovieDocument com.moviereviewsentimentrankings.IsMovieDocument;
DEFINE ToSentenceMoviePairs com.moviereviewsentimentrankings.ToSentenceMoviePairs;
DEFINE ToSentiment com.moviereviewsentimentrankings.ToSentiment;
DEFINE MoviesInDocument com.moviereviewsentimentrankings.MoviesInDocument;

DEFINE SequenceFileLoader org.apache.pig.piggybank.storage.SequenceFileLoader();

-- LOAD pages, movies and words
pages = LOAD '../data/textData-*' USING SequenceFileLoader as (url:chararray, content:chararray);
movies_fltr_grp = LOAD '../data/movie_fltr_grp_2/part-*' as (group: chararray,movies_fltr: {(movie: chararray)});

-- FILTER pages containing movie
movie_pages = FILTER pages BY IsMovieDocument(content, movies_fltr_grp.movies_fltr);

-- SPLIT pages containing movie in sentences and create movie-sentence pairs
movie_sentences = FOREACH movie_pages GENERATE flatten(ToSentenceMoviePairs(content, movies_fltr_grp.movies_fltr)) as (content:chararray, movie:chararray);

-- Calculate sentiment for each movie-sentence pair
movie_sentiment = FOREACH movie_sentences GENERATE flatten(ToSentiment(movie, content)) as (movie:chararray, sentiment:int);

-- GROUP movie-sentiment pairs by movie
movie_sentiment_grp_tups = GROUP movie_sentiment BY movie;

-- Reformat and print movie-sentiment pairs
movie_sentiment_grp = FOREACH movie_sentiment_grp_tups GENERATE group, movie_sentiment.sentiment AS sentiments:{(sentiment: int)};
describe movie_sentiment_grp;

在网络抓取的一小部分上运行的测试显示成功地为我提供了一对电影标题和一个整数数据包(从 1 到 5,代表非常消极、消极、中立、积极和非常积极)。作为最后一步,我想将此数据转换为成对的电影标题和一个包含元组的数据包,其中包含该电影标题及其计数的所有不同整数。脚本末尾的 describe movie_sentiment_grp 返回:

movie_sentiment_grp: {group: chararray,sentiments: {(sentiment: int)}}

所以基本上我可能需要对 movie_sentiment_grp 的每个元素进行 FOREACH 并将情绪数据包分组为具有相同值的组,然后使用 COUNT() 函数来获取每个组中的元素数。然而,我无法找到任何关于如何将整数数据包分组为相同值组的信息。有谁知道如何做到这一点?

虚拟解决方案:

movie_sentiment_grp_cnt = FOREACH movie_sentiment_grp{
    sentiments_grp = GROUP sentiments BY ?;
}

最佳答案

查看 CountEach来自 Apache DataFu 的 UDF .给定一个包,它将生成一个新的不同元组包,并将计数附加到每个相应的元组。

文档中的示例应该清楚说明这一点:

DEFINE CountEachFlatten datafu.pig.bags.CountEach('flatten');

-- input: 
-- ({(A),(A),(C),(B)})
input = LOAD 'input' AS (B: bag {T: tuple(alpha:CHARARRAY, numeric:INT)});

-- output_flatten: 
-- ({(A,2),(C,1),(B,1)})
output_flatten = FOREACH input GENERATE CountEachFlatten(B);

针对您的情况:

DEFINE CountEachFlatten datafu.pig.bags.CountEach('flatten');

movie_sentiment_grp_cnt = FOREACH movie_sentiment_grp GENERATE
     group,
     CountEach(sentiments);

关于hadoop - 按 pig 中的相同值对数据包进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21219704/

相关文章:

hadoop - 在hadoop流中读取文件

java - 无法从Hive更改表格位置

python - 类型错误 : 'JavaPackage' object is not callable

hadoop - 如何使用MapReduce处理日志文件

hadoop - 在 Spark 中解压 LZ4 压缩数据

php - 如何使用 mysql 和 php 将大文本数据存储在带有 id 链接的两个表中

string - 如何在字符串集中找到未知的重复模式?

hadoop - Hive 选择查询中列名中的特殊字符 "#"

hadoop - MapReduce - 对于每个学生,学生发布最多帖子的时间是什么时候

java - 实例化 IdentityMapper 得到 ClassException。如何使用IdentityMapper?