java - Activiti 并行服务任务

标签 java activiti

我正在尝试在 Activiti 中实现两个应该并行运行的服务任务。下面编写的代码可以随机(有趣地)正常工作。

我的意思是它偶尔只打印“first”(或“second”)或者它打印两个“first”一个“second”等。

问题:如何使这些服务不断并行运行;无论当前运行的服务数量如何?

PS:当我删除activiti:async="true"从流程定义来看,它只打印了“first”或“second”。我想我需要那个:)

流程定义

<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn" targetNamespace="Examples">

    <process id='testparallelact' name="Developer Hiring" isExecutable="true" activiti:exclusive="false" activiti:async="true">

        <startEvent id="theStart" />
        <sequenceFlow id="flow1" sourceRef="theStart" targetRef="fork" />

        <parallelGateway id="fork"  activiti:async="true" />
        <sequenceFlow sourceRef="fork" targetRef="receivePayment" />
        <sequenceFlow sourceRef="fork" targetRef="shipOrder" />


        <serviceTask id="receivePayment" name="Receive Payment" activiti:async="true" activiti:exclusive="false"
activiti:expression="${serviceConnections.runThis2('First')}"/>

        <sequenceFlow sourceRef="receivePayment" targetRef="join" />


        <serviceTask id="shipOrder" name="Ship Order" activiti:async="true" activiti:exclusive="false"
activiti:expression="${serviceConnections.runThis2('Second')}"/>

        <sequenceFlow sourceRef="shipOrder" targetRef="join" />

        <parallelGateway id="join" />
        <sequenceFlow sourceRef="join" targetRef="theEnd" />
        <endEvent id="theEnd" />
    </process>
</definitions> 

流程定义的图形渲染

Graphical rendering of process definition

“runThis2”的代码
public void runThis2(String test1) throws InterruptedException {            
    while(true)
    {
        Thread.sleep(1000);
        System.out.println(test1);              
    }           
}

最佳答案

“异步”和“独占”标志的组合很重要

了解作业是如何在 Activiti 引擎中执行的很重要。
以下论坛主题很好地描述了它:

https://community.alfresco.com/thread/221453-multiinstance-wont-run-task-in-parallel

2013-10-25 的原始 Activiti 架构师之一 Tijs Rademakers 的关键摘录:

The parallel gateway and multiinstance constructs are able to run multiple user tasks in parallel for example. But for service and script tasks they are basically executed serial still. Async can change this behavior if you also set exclusive to false (the default is true). Then the job executor will just execute all jobs available and not serially. So give it a try to set async to true and exclusive to false.



现在,通过设置 activiti:async="true"activiti:exclusive="false" ,您实际上所做的是通过将服务任务(通常是串行处理)分配给 Job Executor 来在流程中创建“等待状态”。

但:
  • 有多少作业并行执行,
  • 它们由 Job Scheduler
  • 执行

    现在完全由 Job Executor 配置控制。 (线程池大小、超时时间、并发作业数、作业 block 大小均可配置。)

    现在,这并不是您所期望的,这意味着,这取决于您的作业队列的大小、一次扫描中执行了多少作业以及每个作业的持续时间以及何时执行服务任务。意思是,它们可能并行执行,也可能串行执行。同样,您无法控制它们的顺序,因为再次由 Job Executor 来确定它做什么以及何时执行。

    好的,假设这符合您的要求...

    乐观锁定概念

    ...您可能还会遇到另一个问题(事实上,这就是 activiti:exclusive 标志首先被引入的原因)。服务任务完成后,执行上下文将提交到数据库中的流程实例记录以及历史记录。 Activiti 出于性能目的使用“乐观锁定”记录。

    现在,如果您的流程分支在时间上彼此相对接近完成,那么您有可能(实际上很有可能)将收到 Optimistic Locking Exception在数据库更新上,如下所示:

    09:59:52,432 [flowable-async-job-executor-thread-2] ERROR org.flowable.job.service.impl.asyncexecutor.DefaultAsyncRunnableExecutionExceptionHandler - Job 12575 failed org.flowable.engine.common.api.FlowableOptimisticLockingException: ProcessInstance[12567] was updated by another transaction concurrently



    (注意:上面的错误实际上不是来自 Activiti,而是来自一个名为“Flowable”的项目。但是,在最初提出这个问题时,它们都具有与 Activiti 6 基本相同的代码库。(2017 年 11 月)。)

    这将导致服务任务被标记为 FAILED,它将是 再次尝试 .如果您对 SOR(记录系统)或其他遗留系统进行外部调用,这可能会出现问题。 (考虑一下如果您的航类实际上已成功预订,但由于被认为失败而再次发出预订电话,会发生什么情况。)

    所有有趣的事情以及所有可以通过良好的设计和使用最佳实践来解决的事情。

    希望这可以帮助您了解正在发生的事情。

    格雷格@BP3

    进一步阅读

    Alfresco 论坛帖子包含几个死链接。以下是实时链接。

    死问题跟踪器链接
  • 死者:http://jira.codehaus.org/browse/ACT-1814 =>
  • 活着:https://activiti.atlassian.net/browse/ACT-1814
  • 死者:http://jira.codehaus.org/browse/ACT-2126 =>
  • 活着:https://activiti.atlassian.net/browse/ACT-2126

  • 附加说明:Activiti 目前(2019 年)不再使用这两个(codehaus.org 或 atlassian.net)跟踪器。相反,他们使用这个 GitHub 跟踪器:https://github.com/Activiti/Activiti/issues

    死亡常见问题解答链接
  • 死者:http://activiti.org/faq.html#WhatIsTheDifferenceBetweenProcessConcurrencyAndJavaConcurrency =>
  • 活着:https://web.archive.org/web/20170830013058/https://www.activiti.org/about

  • Activity 标志
  • activiti:async标志:https://www.activiti.org/userguide/#asyncContinuations
  • activiti:exclusive标志:https://www.activiti.org/userguide/#exclusiveJobs
  • 开发人员 Joram Barrez 对使用 async=true 和 Exclusive=false 的评论:https://community.alfresco.com/thread/220483-what-is-activitiexclusivefalse-mean

  • 卡蒙达手册
  • “Camunda”是另一个“Activiti”分支。所以两者的技术细节不会完全相同。然而,在 Camunda 手册中有一个关于事务和“乐观锁定”概念的很好的一般性讨论:
  • https://docs.camunda.org/manual/latest/user-guide/process-engine/transactions-in-processes/#optimistic-locking
  • https://docs.camunda.org/manual/latest/user-guide/process-engine/the-job-executor/#exclusive-jobs
  • 关于java - Activiti 并行服务任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47301783/

    相关文章:

    java - 基于 XML 的调用流程 - JBPM JPDL 替代方案

    java - activiti 6 中表达中使用的未知方法

    java - SimpleDateFormat 中未找到方法错误

    java - Java 中列表如何保存对象引用/值?

    java - 为什么我的嵌套 if 语句没有将 int 变量向上迭代 1?

    java - 无法自动连接 activiti RuntimeService

    java - ImageView的onClick方法

    java - 如果发生异常,使用 newBufferedReader 打开的文件是否会关闭?

    java - Alfresco - 获取附件的方法