异步serviceTasks的Activiti Job Executor问题Activiti> = 5.17

问题描述

@H_404_0@请考虑下图

@H_404_0@

enter image description here

@H_404_0@ MyProcess.bpmn

<?xml version="1.0" encoding="UTF-8"?>
<deFinitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI" typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath" targetNamespace="http://www.activiti.org/test">
  <process id="myProcess" name="My process" isExecutable="true">
    <startEvent id="startevent1" name="Start"></startEvent>
    <userTask id="evl" name="Evaluation"></userTask>
    <boundaryEvent id="timer_event_autocomplete" name="Timer" attachedToRef="evl" cancelActivity="false">
      <timerEventDeFinition>
        <timeDate>PT2S</timeDate>
      </timerEventDeFinition>
    </boundaryEvent>
    <serviceTask id="timer_service" name="Timed Autocomplete" activiti:async="true" activiti:class="com.example.service.TimerService"></serviceTask>
    <serviceTask id="store_docs_service" name="Store Documents" activiti:async="true" activiti:class="com.example.service.StoreDocsService"></serviceTask>
    <sequenceFlow id="flow1" sourceRef="startevent1" targetRef="evl"></sequenceFlow>
    <sequenceFlow id="flow2" sourceRef="timer_event_autocomplete" targetRef="timer_service"></sequenceFlow>
    <sequenceFlow id="flow3" sourceRef="evl" targetRef="store_docs_service"></sequenceFlow>
    <sequenceFlow id="flow4" sourceRef="store_docs_service" targetRef="endevent1"></sequenceFlow>
    <endEvent id="endevent1" name="End"></endEvent>
  </process>

</deFinitions>
@H_404_0@要用语言描述它,有一个用户任务(评估)和一个计时器连接到它(配置为在2秒内触发)。触发计时器后,其Java委托TimerService中的定时自动完成异步服务任务将尝试完成用户任务(评估)。完成用户任务(评估)后,流程移至另一个异步服务任务(存储文档),调用其Java委托StoreDocsService,流程结束。

@H_404_0@ TimerService.java

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = execution.getEngineservices().getTaskService().createTaskQuery().active().singleResult();
        execution.getEngineservices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***",task.getId());
    }
}
@H_404_0@ StoreDocsService.java

public class StoreDocsService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(StoreDocsService.class);

    @Override
    public void execute(DelegateExecution execution) throws Exception {
        LOGGER.info("*** Executing Store Documents ***");
    }
}
@H_404_0@ App.java

public class App
{
    public static void main( String[] args ) throws Exception
    {

//        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
//        demoAsyncJobExecutor.setCorePoolSize(10);
//        demoAsyncJobExecutor.setMaxPoolSize(50);
//        demoAsyncJobExecutor.setKeepAliveTime(10000);
//        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
//                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
                .setJobExecutorActivate(true)
                ;
        ProcessEngine processEngine = cfg.buildProcessEngine();
        String pName = processEngine.getName();
        String ver = ProcessEngine.VERSION;
        System.out.println("ProcessEngine [" + pName + "] Version: [" + ver + "]");

        RepositoryService repositoryService = processEngine.getRepositoryService();
        Deployment deployment = repositoryService.createDeployment()
                .addClassPathResource("MyProcess.bpmn").deploy();
        ProcessDeFinition processDeFinition = repositoryService.createProcessDeFinitionQuery()
                .deploymentId(deployment.getId()).singleResult();
        System.out.println(
                "Found process deFinition ["
                        + processDeFinition.getName() + "] with id ["
                        + processDeFinition.getId() + "]");

        final Map<String,Object> variables = new HashMap<String,Object>();
        final RuntimeService runtimeService = processEngine.getRuntimeService();

        ProcessInstance id = runtimeService.startProcessInstanceByKey("myProcess",variables);
        System.out.println("Started Process Id: "+id.getId());
        try {
            final TaskService taskService = processEngine.getTaskService();
//            List<Task> tasks = taskService.createTaskQuery().active().list();
//            while (!tasks.isEmpty()) {
//                Task task = tasks.get(0);
//                taskService.complete(task.getId());
//                tasks = taskService.createTaskQuery().active().list();
//            }

        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {

        }

        while(!runtimeService.createExecutionQuery().list().isEmpty()) {
        }
        processEngine.close();
    }


}
@H_404_0@ Activiti 5.15

@H_404_0@当计时器触发时,上图将按说明执行。我们使用Activiti的DefaultJobExecutor 正如我们在日志中看到的:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.15]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process deFinition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-1] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-1] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Shutting down the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnable  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] stopped job acquisition
@H_404_0@ Activiti> = 5.17 仅将pom.xml中的activiti版本更改为5.17.0及更高版本(测试到5.22.0)并执行相同的代码,该流程执行计时器的Java委托TimerService,从而完成了用户任务(评估),但从未调用过存储文档Java委托StoreDocsService。要添加更多内容,似乎流程永无止境,执行仍然停留在Store Documents异步服务任务上。

