java - Using SessionWindows on aggregated data in Kafka Streams (0.11)

Tags: java aggregate type-mismatch apache-kafka-streams

I am trying to use SessionWindows in an aggregate call in Kafka Streams (0.11), but I cannot work out why I get compile errors.

Here is my code snippet:

// defining some values:
public static final Integer SESSION_TIMEOUT_MS = 6000000;
public static final String INTOPIC = "input";
public static final String HOST = "host";

// setting up serdes:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

// some more code to build up the streams
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC);

// constructing the initialMessage ObjectNode:
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode();
initialMessage.put("count", 0);
initialMessage.put("endTime", "");

// re-keying the data by host and grouping into a KGroupedStream<String, JsonNode>
KGroupedStream<String, JsonNode> data = dataStream
        .map((key, value) -> new KeyValue<>(value.get(HOST).asText(), value))
        .groupByKey(Serdes.String(), jsonSerde);

// finally aggregate the data using SessionWindows
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
            () -> initialMessage,

            (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),

            SessionWindows.with(SESSION_TIMEOUT_MS),

            jsonSerde, 

            "aggregated-data");

 private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){
 // some dataprocessing
 }

When I change

 KTable<Windowed<String>,JsonNode>

to

 KTable<String, JsonNode>

and remove

 SessionWindows.with(SESSION_TIMEOUT_MS)

from the aggregate call, everything works.

If I don't, Eclipse reports the following for the line

 KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate( [...])

The method aggregate(Initializer, Aggregator, Windows, Serde, String) in the type KGroupedStream is not applicable for the arguments (() -> {}, ( key, incomingMessage, initialMessage) -> {}, SessionWindows, Serde, String)

for the line

() -> initialMessage

Type mismatch: cannot convert from ObjectNode to VR

and for:

 (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),

The method countData(JsonNode, JsonNode) in the type DataWindowed is not applicable for the arguments (JsonNode, VR)

I really don't understand where the types are getting lost! Any hints would be great!

Thanks :D

Accepted answer

I did need to implement a Merger:

Merger<? super String, JsonNode> tmpMerger = new MergerClass<String, JsonNode>();

and pass it to the aggregate call:

KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
        () -> initialMessage,

        (key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),

        tmpMerger,

        SessionWindows.with(SESSION_TIMEOUT_MS),

        jsonSerde, 

        "aggregated-data");

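One way to read the compiler errors: SessionWindows is not a subtype of Windows, so the five-argument aggregate(Initializer, Aggregator, Windows, Serde, String) overload cannot match, which is presumably why Eclipse reports VR as unresolved. The session-window overload takes an extra Merger because two sessions for the same key can be joined when a record arrives that bridges the gap between them. The sketch below shows what such a merger might look like. It is not the MergerClass from the answer, which is not shown. Merger here is a local stand-in that mirrors the single method of org.apache.kafka.streams.kstream.Merger, and SessionAggregate and CountingMerger are hypothetical names, used so the example compiles without Kafka or Jackson on the classpath. It also assumes that endTime holds ISO-8601 strings in one consistent format, so that string order matches time order.

```java
import java.util.Objects;

/** Sketch only: stand-in types so the example compiles without Kafka or Jackson. */
final class SessionMergeSketch {

    /** Stand-in mirroring the shape of org.apache.kafka.streams.kstream.Merger<K, V>. */
    interface Merger<K, V> {
        V apply(K aggKey, V aggOne, V aggTwo);
    }

    /** Hypothetical aggregate with the two fields the question initializes. */
    static final class SessionAggregate {
        final long count;
        final String endTime; // assumed ISO-8601; "" when no record has been seen yet

        SessionAggregate(long count, String endTime) {
            this.count = count;
            this.endTime = Objects.requireNonNull(endTime);
        }
    }

    /** Combines two session aggregates for the same key into a single aggregate. */
    static final class CountingMerger<K> implements Merger<K, SessionAggregate> {
        @Override
        public SessionAggregate apply(K aggKey, SessionAggregate aggOne, SessionAggregate aggTwo) {
            // The merged session has seen every record from both sessions.
            long count = aggOne.count + aggTwo.count;
            // Keep the later end time; "" sorts before any timestamp string.
            String endTime = aggOne.endTime.compareTo(aggTwo.endTime) >= 0
                    ? aggOne.endTime
                    : aggTwo.endTime;
            return new SessionAggregate(count, endTime);
        }
    }
}
```

With Kafka 0.11 and Jackson on the classpath, the same idea would be written against Merger<String, JsonNode>, reading "count" and "endTime" from both nodes and returning a new ObjectNode rather than mutating either input.
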
Source: the original question on Stack Overflow, "Using SessionWindows on aggregated data in Kafka Streams (0.11)": https://stackoverflow.com/questions/46662688/
