apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区

标签 apache-kafka apache-kafka-streams

我觉得我可能遗漏了一些非常基本的东西,但我还是会问的。

有多个分区的输入主题。我将 selectKey 用作 DSL 拓扑的一部分。 selectKey 始终返回相同的值。我的期望是,在由 selectKey() 触发的内部重新分区之后,拓扑中的下一个处理器将在同一分区上为同一键调用。然而,下一个处理器 transform() 会在不同的分区上针对相同的键调用。

拓扑:

    Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();


        builder
            .stream("in-topic", Consumed.with(Serdes.String(), new JsonSerde<>(CatalogEvent.class)))
            .selectKey((k,v) -> "key")
            .transform(() -> new Processor())
            .print();

        return builder.build();
    }

transform 使用的处理器类

public class Processor implements Transformer<String, CatalogEvent, KeyValue<String, DispEvent>> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<String, DispEvent> transform(String key, CatalogEvent catalogEvent) {
        System.out.println("key:" + key + " partition:" + context.partition());
        return null;
    }

    @Override
    public KeyValue<String, DispatcherEvent> punctuate(long timestamp) {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }
}

“in-topic”有两条以随机 UUID 作为键的消息,即“8f45e552-8886-4781-bb0c-79ca98f9d927”、“a794ed2a-6f7d-4522-a7ac-27c51a64fa28”,两条消息的有效载荷相同

Processor::transform 对两个 UUID 的输出是

key:key partition: 2
key:key partition: 0

我如何更改拓扑以确保具有相同 key 的消息将到达同一分区 - 我需要它来确保具有相同 key 的消息将转到相同的本地 Kafka 存储实例(用于插入或更新)。

最佳答案

process()[flat]transform[Values]() 没有自动重新分区。您将需要插入手动 repartition()(或旧版本中的 through())调用以重新分区数据。如果您比较 JavaDocs(与支持自动重新分区的 groupBy()join()),您会发现它们没有提到自动重新分区。

原因是,这三个方法是处理器 API 集成到 DSL 中的一部分,因此没有 DSL 运算符。它们的语义是未知的,因此我们无法判断它们是否需要重新分区,如果 key 已更改。为避免不必要的重新分区,不执行自动重新分区。

还有对应的Jira:https://issues.apache.org/jira/browse/KAFKA-7608

关于apache-kafka - 使用选择键和转换在 DSL 拓扑上进行流重新分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54680774/

相关文章:

java - 使用java api创建Kafka主题,无需zookeeper

amazon-s3 - 使用Kafka Connect从Kafka写入S3有什么好处?

apache-kafka - Confluence Cloud 上的 Apache Kafka - 分区主题和消费者滞后中的不连贯偏移

java - Confluence Kafka Streams - 找不到类 io.confluence.connect.avro.ConnectDefault

apache-kafka - 为什么消费者重启后会读取Kafka主题的所有消息?

scala - Spark 结构化流 + Kafka 集成 : MicroBatchExecution PartitionOffsets Error

apache-kafka - Quarkus 应用程序中 Kafka 反/序列化器中的 CDI 上下文

java - 在一段时间内使用 kafka-streams 处理和检查事件

java - Kafka Streams 在多个流中拆分 1 个流

java - Kafka-Streams 加入 2 个带有 JSON 值的主题 |背压机制?