java - Google PubSub 异步速率限制无法按预期工作

标签 java kotlin google-cloud-platform concurrency google-cloud-pubsub

我们在产品中使用 PubSub 并发现一个问题,即有更多虚拟机处理我们期望的 PubSub 消息。

我已经使用 PubSub 运行了一整夜的简单测试,看起来速度限制机制的进展并不像我们预期的那么顺利。

这是测试:

  1. 通过请求订阅将一定数量的消息发布到主题中。 在实验中,大约有 2,700 条消息(大约晚上 9 点开始)
  2. 使用 StreamingPull 连接配置一个异步客户端,并将 FlowControl 设置为 2。
  3. 通过将执行移至计时器并确认消息,模拟处理每条传入消息需要 5 秒 仅当计时器结束时。

预期结果: 来自 PubSub 的消息以相同的速度被消耗,每 5 秒一次获取 2 条消息。由于所有网络和处理费用,确认消息和拉取新消息之间预计会出现短暂的超时。

实际结果:PubSub 开始限制,或者类似的事情,有一个巨大的超时。当时没有消息到达。超时取决于订阅中未确认消息的数量。

FlowControl docs 似乎不清楚.

PubSub subscription unacked message count

这是消费者(客户端)的代码:

var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
    var flowSettings = FlowControlSettings.newBuilder()
      .setMaxOutstandingElementCount(concurrentFlowsNumber)
      .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
      .build();

    var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
      .setCredentialsProvider(() -> serviceAccountCredentials)
      .setFlowControlSettings(flowSettings)
      .build();

    subscriber.addListener(
      new Subscriber.Listener() {
        @Override
        public void failed(ApiService.State from, Throwable failure) {
          logger.error(failure);
        }
      },
      MoreExecutors.directExecutor());

    var apiService = subscriber.startAsync();
    apiService.addListener(new ApiService.Listener() {
      @Override
      public void running() {
         logger.info("Pubsub started");
      }

      @Override
      public void failed(ApiService.State from, Throwable failure) {
        logger.error("Pubsub failed on step: {}", from);
      }
    }, Runnable::run);

消息处理程序是:

private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
    new Timer().schedule(new TimerTask() {
      @Override
      public void run() {
               consumer.ack();
      }
    }, (long) 3000 + rand.nextInt(5000));
  }

那么,有谁知道如何让客户端(许多虚拟机)使用具有并发处理限制的消息(最多 4 个并发消息)而不因超时而中断?

P.s.这些问题相似,但不相同: Google pubsub flow control pubsub Dynamic rate limiting Cloud pubsub slow poll rate

最佳答案

由于您有积压的订单,您可能会遇到以下问题:https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages

您未传递的消息将在 Pub/Sub 服务和客户端库之间进行缓冲。消息可能会卡在单个客户端的缓冲区中,或者如果超过 ackDeadline,则会重新传送到同一客户端。

您可以按照建议尝试使用同步拉取。

关于java - Google PubSub 异步速率限制无法按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60900043/

相关文章:

java - 即使指定了类路径也缺少 jar 文件

Kotlin 中的枚举和 With

android - 使用 Dagger2 的多模块 android 应用程序,在功能模块中不使用 `kapt`

Kotlin 中的函数参数名称

java - GCP App 引擎上未找到 logback xml 文件异常

google-cloud-platform - 在 Google Cloud Compute 上永久更改主机名 - WHM

java - 在业务层返回业务验证结果的首选方法是什么

java - 使用 gradle 将 list 文件添加到 jar

java - JMockit - 要测试的类的实例为空

python - 在管道中使用 apache beam 参数