我们在产品中使用 PubSub 并发现一个问题,即有更多虚拟机处理我们期望的 PubSub 消息。
我已经使用 PubSub 运行了一整夜的简单测试,看起来速度限制机制的进展并不像我们预期的那么顺利。
这是测试:
- 通过请求订阅将一定数量的消息发布到主题中。 在实验中,大约有 2,700 条消息(大约晚上 9 点开始)
- 使用 StreamingPull 连接配置一个异步客户端,并将 FlowControl 设置为 2。
- 通过将执行移至计时器并确认消息,模拟处理每条传入消息需要 5 秒 仅当计时器结束时。
预期结果: 来自 PubSub 的消息以相同的速度被消耗,每 5 秒一次获取 2 条消息。由于所有网络和处理费用,确认消息和拉取新消息之间预计会出现短暂的超时。
实际结果:PubSub 开始限制,或者类似的事情,有一个巨大的超时。当时没有消息到达。超时取决于订阅中未确认消息的数量。
从 FlowControl docs 似乎不清楚.
这是消费者(客户端)的代码:
var concurrentFlowsNumber = config.getLong(CONFIG_NUMBER_OF_THREADS);
var flowSettings = FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(concurrentFlowsNumber)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build();
var subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(() -> serviceAccountCredentials)
.setFlowControlSettings(flowSettings)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error(failure);
}
},
MoreExecutors.directExecutor());
var apiService = subscriber.startAsync();
apiService.addListener(new ApiService.Listener() {
@Override
public void running() {
logger.info("Pubsub started");
}
@Override
public void failed(ApiService.State from, Throwable failure) {
logger.error("Pubsub failed on step: {}", from);
}
}, Runnable::run);
消息处理程序是:
private static void handlePubSubMessage(PubsubMessage message, AckReplyConsumer consumer) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
consumer.ack();
}
}, (long) 3000 + rand.nextInt(5000));
}
那么,有谁知道如何让客户端(许多虚拟机)使用具有并发处理限制的消息(最多 4 个并发消息)而不因超时而中断?
P.s.这些问题相似,但不相同: Google pubsub flow control pubsub Dynamic rate limiting Cloud pubsub slow poll rate
最佳答案
由于您有积压的订单,您可能会遇到以下问题:https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages
您未传递的消息将在 Pub/Sub 服务和客户端库之间进行缓冲。消息可能会卡在单个客户端的缓冲区中,或者如果超过 ackDeadline,则会重新传送到同一客户端。
您可以按照建议尝试使用同步拉取。
关于java - Google PubSub 异步速率限制无法按预期工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60900043/