java - 如何根据另一个主题的响应从 Kafka 主题读取消息

标签 java spring-boot apache-kafka kafka-consumer-api spring-kafka

我在 Kafka 中有 2 个主题:MetaData 和 MasterData。 我正在使用 Kafka Listners 实时读取数据。有两种情况:

  1. 我必须先阅读 MetaData 主题,然后阅读 MasterData 主题。
  2. 也有可能元数据主题中没有新消息,但主数据主题中插入了一条消息。在这种情况下,MasterData 主题的消费应该继续进行。

Ant 建议如何实现这一目标?

最佳答案

在评论中澄清之后,这就是我所理解的,你想要实现的目标:

You have a database like system, and the meta data from the kafka topic is used to adjust the table structure. Then the actual data comes into the master kafka topic which you then want to insert into the table. In other words: You want to define the structure of your data using kafka.

我不确定这是否是我所说的 Kafka 的常规用例,因为它是一个交换事件(又名数据)的接口(interface)。您尝试做的是使用系统作为定义数据结构的手段。但这只是我的意见。

正如前面评论中所说,几乎不可能说某个主题中不会有消息。你只能说:“还”没有消息。您“可以”可能做的是:当 master 主题中的消息到达时,您首先检查 meta 主题是否也有消息。如果有,您应用数据结构中的更改,然后从主主题导入消息。

另一种选择是使用 Kafka Streams 而不是原始消费者 将两个主题结合在一起,例如使用join

无论如何,要定义数据结构,通常会使用 Avro 之类的东西,它可以为您提供模式演化等功能。然后编写您的应用程序以了解架构更改,并将它们相应地应用到您的数据库表。

更新:如何使用 Kafka Streams 解决问题

如上所述,使用 Kafka Streams 将是您的情况的一个可能的解决方案。让我解释一下,我在一些伪 kafka 流代码中的意思:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;

import java.nio.charset.StandardCharsets;

/**
 * Just an incomplete Kafka Streams Code Demo to show how the problem could
 * be solved with this framework instead of using the consumers directly.
 * Kafka Streams Boilerplate code not included.
 */
public class Example {
    private static final String META_TOPIC = "meta";
    private static final String MASTER_TOPIC = "master";

    // You have to make sure, the meta data is stored under this
    // specific key in the meta topic.
    private static final byte[] META_KEY = MetaEvent.class.getName().getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        new Example().createTopology();
    }

    public void createTopology() {

        final StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<byte[], MetaEvent> metaTable = builder.globalTable(META_TOPIC);

        builder.<byte[], MasterEvent>stream(MASTER_TOPIC)
                .leftJoin(
                        metaTable,
                        (k, v) -> META_KEY,
                        MasterWithMeta::new)
                .foreach(this::handleEvent);
    }

    private void handleEvent(byte[] key, MasterWithMeta masterWithMeta) {
        // 1) check if meta has changed, if so, apply changes to database
        // 2) import master data to database
    }
}

class MasterWithMeta {
    private final MasterEvent master;
    private final MetaEvent meta;

    public MasterEvent getMaster() {
        return master;
    }

    public MetaEvent getMeta() {
        return meta;
    }

    public MasterWithMeta(MasterEvent master, MetaEvent meta) {
        this.master = master;
        this.meta = meta;
    }

    public static MasterWithMeta create(MasterEvent master, MetaEvent meta) {
        return new MasterWithMeta(master, meta);
    }
}

class MetaEvent {
    // ...
}

class MasterEvent {
    // ...
}

关于java - 如何根据另一个主题的响应从 Kafka 主题读取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60090897/

相关文章:

java - 使用 Java 中的 for 循环查找斐波那契数列中的第 n 项

java - if 语句表现不同

java - Spring Boot数据JPA持久化到数据库报错

java - 如何修复Spring Web应用程序的 ‘404--Not Found'问题

spring-boot - 事件、 channel 、主题之间有什么区别?

java - Kafka Mirror Maker 执行地点

apache-kafka - Kafka : Error from SyncGroup, 请求超时

java - : "JPA Facet File Change Event Handler" 期间发生内部错误

apache-kafka - Apache 卡夫卡 : Check existence of message in a Topic

java - 不同线程之间代码块的序列化