java - 如何为 Spring Kafka Listener 创建集成测试

标签 java spring spring-kafka junit5 spring-kafka-test

我有一个微服务,它向另一个应该使用的微服务消息发送消息。

所以,kafka 配置有效,一切正常,但我需要为此代码创建一个集成测试,但我不知道如何操作。

我的 KafkaConsumer.Class 带有组件注释:

private static final Logger logger = LoggerFactory.getLogger(KafkaReactionConsumerMessageComponent.class);

private final ReactionsService reactionsService;

public KafkaReactionConsumerMessageComponent(ReactionsService reactionsService) {
    this.reactionsService = reactionsService;
}

   @KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
   public void consumingReactionMessages(ConsumerRecord<String, String> cr,
                                      @Payload String payload){
    logger.info("[JSON] received Payload: {}", payload);

    try {
        ObjectMapper mapper = new ObjectMapper();
        ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
        if(StringUtils.equals("unloved", message.getReactionType())) {
            reactionsService.deleteReactionsByUserIdAndPostId(message.getPost().getPostId(), message.getUser().getUserId());
            logger.info("Deleted reactions from database with postId: {} and userId: {}", message.getPost().getPostId(), message.getUser().getUserId());
        } else {
            List<Reaction> reactions = creatReactions(message).stream()
                    .map(reactionsService::insertReaction).collect(Collectors.toList());
            logger.info("Added reactions to database: {}", reactions);
        }
    } catch (Exception e){
        logger.error("Cannot Deserialize payload to ReactionMessage");
    }

}

我的集成测试是

private static final String TOPIC = "reaction-topic";
  private final Logger logger = LoggerFactory.getLogger(KafkaDeletePostConsumerMessageComponent.class);

  private final KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent;
  private final EmbeddedKafkaBroker embeddedKafkaBroker;

  private Consumer<String, String> consumer;

  @SuppressWarnings("SpringJavaAutowiringInspection")
  @Autowired
  public KafkaReactionMessageConsumerTest(KafkaReactionConsumerMessageComponent kafkaReactionConsumerMessageComponent,
                                        EmbeddedKafkaBroker embeddedKafkaBroker) {
    this.kafkaReactionConsumerMessageComponent = kafkaReactionConsumerMessageComponent;
    this.embeddedKafkaBroker = embeddedKafkaBroker;
}

@BeforeEach
public void setUp() {
    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkaBroker));
    consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
    consumer.subscribe(Collections.singleton(TOPIC));
    consumer.poll(Duration.ZERO);
}

@AfterEach
public void tearDown() {
    consumer.close();
}

@Test
public void shoudlConsumeAndInsertInDatabaseReactionDomain() {
    ReactionMessage reactionMessage = new ReactionMessage(new PostMessage("1", Set.of("a", "b", "c")),
            new UserMessage("2"), LocalDateTime.now().toString(), "loved");

    Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
    Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();


    producer.send(new ProducerRecord<>(TOPIC, "1", reactionMessage.toString()));
    producer.flush();

    assertEquals(3, mongoTemplate.getCollection("reactions").countDocuments());
}

抽象类:

 @ExtendWith(SpringExtension.class)
  @SpringBootTest
  @AutoConfigureMockMvc
  @EmbeddedKafka(brokerProperties={
        "log.dir=out/embedded-kafka"
  })
  public abstract class AbstractMongoEmbeddedTest {

    @Autowired
    private static MongodExecutable mongodExecutable;

    @Autowired
    protected MongoTemplate mongoTemplate;

    @BeforeEach
    private void dropPostCollection(){
       mongoTemplate.dropCollection(Reaction.class);
    }

最佳答案

由于您使用的是嵌入式 Kafka 代理,因此您可以简单地在集成测试中生成/使用所需的主题。

消费

消费可以通过简单的 jUnit 规则来完成。可以找到用于此目的的规则 here 。请随意使用它。

您可以像这样使用它来断言已消费的消息:

assertThat(kafkaConsumeMessagesRule.pollMessage()).isEqualTo("your-expected-message");

制作

要生成消息,您只需连接 org.springframework.kafka.core.KafkaTemplate在您的集成测试中并向给定主题发送消息。

关于java - 如何为 Spring Kafka Listener 创建集成测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59823798/

相关文章:

java - 为什么 @Size 注解在 List 上不起作用?

java - 在php、bean中,是否允许使用数组来收集变量?

spring - 如何检查 IDP 是否仍然存在有效 session ?

java - 如何在 Spring Boot 中重载 KafkaListener 方法

spring-boot - Kafka、Spring Kafka 和重新传递旧消息

spring-kafka - 从 KafkaTemplate 获取主题元数据

java - 使用 DisplayTag 时如何自定义 paging.banner.some_items_found

java - 为什么-Xrs会降低性能

java - 正弦波在Java中交替失真

java - 在应用程序打开时手动更改时间后,Volley 未请求新请求