java - 云发布/订阅接收器未收到消息

标签 java spring-boot google-cloud-platform google-cloud-pubsub

我有一个 Spring Boot 应用程序,我在应用程序启动时通过传递订阅 ID 来创建订阅者

import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
...
...
public void startAndWait() {
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectName, subscriptionId);
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
log.log(Level.INFO, "Created Subscriber: " + subscriptionName); // I get this logger message when service starts
subscriber.startAsync();
    }

以下是在上述代码中初始化时传递给该订阅者的接收者代码。

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
..
..
@Log
@Component
@Configurable
public class SubMessageReceiver implements MessageReceiver {
..
..
@Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {

try {
   processMessage(message);
} catch (IOException e) {
   log.log(Level.SEVERE, "retrieving message content failed");
} catch (Exception e) {
   log.log(Level.SEVERE, "Failed triggering rules");
}
   consumer.ack();
}

我尝试过 - 手动删除并再次创建订阅,手动发布有关该主题的多条消息,并验证这些消息是否通过谷歌云控制台上的订阅接收。 唯一的问题是来自客户端库的 receiveMessage 方法从未收到消息! 我确认没有其他订阅者具有与我正在使用的相同订阅 ID(不在任何其他环境中) 我验证消息队列未充满未确认的消息。

我期待在上面提到的 processMessage 方法中打印一些记录器,或者当我将调试器放入此方法中时,只要消息到达,它就应该停止在调试指针处。

请支持或发表任何想法! TIA!

最佳答案

代码没有任何变化,但我能够使其与降级版本的 google pub/sub maven 库一起使用。

<dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>google-cloud-pubsub</artifactId>
            <version>1.56.0</version>
</dependency>

现在,对于给定主题的所有消息,它都按预期工作。 希望它会对某人有所帮助。

关于java - 云发布/订阅接收器未收到消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57074335/

相关文章:

java - 下载内部存储器中的 PDF 文件

java - 无法使用返回 play.mvc.WebSocket 的方法作为 Play 中请求的处理程序?

java - 你如何为构造函数依赖注入(inject)编写一个 Akka Typed Extension for Spring?

Spring shell block 测试

google-compute-engine - 从计算引擎实例连接到容器服务 clusterip

reactjs - 如何在 Next.js 上设置健康检查的端点?

java - 数组 - 数组的平方根并打印结果 JAVA

spring-boot - 在 SpringBoot 中,@JmsListener 如何以及何时被调用?

java - Spring Boot,从 websocket 调用 ControllerLinkBuilder.linkTo 时出现 java.lang.IllegalStateException

Elasticsearch 到 BigQuery 管道部署在云数据融合实例上失败