java - 动态主题名称/Quarkus SmallRye Reactive Messaging Kafka

标签 java apache-kafka quarkus smallrye-reactive-messaging smallrye

我想使用这个扩展:[Quarkus Smallrye Reactive Messaging Kafka]

但在我的应用程序中,主题的名称是事先不知道的,它是根据运行时从用户收到的消息指定的。如何在没有注释的情况下以编程方式指定主题名称和与主题相关的设置? (仅用于发送消息到 Kafka -> Produce)

@ApplicationScoped
public class PriceGenerator {

    private Random random = new Random();

    // Don't want to use this 
    // "generated-price" not known at build time
    @Outgoing("generated-price")                       
    public Multi<Integer> generate() {                  
        return Multi.createFrom().ticks().every(Duration.ofSeconds(5))
                .onOverflow().drop()
                .map(tick -> random.nextInt(100));
    }

}

或者这些配置应该在运行时以编程方式设置

mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer

因为不懂路,所以用的是原生的Kafka驱动

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-kafka-client</artifactId>
    </dependency>
Properties props = new Properties();
props.put("bootstrap.servers", "85.93.89.115:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topicName.toString(), messageFactory.MessageToString(message)));

最佳答案

您可以在启动时或任何需要的时候动态覆盖主题的值,但这里有一段代码指示如何覆盖主题的预定义值:

@ApplicationScoped
public class AppLifecycleBean {

    void onStart(@Observes StartupEvent ev) {               
        System.setProperty("mp.messaging.outgoing.generated-price.topic", "example");
    }

    void onStop(@Observes ShutdownEvent ev) {               
    }

}

关于java - 动态主题名称/Quarkus SmallRye Reactive Messaging Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66249352/

相关文章:

java - 如何对指定文件夹中的文件执行 ant 任务?

java - Kafka - 停止重试 ConnectException

jax-rs - 如何在 JAX-RS 过滤器中记录请求处理时间

java - 卡夫卡 : java client failed to send messages after x tries

java - 如何在Fabric8 Open Jdk Container中启用扩展编码?

java - 在 Quarkus 中重定向到 https

java - GUI 框架/按钮

java - 初始化后更新选项卡内容

Java Set 获取重复条目

windows - 在Windows中设置Kafka日志目录属性