java - 当从不同主题中具有相同键的主题加载 GlobalKTable 时会发生什么?

标签 java apache-kafka apache-kafka-streams

我们有一个带有单个分区的压缩主题,并向其中添加了一个新分区。

我们没有对现有数据进行重新分区 - 这意味着在添加新分区之前加载的事件仍在分区 0 中。新事件按照标准策略存储,正如预期的那样:具有相同键的所有事件都存储在同一分区中。

我们目前遇到这样的情况:

Partition    Offset    Timestamp      Key         Value
0            586       1545388284240  COD_ISIN    AAA
1            983       1551800369978  COD_ISIN    BBB
1            1141      1556526044144  COD_ISIN    CCC

当我在 GlobalKTable 中加载此主题时,存储中的值为 AAA。我们显然希望将 CCC 作为当前值。

GlobalKTable<String, JsonNode> storeDatacatalog = builder.globalTable(TOPIC, consumed,  Materialized.as(STORE_DATACATALOG));

KStream<String, JsonNode> inEvent = builder.stream(OTHER_TOPIC, consumed);

inEvent = inEvent.transform(
    new TransformerSupplier<String, JsonNode, KeyValue<String, JsonNode>>() {

        @Override
        public Transformer<String, JsonNode, KeyValue<String, JsonNode>> get() {

            return new Transformer<String, JsonNode, KeyValue<String, JsonNode>>() {

                private ProcessorContext context;
                private KeyValueStore<String, JsonNode> dataCatalogueState;

                @Override
                public void init(ProcessorContext context) {

                    this.context = context;
                    this.dataCatalogueState = (KeyValueStore<String, JsonNode>) context.getStateStore(STORE_DATACATALOG);

                    LOGGER.debug("Content of dataCatalogueState: ");
                    KeyValueIterator<String, JsonNode> allDc = this.dataCatalogueState.all();

                    JsonNode valueForIsin = null;

                    while (allDc.hasNext()) {
                        try {
                            KeyValue<String, JsonNode> next = allDc.next();
                            LOGGER.debug(" | " + next.key + " : " + next.value);
                            if ("COD_ISIN".equals(next.key)) 
                                valueForIsin = next.value;
                        } catch (Exception e) {
                            LOGGER.debug("exc" , e.getMessage());
                        }
                    }
                    LOGGER.info(" COD_ISIN ---> " + valueForIsin);
                }

                @Override
                public void close() {
                }

                @Override
                public KeyValue<String, JsonNode> transform(String key, JsonNode value) {
                    return new KeyValue<>(key, value);
                }

                //@Override
                public KeyValue<String, JsonNode> punctuate(long timestamp) {
                    // TODO Auto-generated method stub
                    return null;
                }
            };
        }
    }
)

GlobalKTable 如何构建其状态?是基于Offset还是Timestamp 它是否在内部将 key 粘贴到找到 key 的第一个分区?

我知道如何解决(清除主题并再次填充 - 将应用分区策略)。但我很好奇它的内部是如何工作的。

最佳答案

GlobalKTable 假设数据按键分区。因此,如果不同分区中有具有相同键的记录,则无法保证记录将按哪个顺序应用。仅保证每个分区的顺序。除此之外,atm 更新仅基于分区内的偏移量。

使用上面的示例,顺序可能是

  • AAA、BBB、CCC
  • BBB、AAA、CCC
  • BBB、CCC、AAA

只能保证 BBB 将在 CCC 之前应用。

关于java - 当从不同主题中具有相同键的主题加载 GlobalKTable 时会发生什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56153486/

相关文章:

java: Class.isInstance 与 Class.isAssignableFrom

apache-kafka - put() 在 Kafka Connect 接收器任务中多久触发一次?

linux - kafka的performance脚本怎么用text呢?

apache-kafka - 卡夫卡流 : Graceful shutdown

elasticsearch - Kafka 到 Elasticsearch、HDFS 与 Logstash 或 Kafka Streams/Connect

java - 可能永远不会发生的重新排序

java - 你能在java中写入数组的特定索引吗?

Java 硬编码 SQL 查询有效,但从文件缓冲区读取的完全相同的查询失败

python - Kafka Consumer未获取所有消息

apache-kafka-streams - 如何在 Kafka 流中使用 HashMap 作为值创建状态存储?