我需要有人回答一些有关如何在 Java 中处理队列的问题。 这是一个研发项目,我正处于研究的早期阶段,不是java开发人员,很抱歉提出了一些愚蠢的问题。 我尝试用java理解队列处理的图片
这是我的架构
- 我将有一个队列,在亚马逊 SQS(客户端规范)上填充
- 我需要不断地处理消息,我不知道它们什么时候会来。
现在我很难理解拥有一个或多个可以处理消息的消费者是如何工作的。
据我了解,java程序是在.jar中编译的。 您如何设法让 .jar 始终“Activity ”,等待消息处理? 您会使用内置语言功能还是需要库?
考虑到您能够构建一个用 java 编写的程序,该程序能够等待消息处理,如果长时间(假设从几个小时到几天)没有消息要处理,是否存在失败的风险。在这种情况下,您是否需要像supervisord这样的管理器来定期重新启动程序?
您会构建一个能够并行处理许多消息的 java 程序吗?或者您会运行消费者的多个实例吗?或者两者兼而有之?
提前致谢
最佳答案
这个问题可以有很多答案,但我会尝试简化它。您的系统基本上是一个消息处理系统。
您想要的是可以在任何给定时间点获取消息,并且需要在消息到达时执行它们。
您的消息的入口点是您提到的队列。
现在您想要的是能够始终在队列中查找并在消息到达时对其进行处理的东西。
让我们先来一个非常简单的程序:
- 您的程序中有一部分不断向 API 请求消息。
- 消息一到达,您就将其放入队列中。
- 某些东西不断地在 while(true) 循环中循环队列并处理消息,因为它可以看到队列中添加了某些内容。
您的程序将永远运行,因为您将实例化 2 个独立的线程,一个将轮询 API,另一个将轮询队列。这意味着您的程序将永远运行(阅读有关线程的更多信息)
现在,这种方法很糟糕,并且有其缺点,例如,如果我们收到数百万条消息,但您无法全部使用它们,导致您的工作人员填满并最终因内存不足错误而导致整个程序崩溃。 另一个问题是您不断地轮询队列,浪费了 cpu 周期。如果队列中有 100 条消息并且有人刚刚终止了您的程序怎么办?你会失去它们。
让我们采用另一种方法:
- 您不断向 API 请求消息。
- 一旦收到消息,就将其放入队列中,但这次我们确保队列处于阻塞状态。也就是说,您指定队列的大小,只要达到指定的容量,队列就不会接受更多消息,因此它将对您的 API 服务施加反压,最终阻塞它(使其变慢)。
- 此阻塞队列可以是处理它们的线程池的一部分。基本上,您将消息提交到 ThreadPool,ThreadPool 内部有一个您指定大小的阻塞队列,ThreadPool 将在单独的线程上执行任务(在此处了解有关 ThreadPools 的更多信息 Thread Pools)基本上,线程池为您管理线程的创建和销毁,并有一个内部队列,您可以在其中提交任务。
通过这种方法,您解决了方法 1 中遇到的许多问题。假设您的阻塞队列大小为 8,那么如果有人来杀死您的机器,您最多只会丢失 8 条消息。这种方法还可以确保您不会因为无限的消息而炸毁您的机器。它还确保您最终不会浪费 CPU 周期来轮询队列,因为现在您在小任务到达时执行它们(事件驱动编程,您收到的每条消息都是一个事件。在此处了解更多信息:Event Driven Programming)
您还需要在程序中添加一个关闭钩子(Hook),以确保消息读取服务停止消费消息,并且线程池关闭执行所有挂起的任务。
我可以在这里添加更多内容,例如将消息读取部分分离到单独的程序中,并将消息放入像 Kafka 这样的 MessagingQueue 中,并将处理部分作为处理消息事件的单独程序。(在此处了解更多信息 Messaging Queues )。
如果您的程序长时间闲置运行,则不会有失败的风险..:) 是的,程序将是并行的,因为您将使用单独的线程处理事件(只要您使用超过 1 个处理线程)
关于java - 需要有关涉及 java 和消息队列的体系结构的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47551076/