我的 BigData DB(在我的例子中是 Cassandra)中有行,列名称为 col1,col2,col3,val1,val2
在 SQL 方法中,我可以按 col1,col2 或 col2,col1 或任何其他可能的方式进行分组。这样我就可以轻松地形成树形层次结构。
但是现在我们使用Cassandra来存储不支持group by的数据。所以我们想使用 Storm 进行分组和聚合。 我们写了一些示例代码进行聚合和分组,但我们无法形成是否可以实现的意见。
数据看起来像这样
col1,col2,col3,val1,val2
------------------------
a1,b1,c1,10,20
a1,b1,c2,11,13
a1,b2,c1,9,15
a1,b2,c3,13,88
a2,b1,c1,30,44
a2,b3,c2,22,33
a4,b4,c4,99,66
就像在 Excel 数据透视表中一样,我想构建层次结构 root->child1->child2->child3-val1,val2 那么如果我的层次结构是 col1->col2->col3,它可能看起来像这样
a1 {43,136}
--b1 {21,33}
--c1 10,20
--c2 11,13
--b2 {22,103}
--c1 9,15
--c3 13,88
a2 {52,77}
--b1 {30,44}
--c1 30,44
--b3 {22,33}
--c2 22,33
a4 {99,66}
--b4 {99,66}
--c4 99,66
我想为用户提供重新排列层次结构元素的功能,例如 col3->col1->col2 (或其他动态的东西) 在这种情况下,数据将如下所示
c1 {49,79}
--a1 {19,35}
--b1 10,20
--b2 9,15
--a2 {30,44}
--b1 30,44
c2 {11,13}
--a1 {11,13}
--b1 11,13
--a2 {22,33}
--b3 22,33
c3 {13,88}
--a1 {13,88}
--b2 13,88
c4 {99,66}
--a4 {99,66}
--b4 99,66
我的三叉戟代码中有几行看起来像这样,但它没有按预期工作。
topology.newStream("aggregation", spout)
.groupBy(new Fields("col1","col2","col3","val1","val2"))
.aggregate(new Fields("val1","val2"), new Sum(), new Fields("val1sum","val2sum"))
.each(new Fields("col1","col2","col3","val1sum","val2sum"), new Utils.PrintFilter());
为了进行上述转换,我想使用 Storm,无论是否支持 Trident API。 谁能指导我如何实现它?非常感谢任何程序想法。
最佳答案
您应该在 groupBy 中仅包含维度(您的 col1、col2 和 col3)而不是度量(您的 val1、val2)。 当您需要聚合多个度量时,您需要使用 chainedAgg() 构造。 以下是针对您的用例更改后的拓扑代码:
topology.newStream("aggregation", spout)
.groupBy(new Fields("col1","col2"))
.chainedAgg()
.aggregate(new Fields("val1"), new Sum(), new Fields("val1sum"))
.aggregate(new Fields("val2"), new Sum(), new Fields("val2sum"))
.chainEnd()
.each(new Fields("col1","col2","val1sum", "val2sum"), new Utils.PrintFilter());
它会产生以下输出,正如您所期望的!
分区Id=0, [a1, b1, 21, 33]
PartitionId=0, [a1, b2, 22, 103]
分区Id=0, [a4, b4, 99, 66]
PartitionId=0, [a2, b1, 30, 44]
PartitionId=0, [a2, b3, 22, 33]
干杯!
MK
关于java - 使用 Storm 进行动态枢轴,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21476197/