apache-kafka - 通过 Observable(RxJava) 使用 Kafka

标签 apache-kafka rx-java spring-cloud-stream

我有一个生产者(使用 Kafka)和不止一个消费者。所以我在一个主题中发布一条消息,然后我的消费者接收并处理该消息。

我需要在生产者中收到至少一个消费者的响应(如果是第一个消费者会更好)。我正在尝试使用 RxJava 来做到这一点(可观察的)。

可以这样做吗?有人有例子吗?

最佳答案

这是我如何使用 rxjava '2.2.6' 而没有任何额外的依赖来处理 Kafka 事件:

import io.reactivex.Observable;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

...

// Load consumer props 
Properties props = new Properties();  
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties")); 

// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\\s*,\\s*")));

// Create an Observable for topic events
Observable<ConsumerRecords<String, String>> observable = Observable.fromCallable(() -> {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(10);
    return records;
});

// Process Observable events
observable.subscribe(records -> {
    if ((records != null) && (!records.isEmpty())) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }
});

关于apache-kafka - 通过 Observable(RxJava) 使用 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43941291/

相关文章:

apache-kafka - 如何启动 Zookeeper 和 Kafka?

mysql - 如何在Apache Kafka中访问远程数据库?

java - RxJava : After Observable. 只是它仍然调用 Observable.empty()

使用 RXJava 的 Android,我如何获取带有整数列表的 Observable 并将其转换为对象列表

java - StreamListener内部是否调用MessageChannel.send(Message<?> message)

azure - 如何让azure函数在5到6分钟后停止时继续运行?

ssl - Kafka SSL 握手失败问题

rx-java - 通过 RxJava 发送异常时崩溃

spring-integration - Spring Integration中Codec和MessageConverter的区别

Spring Cloud Stream @StreamListener 自定义 MappingJackson2MesageConverter