java - 如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?

标签 java apache-kafka quarkus smallrye-reactive-messaging

来自 Quarkus我需要将墓碑消息发布到压缩的 Apache Kafka 主题的应用程序。由于我的用例是必要的,所以我使用 Emitter用于向主题 ( as suggested in the quarkus blog ) 发送消息。 非逻辑删除 消息(带有负载)的代码是:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<MyDataStructure> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
    OutgoingKafkaRecordMetadata<String> metadata =
        OutgoingKafkaRecordMetadata.<String>builder()
            .withKey(myData.getTopicKey())
            .build();
    return CompletableFuture.runAsync(
        () -> emitter.send(Message.of(myData).addMetadata(metadata)));
  }
}

Emitter还实现了 <M extends Message<? extends T>> void send(M msg)我希望这能让我制作一个 Message负载为 null作为墓碑消息。不幸的是 Message.of(..) 的所有实现允许提供元数据(提供消息 key 所需的元数据)的工厂方法,指定有效负载不得为{@code null}

使用 Emitter 将墓碑消息发布到 Kafka 主题的正确方法是什么(遵循 Quarkus/SmallRye Reactive Messaging 概念) ?

最佳答案

我建议使用 Record类(参见 documentation )。 Record是一个key/value对,代表要写入的Kafka记录的key和value。两者都可以是 null ,但在您的情况下,只有值部分应该是 null : Record.of(key, null); .

因此,您需要将发射器的类型更改为 Record<Key, Value> ,例如:

@Dependent
public class Publisher {

  @Inject
  @Channel("theChannelName")
  Emitter<Record<Key, MyDataStructure>> emitter;

  public CompletionStage<Void> publish(final MyDataStructure myData) {
      return emitter.send(Record.of(myData.getTopicKey(), null);
  }
}

同时 runAsync很方便,发射器已经是异步的。所以,没必要用那个。此外,容器中的行为可能会很明显(如果您的并行度小于 2)。

我的代码返回了 send 的结果方法是 CompletionStage .当记录写入 Kafka(并被代理确认)时,该阶段将完成。

关于java - 如何从 quarkus 应用程序中正确地将逻辑删除消息发布到压缩的 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67601699/

相关文章:

java - 网络/代理身份验证(WebClient.UseDefaultCredentials 替代方案)

apache-kafka - 卡夫卡流并发?

quarkus - 有没有办法以编程方式覆盖应用程序属性?

java - 通过命令行将 EMX 导入 Eclipse

java.lang.NoSuchMethodError : 'void org.springframework.util.Assert.state(boolean, java.util.function.Supplier)' 错误

java - Java 中的 Active Directory 实现

apache-kafka - 具有高可用性的 kafka 多数据中心

apache-kafka - Kafka 事件携带的状态传输系统是否应该使用 GlobalKTable 进行本地查询来实现?

java - 如何更改 Azure Functions 中的日志级别

cdi - 在使用 Quarkus 进行本地开发期间注入(inject)不同的 bean