我有一个Job Distributor
,他在不同的Channels
上发布消息。
此外,我希望有两个(将来会更多)Consumers
来处理不同的任务并在不同的机器上运行。 (当前我只有一个,并且需要扩展它)
让我们将这些任务命名为示例(仅作为示例):
FIBONACCI
(生成斐波那契数字)RANDOMBOOKS
(生成随机句子以写书)这些任务最多需要2-3个小时,因此应该平均分配给每个
Consumer
。每个使用者都可以有
x
并行线程来执行这些任务。所以我说:(这些数字仅是示例,将被变量替换)
FIBONACCI
和5个并行作业用于RANDOMBOOKS
FIBONACCI
,并消耗3个并行作业用于RANDOMBOOKS
我怎样才能做到这一点?
我是否必须为每个
x
启动Channel
线程才能在每个Consumer
上进行监听?我什么时候必须确认?
我目前仅使用一个
Consumer
的方法是:启动每个任务的x
线程-每个线程都是一个实现Runnable
的Defaultconsumer。在handleDelivery
方法中,我调用basicAck(deliveryTag,false)
,然后完成工作。此外:我想将一些任务发送给特殊使用者。如何结合上述公平分配来实现这一目标?
这是我的
publishing
的代码String QUEUE_NAME = "FIBONACCI";
Channel channel = this.clientManager.getRabbitMQConnection().createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME,
MessageProperties.BASIC,
Control.getBytes(this.getArgument()));
channel.close();
这是我的
Consumer
的代码public final class Worker extends DefaultConsumer implements Runnable {
@Override
public void run() {
try {
this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
this.getChannel().basicConsume(this.jobType.toString(), this);
this.getChannel().basicQos(1);
} catch (IOException e) {
// catch something
}
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Control.getLogger().error("Exception!", e);
}
}
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
this.getChannel().basicAck(deliveryTag, false); // Is this right?
// Start new Thread for this task with my own ExecutorService
}
}
在这种情况下,类
Worker
会启动两次:一次是FIBUNACCI
,一次是RANDOMBOOKS
更新
正如答案所表明的那样,RabbitMQ并不是最佳解决方案,但是Couchbase或MongoDB拉取方法将是最佳选择。我是那些系统的新手,是否有人可以向我解释这将如何实现?
最佳答案
这是我将如何在沙发上构建此概念的观点。
因此,总而言之,每个工作人员都会查询孤立的作业(如果有),依次检查是否有针对他们的锁定文件,如果没有,则创建一个锁定文件,并遵循上述正常锁定协议(protocol)。如果没有孤立的作业,则它将查找过期的作业,并遵循锁定协议(protocol)。如果没有过期的作业,则它仅接受最旧的作业并遵循锁定协议(protocol)。
当然,如果您的系统没有“过期”之类的东西,那么这也将起作用,如果及时性无关紧要,那么您可以使用另一种方法来代替最老的工作。
另一种方法可能是创建一个介于1-N之间的随机值,其中N是一个相当大的数字,例如为 worker 数量的4倍,并使用该值标记每个作业。每次 worker 去找工作时,它都可以掷骰子,看看是否有该数字的工作。如果没有,它将再次这样做,直到找到具有该编号的工作。这样,与其分散多个 worker 争夺少数“最老”或最高优先级的工作以及更多的锁争用,不如将它们分散出去……以时间为代价,因为队列比FIFO情况更加随机。
随机方法也可以应用在必须承受负载值的情况下(这样一台机器不会承受太大的负载),而不是采用最老的候选方法,而是采用以下形式的随机候选方法:可行的工作 list ,并尝试做到这一点。
编辑以添加:
在步骤12中,我说“可能放入一个随机数”是指,如果 worker 知道优先级(例如:哪个人最需要完成工作),他们可以将代表该数字的数字放入文件中。如果没有“需要”工作的概念,那么他们都可以掷骰子。他们使用骰子的角色更新此文件。然后他们两个都可以看一下,看看对方滚动了什么。如果他们输了,他们就会平底锅,而另一名 worker 知道它有。这样,您无需很多复杂的协议(protocol)或协商就可以确定哪个 worker 上类了。我假设两个工作人员都在这里命中相同的锁文件,可以用两个锁文件和一个查找所有锁文件的查询来实现。如果经过一段时间后,没有一个 worker 调高了工作量(而新 worker 想着自己的工作会知道其他人已经在滚动工作,那么他们会跳过它)您可以放心地工作,知道您是唯一的 worker 正在努力。
关于java - Java和RabbitMQ-排队和多线程-或Couchbase作为作业队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12277067/