java - Running a Trident topology in a Storm TrackedTopology unit test

Tags: java unit-testing apache-storm trident

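How can I run a JUnit test of a Trident topology so that tuples flow through the topology while I test and verify the output at each stage? I have tried running it inside Storm's Testing framework, but that does not give me verification or consistent execution for Trident.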

Here is an example topology, with inline comments marking the places where I am having the most trouble.

import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import backtype.storm.Config;
import backtype.storm.ILocalCluster;
import backtype.storm.Testing;
import backtype.storm.testing.FeederSpout;
import backtype.storm.testing.TestJob;
import backtype.storm.testing.TrackedTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

public class WordCountTopologyTest {

    @Test
    public void testWordCountTopology() throws Exception {
        Testing.withTrackedCluster(new WordCountTestJob());
    }

    public class WordCountTestJob implements TestJob {

        @Override
        public void run(ILocalCluster cluster) throws Exception {

            // Create the test topology to submit
            TridentTopology termCountTopology = new TridentTopology();

            FeederSpout feeder = new FeederSpout(new Fields("text", "author"));

            TridentState tridentState = termCountTopology.newStream("inputSpout", feeder)
                    .each(new Fields("text"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    .name("counter-output")
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                    .parallelismHint(6);

            TrackedTopology tracked = Testing.mkTrackedTopology(cluster, termCountTopology.build());

            // Feed some random data into the topology
            feeder.feed(Arrays.asList("Nearly all men can stand adversity, but if you want to test a man's character, give him power.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("No man has a good enough memory to be a successful liar.", "Abraham Lincoln"));
            feeder.feed(Arrays.asList("Either write something worth reading or do something worth writing.", "Benjamin Franklin"));
            feeder.feed(Arrays.asList("Well done is better than well said.", "Benjamin Franklin"));

            cluster.submitTopology("word-count-testing", new Config(), tracked.getTopology());

            // (!!) Runs, but a fixed sleep is fragile: the topology may run faster or slower on other systems
            // Utils.sleep(5000);

            // (!!) Fails with 5000ms Topology timeout
            // Testing.trackedWait(tracked, 3);

            /*
             * (!!) Always 0. Trident creates the streams and bolts internally with
             * different names, so how can we read them to verify?
             */
            List outputTuples = Testing.readTuples(tracked, "counter-output");
            assertEquals(0, outputTuples.size());
        }
    }
}
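Beyond this, I have tried writing my own BaseFilter, attached to the end of the stream, that stores all tuples, but it seems there must be a better way. It also does not solve the problem of running the topology in a controlled manner. Is this something Trident supports?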

Best answer

Use the FeederBatchSpout class (for Trident) instead of FeederSpout. FeederBatchSpout blocks by itself, so there is no need for Testing.trackedWait() or anything similar.
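
To make the point about blocking concrete, here is a minimal sketch that uses only the Java standard library. It is not Storm code: the class BlockingFeedSketch and its methods are hypothetical stand-ins, and it assumes only that a blocking feed() behaves like a handshake in which the caller waits until the worker reports the batch as fully processed. Under that assumption, the test can assert immediately after feed() returns, with no sleep and no tracked wait.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for a topology fed by a blocking spout (not Storm's implementation).
class BlockingFeedSketch {

    // A batch waiting to be processed, plus the semaphore its feeder blocks on.
    private static final class Pending {
        final List<String> batch;
        final Semaphore done = new Semaphore(0);

        Pending(List<String> batch) {
            this.batch = batch;
        }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    BlockingFeedSketch() {
        // Background worker playing the role of the running topology.
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Pending p = queue.take();
                    for (String sentence : p.batch) {
                        for (String word : sentence.split(" ")) {
                            counts.merge(word, 1L, Long::sum);
                        }
                    }
                    p.done.release(); // signal: this batch is fully processed
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    // Hands the batch to the worker and returns only after it has been processed.
    void feed(List<String> batch) {
        Pending p = new Pending(batch);
        queue.add(p);
        try {
            p.done.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for batch", e);
        }
    }

    long count(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        BlockingFeedSketch topology = new BlockingFeedSketch();
        topology.feed(Arrays.asList(
                "Either write something worth reading or do something worth writing.",
                "Well done is better than well said."));
        // Safe to read right away: feed() has already waited for the batch.
        System.out.println("worth = " + topology.count("worth"));
    }
}
```

Applied to the test in the question, the analogous steps would presumably be to construct the spout with new FeederBatchSpout(Arrays.asList("text", "author")), pass it to newStream, submit the topology, and only then call feed() with a list of tuples. That outline is untested here and should be checked against the Storm version in use.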

Source: https://groups.google.com/forum/#!topic/storm-user/CrAdQEXo5OU

This question was originally asked on Stack Overflow: https://stackoverflow.com/questions/30244904/
