java - Kafka 流应用程序将读取与写入分开

标签 java stream apache-kafka apache-kafka-streams

我对 Kafka 和 Kafka Streams 还很陌生,所以请耐心等待。我想知道我是否走在正确的道路上。

我目前正在写入 Kafka 主题,并尝试通过休息服务访问数据。原始数据在被访问之前需要进行转换。

到目前为止,我拥有的是一个将原始数据写入主题的生产者。

1.) 现在我想要流应用程序(应该是在容器中运行的 jar),它只是将数据转换为我想要的形状。遵循此处的物化 View 范例。

1 的简化版本。)

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = 
    builder.stream("my-raw-data-topic");

    KafkaStreams streams = new KafkaStreams(builder,props);
    KTable<String, Long> t =  source.groupByKey().count("My-Table");
    streams.start();

2.) 另一个流应用程序(应该是在容器中运行的 jar)仅将 KTable 作为某种存储库,可以通过包装休息服务进行访问。

在这里,我有点被困在使用 api 的正确方法上。 访问和查询 KTable 的最低限度是多少?我需要再次将转换拓扑分配给构建器吗?

KStreamBuilder builder = new KStreamBuilder();
KTable table = builder.table("My-Table"); //Casting?
KafkaStreams streams = new KafkaStreams(builder, props);

RestService service = new RestService(table); 
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly 

现在这是伪代码

我走的路正确吗?将 1.)2.) 分开有意义吗?这是使用流来具体化 View 的缩进方式吗?对我来说,在我看到更多流量的情况下,通过容器独立地扩展写入和读取将会有好处。

1.)2.) 崩溃时如何处理 KTable 的重新填充。这是通过复制到流 api 完成的还是我需要通过代码解决这个问题。喜欢重置光标并回复事件吗?

最佳答案

一些评论:

在代码片段 (1) 中,将构建器交给 KafkaStreams 构造函数后修改拓扑:

KafkaStreams streams = new KafkaStreams(builder,props);
// don't modify builder anymore!

您不应该这样做,而是首先指定拓扑,然后创建 KafkaStreams 实例。

关于将您的应用程序一分为二。这对于独立缩放两个部分是有意义的。但一般来说很难说。但是,如果您确实同时吐出两者,则第一个需要将转换后的日期写入输出主题,第二个应该将此输出主题作为表读取 (builder.table("output-topic-of-transformation") 以服务 REST 请求。

要访问 KTable 的存储,您需要通过提供的存储名称获取查询句柄:

ReadOnlyKeyValueStore keyValueStore =
streams.store("My-Table", QueryableStoreTypes.keyValueStore());

有关更多详细信息,请参阅文档:

http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

关于java - Kafka 流应用程序将读取与写入分开,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45631320/

相关文章:

java - 多经纪人集群

java - 是否可以在单个 bean 声明中声明多个微米绑定(bind)器

c# - 如何将数据从托管程序集流式传输到 native 库并再次返回?

java - Spring 启动批处理: How to run job with job parameters

java - Docker和Java套接字: Share data between containers

node.js - Node.js Stream 是否会计入您的虚拟主机流量带宽分配?

apache-kafka - Kafka 甚至在达到段大小之前删除段

apache-kafka - Kafka Streams - 使用 through() 与 toStream() + to() 重用流

java - OSGI - 将实体拆分为多个包

java - elasticsearch如何按数组中的重复项进行分组而无需区分