我有一个 JSON 对象流,我在其中键入一些值的哈希值。我希望以 n 秒(10?60?)间隔按键进行计数,并使用这些值进行一些模式分析。
我的拓扑:K->aggregateByKey(n 秒)->process()
在process - init()
步骤中,我调用了ProcessorContent.schedule(60 * 1000L)
,希望能够调用.punctuate()
。从这里我将循环遍历内部哈希中的值并采取相应的行动。
我看到值通过聚合步骤并点击 process()
函数,但 .punctuate()
永远不会被调用。
代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
new AggregateInit(),
new OpxAggregate(),
TimeWindows.of("opx_aggregate", 60000));
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
@Override
public Processor<Windowed<String>, String> get() {
return new AggProcessor();
}
});
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() 返回 null。
我想我可以用一个简单的计时器来完成 .punctuate()
的等效操作,但我想知道为什么这段代码没有按照我希望的方式工作。
最佳答案
我认为这与kafka集群设置不当有关。将文件描述符计数更改为比默认值(1024 -> 65535)高得多的值后,这似乎符合规范。
关于java - kafka KStream - 进行 n 秒计数的拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39500383/