来自 Quarkus我需要将墓碑消息发布到压缩的 Apache Kafka 主题的应用程序。由于我的用例是必要的,所以我使用 Emitter
用于向主题 ( as suggested in the quarkus blog ) 发送消息。 非逻辑删除 消息(带有负载)的代码是:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<MyDataStructure> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
OutgoingKafkaRecordMetadata<String> metadata =
OutgoingKafkaRecordMetadata.<String>builder()
.withKey(myData.getTopicKey())
.build();
return CompletableFuture.runAsync(
() -> emitter.send(Message.of(myData).addMetadata(metadata)));
}
}
Emitter
还实现了 <M extends Message<? extends T>> void send(M msg)
我希望这能让我制作一个 Message
负载为 null
作为墓碑消息。不幸的是 Message.of(..)
的所有实现允许提供元数据(提供消息 key 所需的元数据)的工厂方法,指定有效负载不得为{@code null}。
使用 Emitter
将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus/SmallRye Reactive Messaging 概念) ?
最佳答案
我建议使用 Record
类(参见 documentation )。
Record
是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null
,但在您的情况下,只有值部分应该是 null
: Record.of(key, null);
.
因此,您需要将发射器的类型更改为 Record<Key, Value>
,例如:
@Dependent
public class Publisher {
@Inject
@Channel("theChannelName")
Emitter<Record<Key, MyDataStructure>> emitter;
public CompletionStage<Void> publish(final MyDataStructure myData) {
return emitter.send(Record.of(myData.getTopicKey(), null);
}
}
同时 runAsync
很方便,发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。
我的代码返回了 send
的结果方法是 CompletionStage
.当记录写入 Kafka(并被代理确认)时,该阶段将完成。
关于java - 如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67601699/