java - Java和RabbitMQ-排队和多线程-或Couchbase作为作业队列

标签 java multithreading mongodb rabbitmq couchbase

我有一个Job Distributor,他在不同的Channels上发布消息。

此外,我希望有两个(将来会更多)Consumers来处理不同的任务并在不同的机器上运行。 (当前我只有一个,并且需要扩展它)

让我们将这些任务命名为示例(仅作为示例):

  • FIBONACCI(生成斐波那契数字)
  • RANDOMBOOKS(生成随机句子以写书)

  • 这些任务最多需要2-3个小时,因此应该平均分配给每个Consumer

    每个使用者都可以有x 并行线程来执行这些任务。
    所以我说:(这些数字仅是示例,将被变量替换)
  • 机器1可以消耗3个并行作业作为FIBONACCI和5个并行作业用于RANDOMBOOKS
  • 机器2可以消耗7个并行作业作为FIBONACCI,并消耗3个并行作业用于RANDOMBOOKS

  • 我怎样才能做到这一点?

    我是否必须为每个x启动Channel线程才能在每个Consumer上进行监听?

    我什么时候必须确认?

    我目前仅使用一个Consumer的方法是:启动每个任务的x线程-每个线程都是一个实现Runnable的Defaultconsumer。在handleDelivery方法中,我调用basicAck(deliveryTag,false),然后完成工作。

    此外:我想将一些任务发送给特殊使用者。如何结合上述公平分配来实现这一目标?

    这是我的publishing的代码
    String QUEUE_NAME = "FIBONACCI";
    
    Channel channel = this.clientManager.getRabbitMQConnection().createChannel();
    
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    channel.basicPublish("", QUEUE_NAME,
                    MessageProperties.BASIC,
                    Control.getBytes(this.getArgument()));
    
    channel.close();
    

    这是我的Consumer的代码
    public final class Worker extends DefaultConsumer implements Runnable {
        @Override
        public void run() {
    
            try {
                this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
                this.getChannel().basicConsume(this.jobType.toString(), this);
    
                this.getChannel().basicQos(1);
            } catch (IOException e) {
                // catch something
            }
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Control.getLogger().error("Exception!", e);
                }
    
            }
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.getContentType();
            this.getChannel().basicAck(deliveryTag, false); // Is this right?
            // Start new Thread for this task with my own ExecutorService
    
        }
    }
    

    在这种情况下,类Worker会启动两次:一次是FIBUNACCI,一次是RANDOMBOOKS
    更新

    正如答案所表明的那样,RabbitMQ并不是最佳解决方案,但是Couchbase或MongoDB拉取方法将是最佳选择。我是那些系统的新手,是否有人可以向我解释这将如何实现?

    最佳答案

    这是我将如何在沙发上构建此概念的观点。

  • 您有一定数量的机器来处理作业,并且一定数量的机器(也许是相同的)创建要执行的作业。
  • 您可以在沙发基础上的存储桶中为每个作业创建一个文档(并将其类型设置为“job”,或者将其与该存储桶中的其他数据混合使用)。
  • 每个作业描述以及要执行的特定命令都可以包括其创建时间,到期时间(如果有特定的到期时间)以及某种生成的工作值。该功数值可以是任意单位。
  • 每个工作使用者都会知道一次可以处理多少个工作单位,以及有多少个可用工作单位(因为其他 worker 可能正在工作。)
  • 因此,一台容量为10个工作单元且已完成6个工作单元的机器将执行查询以查找4个工作单元或更少的工作。
  • 在couchbase中,有一些 View 是增量更新的map/reduce作业,我认为您只需要在map阶段。您将编写一个 View ,使您可以按时查询到期时间,输入到系统中的时间以及工作单位数量。这样,您可以获得“4个工作单元或更少的工作中最过期的工作”。
  • 随着容量的释放,这种查询将首先获得最多的过期工作,尽管您可以获得最大的过期工作,如果没有,则将获得最大的未过期工作。 (其中“过期”是当前时间与作业的到期日期之间的差额。)
  • Couchbase View 允许像这样的非常复杂的查询。而且,尽管它们是增量更新的,但它们并不是完全实时的。因此,您不会只寻找一份工作,而是寻找一份求职者列表(可根据需要订购)。
  • 因此,下一步将是获取求职者列表并检查第二个位置-可能是membase存储桶(例如:RAM Cache,非持久性)以查找锁定文件。锁定文件将具有多个阶段(在这里,您可以使用CRDT或任何最适合您需要的方法来做一些分区解析逻辑。)
  • 由于此存储桶基于ram,因此它比 View 快,并且与总状态的滞后性也较小。如果没有锁定文件,则创建一个状态标志为“临时”的文件。
  • 如果另一个工作人员获得了相同的工作并看到了锁定文件,则它可以跳过该应聘者并执行列表中的下一个。
  • 如果,两个工作人员以某种方式尝试为同一作业创建锁定文件,则会发生冲突。如果发生冲突,您可以平底锅。或者,您可以具有使每个工作人员都更新锁定文件的逻辑(CRDT分辨率,以便使这些幂等,以便可以将同级合并),可能放入一个随机数或某个优先级数字。
  • 在指定的时间段(可能是几秒钟)后,工作人员将检查锁定文件,并且如果不必进行任何种族分辨率更改,它会将锁定文件的状态从“临时”更改为“已拍摄”
  • 然后,它使用“已采取”或诸如此类的状态来更新作业本身,这样当其他 worker 正在寻找可用的作业时,它就不会显示在 View 中。
  • 最后,您需要添加另一步,在执行查询以获取我上面介绍的这些求职者之前,您将执行一个特殊查询以查找已采取的工作,但所涉及的工作人员已死亡。 (例如:过期的工作)。
  • 知道工作人员何时死亡的一种方法是,放入membase存储桶中的锁定文件应具有到期时间,这将导致该文件最终消失。可能这一次时间可能很短,工作人员只需触摸它即可更新到期时间(在ouchabase API中受支持)
  • 如果 worker 死亡,最终其锁定文件将消失,孤立的作业将被标记为“已采取”,但没有锁定文件,这是寻找工作的 worker 可以寻找的条件。

  • 因此,总而言之,每个工作人员都会查询孤立的作业(如果有),依次检查是否有针对他们的锁定文件,如果没有,则创建一个锁定文件,并遵循上述正常锁定协议(protocol)。如果没有孤立的作业,则它将查找过期的作业,并遵循锁定协议(protocol)。如果没有过期的作业,则它仅接受最旧的作业并遵循锁定协议(protocol)。

    当然,如果您的系统没有“过期”之类的东西,那么这也将起作用,如果及时性无关紧要,那么您可以使用另一种方法来代替最老的工作。

    另一种方法可能是创建一个介于1-N之间的随机值,其中N是一个相当大的数字,例如为 worker 数量的4倍,并使用该值标记每个作业。每次 worker 去找工作时,它都可以掷骰子,看看是否有该数字的工作。如果没有,它将再次这样做,直到找到具有该编号的工作。这样,与其分散多个 worker 争夺少数“最老”或最高优先级的工作以及更多的锁争用,不如将它们分散出去……以时间为代价,因为队列比FIFO情况更加随机。

    随机方法也可以应用在必须承受负载值的情况下(这样一台机器不会承受太大的负载),而不是采用最老的候选方法,而是采用以下形式的随机候选方法:可行的工作 list ,并尝试做到这一点。

    编辑以添加:

    在步骤12中,我说“可能放入一个随机数”是指,如果 worker 知道优先级(例如:哪个人最需要完成工作),他们可以将代表该数字的数字放入文件中。如果没有“需要”工作的概念,那么他们都可以掷骰子。他们使用骰子的角色更新此文件。然后他们两个都可以看一下,看看对方滚动了什么。如果他们输了,他们就会平底锅,而另一名 worker 知道它有。这样,您无需很多复杂的协议(protocol)或协商就可以确定哪个 worker 上类了。我假设两个工作人员都在这里命中相同的锁文件,可以用两个锁文件和一个查找所有锁文件的查询来实现。如果经过一段时间后,没有一个 worker 调高了工作量(而新 worker 想着自己的工作会知道其他人已经在滚动工作,那么他们会跳过它)您可以放心地工作,知道您是唯一的 worker 正在努力。

    关于java - Java和RabbitMQ-排队和多线程-或Couchbase作为作业队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12277067/

    相关文章:

    java - 大十进制类中的意外输出,java

    java - AsyncTask的onPostExecuted什么时候执行?

    mongodb - docker 中的 mongodump 不创建任何文件

    mongodb - 在 vanilla GraphQL 中实现分页

    java - 如何将 XML 转换为 List<Map>

    java - 一个线程写数据不读,多线程读数据不写

    java - 将 AmCharts 导出为 PNG、JPG 等

    java - Java中如何从另一个线程访问ThreadLocal中的信息

    .net - ReaderWriterLockSlim(不是 slim )替换,递归,可升级读取?

    mysql - Mongodb 每周计数行数