java - Google PubSub 重新发送的消息未得到处理

标签 java google-cloud-platform google-cloud-pubsub

我使用了 Google PubSub 的 Google 文档中的订阅者示例 我所做的唯一修改是注释掉消息的确认。

订阅者不再向队列添加消息,而应根据 Google 云控制台中设置的时间间隔重新发送消息。

为什么会发生这种情况或者我错过了什么?

public class SubscriberExample {

use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

static class MessageReceiverExample implements MessageReceiver {



    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        messages.offer(message);

        //consumer.ack();
    }
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
            PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
        // create a subscriber bound to the asynchronous message receiver
        subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
        subscriber.startAsync().awaitRunning();
        // Continue to listen to messages
        while (true) {
            PubsubMessage message = messages.take();
            System.out.println("Message Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
        }
    } finally {
       if (subscriber != null) {
            subscriber.stopAsync();
        }
    }
}
}

最佳答案

当您不确认消息时,Java 客户端库将调用 modifyAckDeadline直到 maxAckExtensionPeriod通过。默认情况下,该值为一小时。因此,如果您不确认/不确认消息或更改此值,则消息可能在一小时内不会重新传送。如果您想更改最大确认延长期限,请在构建器上设置:

subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
    .setMaxAckExtensionPeriod(Duration.ofSeconds(60))
    .build();               

还值得注意的是,当你没有ack或nack消息时,那么flow control可能会阻止更多消息的传递。默认情况下,Java 客户端库允许最多 1000 条消息处于未完成状态,即等待 ack 或 nack 或等待最大 ack 延长周期过去。

关于java - Google PubSub 重新发送的消息未得到处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53521930/

相关文章:

python - 如何转换 Google AppEngine db.TimeProperty 中的时间字符串?

python - Flask-Talisman 破坏了 Flask-restplus 的 swagger 文档

google-compute-engine - 无法配置 Google Cloud Pub/Sub 推送订阅者

java - JUnit-Mockito 测试因 InvalidUseOfMatchersException 而失败

java - 将整数传递给线程

java - 使用 jQueryUI datepicker 和 php 获取一系列日期(西类牙语)

amazon-web-services - Pod 重新调度时,AWS EBS/GCP PersistentDisk 能否拒绝 ReadWriteOnce PVC?

google-cloud-platform - 使用推送订阅作为负载均衡器的细节

java - 订阅 Google Pub/Sub 主题 X 秒,如果没有收到消息则停止

java - 部分线程安全是否使 Java 类线程安全?