spring-cloud-stream - Stream 应用程序中的瓶颈导致消息丢失

标签 spring-cloud-stream spring-cloud-dataflow

  • 适用于 Cloud Foundry v1.4.x 的 Spring Cloud Data Flow (SCDF) 服务器
  • 为消息传输提供 RabbitMQ 服务磁贴

部署的 Spring Cloud Data Flow 流有一个处理器,它可以比下游处理器或接收器处理传入消息更快地生成传出消息。这会导致 RabbitMQ 传输出现瓶颈,最终导致消息丢失。

在我们的私有(private)云环境中,我们的 Rabbit 服务磁贴的默认设置为 max-length=1000max-length-bytes=1000000 .我们目前无法修改这些设置来增加其中任何一个容量。

我们尝试设置 prefetch对消费应用程序的值(value)(我相信设置将是 deployer.<appname>.rabbit.bindings.consumer.prefetch=10000 ),这似乎实际上增加了消费应用程序在更短的时间内消费更多消息的能力,但这只在有限的场景中有效。如果我们有大量数据通过流,我们仍然可能会遇到消息丢失的限制。在上面的示例中,我们似乎通过设置预取将消费应用程序的容量从 1000 增加到 11000。

我们还尝试使用自动缩放服务,因此我们可以增加消费应用程序的事件实例数,这也可以显着增加其容量。然而,这似乎也像是用创可贴解决问题,而不是使用本质上能够以弹性方式处理潜在交易量预期的基础设施和/或服务。如果我们不知道一天中的特定时间什么时候卷会显着增加,如果卷增加的速度使得自动缩放器设置中的 CPU 阈值不能足够快地跟上事件实例以避免丢失消息?

  • 我们还没有尝试设置 RabbitMQ 服务来保证交付。根据文档,判断消息何时未送达似乎更容易,而不是确定送达。我们不知道这是否是一个好的可行选择,正在寻求建议。
  • 我们没有尝试在我们的流应用程序本身中实现任何限制。我们不知道这是否是一个好的可行选择,正在寻求建议。
  • 我们没有尝试将应用程序绑定(bind)到 DLQ 或重新排队处理失败的消息。我们不知道这是否是一个好的可行选择,正在寻求建议。
  • 我们还没有尝试将 SCDF 服务器绑定(bind)到 Cloud Foundry 服务磁贴之外的我们自己的 Rabbit 服务实例。从理论上讲,这将是一个 RabbitMQ 实例,我们可以更好地控制队列深度和字节大小限制,我们可以将它们设置为更轻松地处理我们预期的负载。
  • 我们还没有尝试过像 Kafka 这样的替代传输机制。我们不知道这是否是一个好的可行选择,正在寻求建议。

我很难相信其他人在这些流范例中没有遇到过类似性质的问题,我很好奇是否有公认的最佳实践解决方案,或者我们是否需要仔细研究在这些情况下流式传输范例是否被误用。

我们的基本要求是,在任何流应用程序上下文中丢失消息都是 Not Acceptable 情况,我们需要确定一种最佳方法来配置我们的环境,或分析我们的实现选择以确保我们的实现在繁重的情况下稳健可靠加载。

社区或 Pivotal 人员对此有何建议?

最佳答案

钱宁

感谢您提供如此多的细节、问题以及您对 Spring Cloud Stream 和 SCDF 的兴趣,但我希望您明白这不是真正的 SO 问题,因为它有太多变量,它不可能有一个答案,更适合某种类型的讨论。也许在 GitHub 中针对所提到的任何一个项目提出了功能请求,我们可以在那里进行讨论。 无论如何,我会尽我所能确保它不会无人问津。

你问的是背压,确实这是一个非常有效的问题。然而,需要理解的是,Spring Cloud Stream 和随后的 SCDF 选择支持多个消息系统/协议(protocol)(通过绑定(bind)器)将微服务连接在一起,而不是创建我们自己的。并不是每个消息系统/协议(protocol)都支持背压,并且曾经提供不同的机制来实现它,因此很难/不可能在框架级别提供某种通用的抽象。

因此,它实际上更像是一种架构/设计讨论,我很乐意参与其中,但需要更多背景信息。 例如,在 RabbitMQ 的上下文中,生产者可以轮询队列大小 (RabbitAdmin.queueProperties(queue)) 并在超过某个阈值时停止发布。但正如我所说,有更多的技巧和方法可以做事,我们肯定需要更多的背景信息。

我还应该提到,我们正在研究 RSocket 绑定(bind)器,它是一个原生支持背压的系统和协议(protocol)。

希望对您有所帮助。 . .

关于spring-cloud-stream - Stream 应用程序中的瓶颈导致消息丢失,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55908927/

相关文章:

spring-cloud - 将 spring-cloud-starter-dataflow-server-local 升级到 1.3.0 时出现构建错误

java - String Cloud Stream - 如果启用重试,如何强制消息成为死信而不重新排队?

spring-boot - Spring Cloud 流 'bindingService'错误

java - Spring Cloud Stream发送到Kafka错误控制处理

java - 如何解决通过字段 'schedulerService' 表示的不满足的依赖关系?

java - Spring Cloud DataFlow - 如何在 TCP 源中使用自定义 TCP 编码器/解码器

java - Spring Cloud Dataflow Kubernetes 从 dockerfile 获取 jar 的属性

java - Spring Cloud 流: Republish to other amqp connection if current connection throws exception

spring-cloud-dataflow - Spring 配置文件与 Spring Cloud Data Flow

kubernetes - 从Kubernetes部署公开SCDF服务