Skip to content

Commit 88a452c

Browse files
committed
fix NotifyNode
1 parent 61d86aa commit 88a452c

8 files changed

Lines changed: 128 additions & 14 deletions

File tree

flow-engine-framework/src/main/java/com/codingapi/flow/node/BaseAuditNode.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ public List<FlowRecord> generateCurrentRecords(FlowSession session) {
135135
for (int order = 0; order < operators.size(); order++) {
136136
IFlowOperator operator = operators.get(order);
137137
FlowRecord flowRecord = new FlowRecord(session.updateSession(operator), order);
138+
flowRecord.cleanAction();
138139
records.add(flowRecord);
139140
}
140141
if (operators.size() > 1) {

flow-engine-framework/src/main/java/com/codingapi/flow/node/BaseFlowNode.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,14 @@ public boolean isWaitRecordMargeParallelNode(FlowSession session) {
213213
IRepositoryHolder repositoryHolder = session.getRepositoryHolder();
214214
FlowRecord currentRecord = session.getCurrentRecord();
215215
if (currentRecord != null && this.getId().equals(currentRecord.getParallelBranchNodeId())) {
216-
repositoryHolder.addParallelTriggerCount(currentRecord.getParallelId());
216+
String parallelId = currentRecord.getParallelId();
217+
repositoryHolder.addParallelTriggerCount(parallelId);
217218
int parallelBranchTotal = currentRecord.getParallelBranchTotal();
218-
int parallelBranchCount = repositoryHolder.getParallelBranchTriggerCount(currentRecord.getParallelId());
219+
int parallelBranchCount = repositoryHolder.getParallelBranchTriggerCount(parallelId);
219220
if (parallelBranchCount == parallelBranchTotal) {
220221
// 清空并行节点,防止数据继续继承到后续节点
221222
currentRecord.clearParallel();
222-
repositoryHolder.clearParallelTriggerCount(currentRecord.getParallelId());
223+
repositoryHolder.clearParallelTriggerCount(parallelId);
223224
}
224225
return parallelBranchCount != parallelBranchTotal;
225226
}

flow-engine-framework/src/main/java/com/codingapi/flow/node/nodes/NotifyNode.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.codingapi.flow.session.IRepositoryHolder;
1414
import com.codingapi.flow.strategy.node.*;
1515
import com.codingapi.flow.utils.RandomUtils;
16+
import org.springframework.util.StringUtils;
1617

1718
import java.util.ArrayList;
1819
import java.util.List;
@@ -41,11 +42,15 @@ public NotifyNode() {
4142

4243
@Override
4344
public boolean handle(FlowSession session) {
44-
IRepositoryHolder repositoryHolder = session.getRepositoryHolder();
45-
if (this.isWaitRecordMargeParallelNode(session)) {
46-
return false;
45+
FlowRecord flowRecord = session.getCurrentRecord();
46+
String parallelId = flowRecord.getParallelId();
47+
if(StringUtils.hasText(parallelId)) {
48+
if (this.isWaitRecordMargeParallelNode(session)) {
49+
return false;
50+
}
4751
}
48-
List<FlowRecord> records = this.generateCurrentRecords(session);
52+
IRepositoryHolder repositoryHolder = session.getRepositoryHolder();
53+
List<FlowRecord> records = this.generateCurrentRecords0(session);
4954
for (FlowRecord record : records) {
5055
this.fillNewRecord(session, record);
5156
}
@@ -63,14 +68,18 @@ public boolean isFinish(FlowSession session) {
6368
return true;
6469
}
6570

71+
@Override
72+
public List<FlowRecord> generateCurrentRecords(FlowSession session) {
73+
return new ArrayList<>();
74+
}
75+
6676
/**
6777
* 生成当前节点的记录
6878
*
6979
* @param session 触发会话
7080
* @return 生成当前节点的记录
7181
*/
72-
@Override
73-
public List<FlowRecord> generateCurrentRecords(FlowSession session) {
82+
public List<FlowRecord> generateCurrentRecords0(FlowSession session) {
7483
List<FlowRecord> records = new ArrayList<>();
7584
NodeStrategyManager nodeStrategyManager = this.strategyManager();
7685
OperatorManager operatorManager = nodeStrategyManager.loadOperators(session);

flow-engine-framework/src/main/java/com/codingapi/flow/pojo/response/ProcessNode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ public static class FlowOperatorBody {
247247
public FlowOperatorBody(FlowRecord flowRecord, IFlowOperator flowOperator) {
248248
this.advice = flowRecord.getAdvice();
249249
this.signKey = flowRecord.getSignKey();
250-
this.approveTime = flowRecord.getCreateTime();
250+
this.approveTime = flowRecord.getUpdateTime();
251251
this.actionName = flowRecord.getActionName();
252252
this.actionType = flowRecord.getActionType();
253253
this.flowOperator = flowOperator;

flow-engine-framework/src/main/java/com/codingapi/flow/record/FlowRecord.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,13 @@ public FlowRecord(FlowSession flowSession, int nodeOrder) {
333333
}
334334

335335

336+
public void cleanAction(){
337+
this.actionId = null;
338+
this.actionType = null;
339+
this.actionName = null;
340+
}
341+
342+
336343
/**
337344
* 继承记录
338345
*

flow-engine-framework/src/main/java/com/codingapi/flow/service/impl/FlowProcessNodeService.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -295,14 +295,40 @@ private List<FlowRecord> getNextRecords(long formId) {
295295
public void fetch(long formId) {
296296
List<FlowRecord> batchList = this.getNextRecords(formId);
297297
if (!batchList.isEmpty()) {
298-
this.consumer.accept(batchList.stream().map(record -> new ProcessNode.FlowRecordOperator(record, flowOperatorGateway.getFlowOperator(record.getCurrentOperatorId()))).toList());
298+
// 根据nodeId 进行分组,不同的分组的要分开执行
299299

300-
for (FlowRecord item : batchList) {
301-
this.fetch(item.getId());
300+
Map<String,List<FlowRecord>> groupList = this.loadGroupList(batchList);
301+
for(List<FlowRecord> group:groupList.values()) {
302+
303+
this.consumer.accept(group.stream().map(record -> new ProcessNode.FlowRecordOperator(record, flowOperatorGateway.getFlowOperator(record.getCurrentOperatorId()))).toList());
304+
305+
for (FlowRecord item : group) {
306+
this.fetch(item.getId());
307+
}
302308
}
303309
}
304310
}
305311

312+
313+
private Map<String, List<FlowRecord>> loadGroupList(List<FlowRecord> recordList) {
314+
Map<String, List<FlowRecord>> groupList = new HashMap<>();
315+
316+
for (FlowRecord flowRecord : recordList) {
317+
String nodeId = flowRecord.getNodeId();
318+
319+
List<FlowRecord> list = groupList.get(nodeId);
320+
if (list == null) {
321+
list = new ArrayList<>();
322+
}
323+
list.add(flowRecord);
324+
325+
groupList.put(nodeId, list);
326+
}
327+
328+
return groupList;
329+
}
330+
331+
306332
}
307333

308334
}

flow-engine-framework/src/test/java/com/codingapi/flow/service/FlowParallelServiceTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.codingapi.flow.service;
22

33
import com.codingapi.flow.action.IFlowAction;
4+
import com.codingapi.flow.action.actions.PassAction;
45
import com.codingapi.flow.builder.FormFieldPermissionsBuilder;
56
import com.codingapi.flow.builder.NodeStrategyBuilder;
67
import com.codingapi.flow.context.GatewayContext;
@@ -306,4 +307,72 @@ void parallelAndParallel() {
306307
assertEquals(7, records.stream().filter(FlowRecord::isFinish).toList().size());
307308

308309
}
310+
311+
312+
@Test
313+
void parallelTest(){
314+
User user = new User(1, "user");
315+
factory.userGateway.save(user);
316+
317+
String json = """
318+
{"updatedTime":"1779115566400","code":"3MgPSwshbC","nodes":[{"view":"default","strategies":[{"script":"// @SCRIPT_TITLE 你有一条待办\\ndef run(request){\\n return '你有一条待办'\\n}\\n","strategyType":"NodeTitleStrategy"},{"strategyType":"FormFieldPermissionStrategy","fieldPermissions":[{"formCode":"leave","fieldCode":"desc","type":"READ"}]},{"enable":true,"type":"REVOKE_CURRENT","strategyType":"RevokeStrategy"}],"display":true,"name":"开始节点","id":"U7TBIwMsBqtPMlLM2X","type":"START","actions":[{"enable":true,"display":{"title":"通过"},"id":"A48hmH57atyK2inSb6","type":"PASS","title":"通过"},{"enable":true,"display":{"title":"保存"},"id":"WnkyxEDBEMnLtphCQw","type":"SAVE","title":"保存"}],"order":"0"},{"strategies":[],"blocks":[{"strategies":{"$ref":"$.nodes[1].strategies"},"blocks":[{"view":"default","strategies":[{"timeoutTime":"86400000","type":"REMIND","strategyType":"TimeoutStrategy"},{"type":"SEQUENCE","percent":"0.0","strategyType":"MultiOperatorAuditStrategy"},{"type":"MANUAL_PASS","strategyType":"SameOperatorAuditStrategy"},{"enable":false,"strategyType":"RecordMergeStrategy"},{"type":"RESUME","strategyType":"ResubmitStrategy"},{"signRequired":false,"adviceRequired":false,"strategyType":"AdviceStrategy"},{"script":"// @SCRIPT_TITLE 回退至开始节点\\n// @SCRIPT_META {\\"type\\":\\"node\\",\\"node\\":\\"START\\"}\\ndef run(request){\\n return request.getStartNode().getId();\\n}\\n","strategyType":"ErrorTriggerStrategy"},{"script":"// @SCRIPT_TITLE 你有一条待办\\ndef run(request){\\n return '你有一条待办'\\n}\\n","strategyType":"NodeTitleStrategy"},{"strategyType":"FormFieldPermissionStrategy","fieldPermissions":{"$ref":"$.nodes[1].strategies"}},{"selectType":"SCRIPT","script":"// @SCRIPT_TITLE 流程创建者\\n// @SCRIPT_META {\\"type\\":\\"creator\\"}\\ndef run(request){\\n return [request.getCreatedOperatorId()]\\n}\\n","strategyType":"OperatorLoadStrategy"},{"enable":true,"type":"REVOKE_CURRENT","strategyType":"RevokeStrategy"}],"display":true,"name":"审批节点","id":"rNtslevbNRB2P3qPrO","type":"APPROVAL","actions":[{"enable":true,"display":{"title":"通过"},"id":"4vCA3MdH1W8fLoDufv","type":"PASS","title":"通过"},{"enable":true,"display":{"title":"拒绝"},"id":"R4Oz7Qii91QFyfJHOj","type":"REJECT","title":"拒绝","script":"// @SCRIPT_TITLE 返回开始节点\\n// @SCRIPT_META {\\"type\\":\\"START\\"}\\ndef run(request){\\n return request.getStartNode().getId();\\n}\\n"},{"enable":true,"display":{"title":"保存"},"id":"W65JLyz9w67chUuWqK","type":"SAVE","title":"保存"},{"enable":true,"display":{"title":"加签"},"id":"5pT9hbBQiJmJRWMcKv","type":"ADD_AUDIT","title":"加签"},{"enable":true,"display":{"title":"转办"},"id":"cRNl4ls1AVm4J2UiCZ","type":"TRANSFER","title":"转办"},{"enable":true,"display":{"title":"退回"},"id":"Vk7NI8uGA7WZBqeMhp","type":"RETURN","title":"退回"},{"enable":true,"display":{"title":"委派"},"id":"0z1BK3A8OHVtOooajh","type":"DELEGATE","title":"委派"}],"order":"0"}],"display":false,"name":"并行分支节点","id":"QRYBK1RdEGNjqHCWo7","type":"PARALLEL_BRANCH","actions":{"$ref":"$.nodes[1].strategies"},"order":"1"},{"strategies":{"$ref":"$.nodes[1].strategies"},"blocks":[{"view":"default","strategies":[{"timeoutTime":"86400000","type":"REMIND","strategyType":"TimeoutStrategy"},{"type":"SEQUENCE","percent":"0.0","strategyType":"MultiOperatorAuditStrategy"},{"type":"MANUAL_PASS","strategyType":"SameOperatorAuditStrategy"},{"enable":false,"strategyType":"RecordMergeStrategy"},{"type":"RESUME","strategyType":"ResubmitStrategy"},{"signRequired":false,"adviceRequired":false,"strategyType":"AdviceStrategy"},{"script":"// @SCRIPT_TITLE 回退至开始节点\\n// @SCRIPT_META {\\"type\\":\\"node\\",\\"node\\":\\"START\\"}\\ndef run(request){\\n return request.getStartNode().getId();\\n}\\n","strategyType":"ErrorTriggerStrategy"},{"script":"// @SCRIPT_TITLE 你有一条待办\\ndef run(request){\\n return '你有一条待办'\\n}\\n","strategyType":"NodeTitleStrategy"},{"strategyType":"FormFieldPermissionStrategy","fieldPermissions":{"$ref":"$.nodes[1].strategies"}},{"selectType":"SCRIPT","script":"// @SCRIPT_TITLE 流程创建者\\n// @SCRIPT_META {\\"type\\":\\"creator\\"}\\ndef run(request){\\n return [request.getCreatedOperatorId()]\\n}\\n","strategyType":"OperatorLoadStrategy"},{"enable":true,"type":"REVOKE_CURRENT","strategyType":"RevokeStrategy"}],"display":true,"name":"审批节点","id":"q6NO5sCyEmGDw0uNpp","type":"APPROVAL","actions":[{"enable":true,"display":{"title":"通过"},"id":"3MCWiENt6glv9xzea5","type":"PASS","title":"通过"},{"enable":true,"display":{"title":"拒绝"},"id":"dJnTZ9LcBum9gXc1Ba","type":"REJECT","title":"拒绝","script":"// @SCRIPT_TITLE 返回开始节点\\n// @SCRIPT_META {\\"type\\":\\"START\\"}\\ndef run(request){\\n return request.getStartNode().getId();\\n}\\n"},{"enable":true,"display":{"title":"保存"},"id":"UvA6Wl9epOLCpcRERi","type":"SAVE","title":"保存"},{"enable":true,"display":{"title":"加签"},"id":"P8rvYisfKJsQN6xdFt","type":"ADD_AUDIT","title":"加签"},{"enable":true,"display":{"title":"转办"},"id":"MWO88KiwmDGOOd6FJ6","type":"TRANSFER","title":"转办"},{"enable":true,"display":{"title":"退回"},"id":"cLdgknFlunfTV2fiwL","type":"RETURN","title":"退回"},{"enable":true,"display":{"title":"委派"},"id":"kaGmQkZm1lDm1xpLSc","type":"DELEGATE","title":"委派"}],"order":"0"}],"display":false,"name":"并行分支节点","id":"aiDuVhF32mKgmfG17t","type":"PARALLEL_BRANCH","actions":{"$ref":"$.nodes[1].strategies"},"order":"2"}],"display":false,"name":"并行控制节点","id":"xvXK0CmCs8G7vfSNWR","type":"PARALLEL","actions":{"$ref":"$.nodes[1].strategies"},"order":"0"},{"view":"default","strategies":[{"script":"// @SCRIPT_TITLE 回退至开始节点\\n// @SCRIPT_META {\\"type\\":\\"node\\",\\"node\\":\\"START\\"}\\ndef run(request){\\n return request.getStartNode().getId();\\n}\\n","strategyType":"ErrorTriggerStrategy"},{"script":"// @SCRIPT_TITLE 你有一条待办\\ndef run(request){\\n return '你有一条待办'\\n}\\n","strategyType":"NodeTitleStrategy"},{"strategyType":"FormFieldPermissionStrategy","fieldPermissions":[{"formCode":"leave","fieldCode":"desc","type":"WRITE"}]},{"selectType":"SCRIPT","script":"// @SCRIPT_TITLE 流程创建者\\n// @SCRIPT_META {\\"type\\":\\"creator\\"}\\ndef run(request){\\n return [request.getCreatedOperatorId()]\\n}\\n","strategyType":"OperatorLoadStrategy"}],"display":true,"name":"抄送节点","id":"IhWkd6SKzARfMCN2bW","type":"NOTIFY","actions":{"$ref":"$.nodes[1].strategies"},"order":"2"},{"strategies":{"$ref":"$.nodes[1].strategies"},"display":true,"name":"结束节点","id":"IH0QcEwZlODLlq9uq2","type":"END","actions":{"$ref":"$.nodes[1].strategies"},"order":"0"}],"form":{"code":"leave","name":"请假单","fields":[{"code":"desc","hidden":false,"dataType":"INTEGER","name":"天数","attributes":[],"id":"53759b8e-e622-424e-a10b-a92b4fb90ad6","placeholder":"请输入理由","type":"integer","required":true}]},"createdOperator":"1","strategies":[{"enable":true,"strategyType":"InterfereStrategy"},{"enable":true,"interval":"60","strategyType":"UrgeStrategy"}],"description":"这是一个流程的备注信息","createdTime":"1774603227796","id":"JTWgurhWfc998EMqbw","title":"请假","operatorCreateScript":"// @SCRIPT_TITLE 任意用户\\n// @SCRIPT_META {\\"type\\":\\"any\\"}\\ndef run(request){\\n return true\\n}\\n"}
319+
""";
320+
321+
Workflow workflow = Workflow.formJson(json);
322+
workflow.enable();
323+
factory.workflowService.saveWorkflow(workflow);
324+
325+
String processId = null;
326+
327+
IFlowNode startNode = workflow.getStartNode();
328+
Map<String, Object> data = Map.of( "desc", 3);
329+
330+
List<IFlowAction> startActions = startNode.actionManager().getActions();
331+
332+
FlowCreateRequest userCreateRequest = new FlowCreateRequest();
333+
userCreateRequest.setWorkId(workflow.getId());
334+
userCreateRequest.setFormData(data);
335+
userCreateRequest.setActionId(startActions.get(0).id());
336+
userCreateRequest.setOperatorId(user.getUserId());
337+
338+
factory.flowService.create(userCreateRequest);
339+
340+
List<FlowRecord> userRecordList = factory.flowRecordRepository.findTodoByOperator(user.getUserId());
341+
assertEquals(1, userRecordList.size());
342+
343+
processId = userRecordList.get(0).getProcessId();
344+
345+
FlowActionRequest userRequest = new FlowActionRequest();
346+
userRequest.setFormData(data);
347+
userRequest.setRecordId(userRecordList.get(0).getId());
348+
userRequest.setAdvice(new FlowAdviceBody(startActions.get(0).id(), "同意", user.getUserId()));
349+
factory.flowService.action(userRequest);
350+
351+
userRecordList = factory.flowRecordRepository.findTodoByOperator(user.getUserId());
352+
assertEquals(2, userRecordList.size());
353+
354+
355+
for (int i=0;i<2;i++){
356+
FlowRecord currentRecord = userRecordList.get(i);
357+
IFlowNode flowNode = workflow.getFlowNode(currentRecord.getNodeId());
358+
userRequest = new FlowActionRequest();
359+
userRequest.setFormData(data);
360+
userRequest.setRecordId(currentRecord.getId());
361+
userRequest.setAdvice(new FlowAdviceBody(flowNode.actionManager().getAction(PassAction.class).id(), "同意", user.getUserId()));
362+
363+
if(i==1){
364+
System.out.println(currentRecord);
365+
}
366+
factory.flowService.action(userRequest);
367+
}
368+
369+
userRecordList = factory.flowRecordRepository.findTodoByOperator(user.getUserId());
370+
assertEquals(0, userRecordList.size());
371+
372+
List<FlowRecord> records = factory.flowRecordRepository.findProcessRecords(processId);
373+
assertEquals(4, records.size());
374+
assertEquals(0, records.stream().filter(FlowRecord::isTodo).toList().size());
375+
assertEquals(4, records.stream().filter(FlowRecord::isFinish).toList().size());
376+
377+
}
309378
}

flow-engine-starter-infra/src/main/java/com/codingapi/flow/infra/repository/impl/ParallelBranchRepositoryImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.codingapi.flow.repository.ParallelBranchRepository;
66
import lombok.AllArgsConstructor;
77

8+
89
@AllArgsConstructor
910
public class ParallelBranchRepositoryImpl implements ParallelBranchRepository {
1011

@@ -25,7 +26,7 @@ public void addTriggerCount(String parallelId) {
2526
if(entity != null){
2627
entity.setCount(entity.getCount() + 1);
2728
parallelControlEntityRepository.save(entity);
28-
}{
29+
}else {
2930
ParallelControlEntity newEntity = new ParallelControlEntity();
3031
newEntity.setId(parallelId);
3132
newEntity.setCount(1);

0 commit comments

Comments
 (0)