java - 可以将自定义数据返回给 Kafka Producer

标签 java rest spring-boot apache-kafka spring-kafka

我正在学习 kafka,我想将我的应用程序拆分为 2 个微服务。 首先将所有来自 KafkaConsumer 的传入消息保存到数据库中,然后通过给定的 ID 选择实体。 其次提供 REST api 来保存和获取实体。 它们之间的交互用kafka提供。 如何使用 kafka 在 REST api 中从数据库接收存储的 ID? 这是调用 POST 请求的生产者示例代码。

 public void sendToKafka(MyObject myobject) throws ExecutionException, InterruptedException {
    LOGGER.info("sending payload='{}' to topic='{}'", myobject, myTopic);
    byte[] bytes = parseObjectToByte(myobject);
    ListenableFuture<SendResult<String, byte[]>> resultFuture = kafkaTemplate.send(topicSave, bytes);
    SendResult<String, byte[]> result = resultFuture.get();
    LOGGER.info(result.toString());
}

和 Consumer,将 myObject 保存到数据库

@KafkaListener(topics = "${kafka.topic.mytopic}")
public void saveMyObject(byte[] value) {
    MyObject myobject = parseToMyObject(value);
    LOGGER.info("received myobject='{}'", myobject);
    MyObject myobjectSaved = myObjectRepository.insert(myobject);
}

我正在使用 spring-kafka 和 spring-boot。 Rest api 有两种方法: POST - 保存我的对象 Get - 通过 id 返回保存的对象。 可以用 kafka 做还是我必须直接连接这个微服务?谢谢。

最佳答案

不确定我是否完全理解您的问题,但是如果您想向 kafka 发送一条消息,并等待该消息被某个微服务使用和处理,然后该微服务会将一些信息(主键)返回给发送者如果不向您的架构中添加更多内容,您将无法做到这一点。

发送到 kafka 的消息是“即发即弃”的,从发送者的角度来看,您对这条消息会发生什么一无所知(如果、何时、多久以及有多少消费者会消费它。)

在您的场景中,消费者微服务还可以使用另一个 kafka 主题中的主键发送消息,如果您需要该信息,您将使用该主题。

请记住,Kafka 用于解耦您的架构并引入异步消息处理,如果您需要从消费者同步获得响应,您可能使用了错误的解决方案。

关于java - 可以将自定义数据返回给 Kafka Producer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47526511/

相关文章:

Spring Cloud 流式传输并消费多个 kafka 主题

java - JPA + Spring 异常后回滚事务

c# - "GetBy"Web API中的方法

spring-boot - Spring Boot @DataJpaTest 卡在 HHH000400 : Using dialect: org. hibernate.dialect.Oracle12cDialect

java - 无法绑定(bind)属性

javascript - 使用 Apollo React 使用 GraphQL 包装 REST api

java - 使用迭代器从 ArrayList 中删除字符串数组

java - 更改父目录和子目录中的文件名

java - GWT/文本框-单击和双击处理程序选项?可能的?

rest - 用于生产的独立 WSGI 服务器