java - kafka KStream - 进行 n 秒计数的拓扑

标签 java apache-kafka apache-kafka-streams

我有一个 JSON 对象流,我在其中键入一些值的哈希值。我希望以 n 秒(10?60?)间隔按键进行计数,并使用这些值进行一些模式分析。

我的拓扑:K->aggregateByKey(n 秒)->process()

process - init() 步骤中,我调用了ProcessorContent.schedule(60 * 1000L),希望能够调用.punctuate()。从这里我将循环遍历内部哈希中的值并采取相应的行动。

我看到值通过聚合步骤并点击 process() 函数,但 .punctuate() 永远不会被调用。

<小时/>

代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
            new AggregateInit(),
            new OpxAggregate(),
            TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
                            @Override
                            public Processor<Windowed<String>, String> get() {
                                 return new AggProcessor();
                            }
                       });
    
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit() 返回 null。

我想我可以用一个简单的计时器来完成 .punctuate() 的等效操作,但我想知道为什么这段代码没有按照我希望的方式工作。

最佳答案

我认为这与kafka集群设置不当有关。将文件描述符计数更改为比默认值(1024 -> 65535)高得多的值后,这似乎符合规范。

关于java - kafka KStream - 进行 n 秒计数的拓扑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39500383/

相关文章:

apache-kafka - 使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢

apache-kafka - 如何保证Kafka分区中的顺序

unit-testing - 模拟 Kafka API 以进行单元测试

java - 在Kafka Streams中使用KStream将字符串更改为avro时出现空指针异常

apache-spark - 流处理中的非确定性函数

java - 如何从响应实体中提取文件

java - 在 Struts 中,计算应该驻留在哪里?

java - 条件 - 应该在等待之前解锁吗?

Java 和 Myro 突破游戏计划提前结束

apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区