@H_404_0@日志:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.jobexecutor.JobExecutor  - Starting up the JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor].
[Thread-1] INFO  org.activiti.engine.impl.jobexecutor.AcquireJobsRunnableImpl  - JobExecutor[org.activiti.engine.impl.jobexecutor.DefaultJobExecutor] starting to acquire jobs
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process deFinition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
  • 更改为异步作业执行器。 5.17版本的一项功能是新的异步作业执行程序(但是认的非异步执行程序仍配置为认)。因此,尝试通过以下几行在App.java中启用异步执行器:
        DefaultAsyncJobExecutor demoAsyncJobExecutor = new DefaultAsyncJobExecutor();
        demoAsyncJobExecutor.setCorePoolSize(10);
        demoAsyncJobExecutor.setMaxPoolSize(50);
        demoAsyncJobExecutor.setKeepAliveTime(10000);
        demoAsyncJobExecutor.setMaxAsyncJobsDuePerAcquisition(50);

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
                .setAsyncExecutorEnabled(true)
                .setAsyncExecutor(demoAsyncJobExecutor)
                ;
@H_404_0@该流程似乎正确执行,在StoreDocsService之后调用TimerService,但是它永远不会结束(while(!runtimeService.createExecutionQuery().list().isEmpty())中的App.java语句始终为真)!

@H_404_0@日志:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 10,maxPoolSize 50 and keepAliveTime 10000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
ProcessEngine [default] Version: [5.17.0.2]
[main] INFO  org.activiti.engine.impl.bpmn.deployer.BpmnDeployer  - Processing resource MyProcess.bpmn
Found process deFinition [My process] with id [myProcess:1:3]
Started Process Id: 4
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[pool-1-thread-2] INFO  com.example.service.TimerService  - *** Task: 9 autocompleted by timer ***
[pool-1-thread-3] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
@H_404_0@ !!!!更新!!!

@H_404_0@ Activiti 6.0.0

@H_404_0@尝试了相同的情况,但使用的是Activiti版本6.0.0TimerService中需要进行的更改无法从Engineservices获取DelegateExecution

public class TimerService implements JavaDelegate {
    Logger LOGGER = LoggerFactory.getLogger(TimerService.class);

    @Override
    public void execute(DelegateExecution execution) {
        LOGGER.info("*** Executing Timer autocomplete ***");
        Task task = Context.getProcessEngineConfiguration().getTaskService().createTaskQuery().active().singleResult();
        Context.getProcessEngineConfiguration().getTaskService().complete(task.getId());
//        Task task = execution.getEngineservices().getTaskService().createTaskQuery().active().singleResult();
//        execution.getEngineservices().getTaskService().complete(task.getId());
        LOGGER.info("*** Task: {} autocompleted by timer ***",task.getId());
    }
}
@H_404_0@,此版本仅具有异步执行器,因此ProcessEngineConfiguration中的App.java更改为:

        ProcessEngineConfiguration cfg = new StandaloneProcessEngineConfiguration()
                .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
                .setJdbcUsername("sa")
                .setJdbcPassword("")
                .setJdbcDriver("org.h2.Driver")
                .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
                .setAsyncExecutorActivate(true)
//                .setAsyncExecutorEnabled(true)
//                .setAsyncExecutor(demoAsyncJobExecutor)
//                .setJobExecutorActivate(true)
                ;
@H_404_0@使用6.0.0版本和异步执行程序,该过程成功完成,如我们在日志中所见:

[main] INFO  org.activiti.engine.impl.ProcessEngineImpl  - ProcessEngine default created
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Starting up the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating thread pool queue of size 100
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Creating executor service with corePoolSize 2,maxPoolSize 10 and keepAliveTime 5000
[Thread-1] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} starting to acquire async jobs due
[Thread-2] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} starting to acquire async jobs due
[Thread-3] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} starting to reset expired jobs
ProcessEngine [default] Version: [6.0.0.4]
Found process deFinition [My process] with id [myProcess:1:3]
Started Process Id: 4
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Executing Timer autocomplete ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.TimerService  - *** Task: 10 autocompleted by timer ***
[activiti-async-job-executor-thread-2] INFO  com.example.service.StoreDocsService  - *** Executing Store Documents ***
[main] INFO  org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor  - Shutting down the default async job executor [org.activiti.engine.impl.asyncexecutor.DefaultAsyncJobExecutor].
[activiti-reset-expired-jobs] INFO  org.activiti.engine.impl.asyncexecutor.ResetExpiredJobsRunnable  - {} stopped resetting expired jobs
[activiti-acquire-timer-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireTimerJobsRunnable  - {} stopped async job due acquisition
[activiti-acquire-async-jobs] INFO  org.activiti.engine.impl.asyncexecutor.AcquireAsyncJobsDueRunnable  - {} stopped async job due acquisition

Process finished with exit code 0
@H_404_0@ 2个问题:

  1. 我们已从Activiti 5.15升级5.22.0,并且我们不使用异步作业执行器。有什么方法可以使这张图的功能保持其在5.15中的行为?
  2. 如果不可避免地要切换到异步作业执行器,那么为了使此过程成功完成,我们缺少什么呢?
@H_404_0@上述示例项目可以在https://github.com/pleft/DemoActiviti

中找到

解决方法

如果没有明确回答需要设置环境和调试的问题,我建议您至少迁移到Activiti 6。 Activiti的5.x分支已经维护了5年以上,实际上已经失效。 由于核心开发人员都已转移到“ Flowable”项目,因此即使是6.x系列也已被放弃。 如果您选择使用Activiti 5.x,则可以选择:

  1. 自己维护代码库(并希望将所做的任何更改/改进都归还给项目)。
  2. 签订Activiti支持服务合同。有两家供应商提供此类服务。