Skip to content

Commit 5c145fc

Browse files
authored
[app-builder] ensure there is only one process context cleanup task (#510)
1 parent ccfe3d1 commit 5c145fc

File tree

6 files changed

+90
-78
lines changed

6 files changed

+90
-78
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*---------------------------------------------------------------------------------------------
2+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
3+
* This file is a part of the ModelEngine Project.
4+
* Licensed under the MIT License. See License.txt in the project root for license information.
5+
*--------------------------------------------------------------------------------------------*/
6+
7+
package modelengine.fit.waterflow.appfactory.task;
8+
9+
import modelengine.fit.waterflow.service.SingleFlowRuntimeService;
10+
import modelengine.fitframework.annotation.Component;
11+
import modelengine.fitframework.annotation.Value;
12+
import modelengine.fitframework.log.Logger;
13+
import modelengine.fitframework.schedule.annotation.Scheduled;
14+
import modelengine.fitframework.util.ThreadUtils;
15+
16+
/**
17+
* 定时清理流程中已完成的上下文。
18+
* <p>包括成功、失败、终止的流程数据。</p>
19+
*
20+
* @author 杨祥宇
21+
* @since 2025-04-02
22+
*/
23+
@Component
24+
public class CleanFlowInstances {
25+
private static final Logger LOG = Logger.get(CleanFlowInstances.class);
26+
private static final int LIMIT = 1000;
27+
private static final int BATCH_INTERNAL = 1000;
28+
29+
private final int expiredDays;
30+
private final SingleFlowRuntimeService singleFlowRuntimeService;
31+
32+
public CleanFlowInstances(@Value("${jane.flowsEngine.contextExpiredDays}") int expiredDays,
33+
SingleFlowRuntimeService singleFlowRuntimeService) {
34+
this.expiredDays = expiredDays;
35+
this.singleFlowRuntimeService = singleFlowRuntimeService;
36+
}
37+
38+
/**
39+
* 每天凌晨 3 点定时清理超指定天数的流程运行数据。
40+
* <p>指定天数来源于 {@code ${jane.flowsEngine.contextExpiredDays}} 配置的值。</p>
41+
* <p>多实例并发执行分析:会并发执行超期链路信息查询,可能导致重复获取相同 {@code traceIds},重复删除 {@code traceIds}
42+
* 以及上下文数据不会对结果有影响。</p>
43+
*/
44+
@Scheduled(strategy = Scheduled.Strategy.CRON, value = "0 0 3 * * ?")
45+
public void cleanContextSchedule() {
46+
LOG.info("Starting expired flow instances cleaning");
47+
try {
48+
while (this.singleFlowRuntimeService.cleanInstances(this.expiredDays, LIMIT)) {
49+
ThreadUtils.sleep(BATCH_INTERNAL);
50+
}
51+
} catch (Exception ex) {
52+
LOG.error("Clean expired flow instances error. [errorMessage={}]", ex.getMessage());
53+
LOG.error("Exception:", ex);
54+
} finally {
55+
LOG.info("Finished expired flow instances cleaning.");
56+
}
57+
}
58+
}

app-builder/plugins/waterflow-appfactory/src/main/resources/application.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,6 @@ jane:
1313
flowsEngine:
1414
retry:
1515
scheduleRate: 60000
16+
contextExpiredDays: 3
1617

1718
distributed-lock-provider: databaseDistributedLockProvider

app-builder/waterflow/java/waterflow-runtime-service/src/main/java/modelengine/fit/waterflow/service/SingleFlowRuntimeService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import modelengine.fit.waterflow.entity.OperationContext;
1010
import modelengine.fit.waterflow.entity.FlowStartDTO;
1111
import modelengine.fit.waterflow.entity.FlowStartInfo;
12-
import modelengine.fit.waterflow.entity.FlowStartParameter;
1312
import modelengine.fit.waterflow.entity.JoberErrorInfo;
1413

1514
import java.util.Map;
@@ -71,4 +70,13 @@ void terminateFlows(String flowDefinitionId, String instanceId, Map<String, Obje
7170
* @param operationContext 操作人,为null则默认为前一个操作人
7271
*/
7372
void failAsyncJob(String flowDataId, JoberErrorInfo errorInfo, OperationContext operationContext);
73+
74+
/**
75+
* 根据过期时间清理实例上下文数据。
76+
*
77+
* @param expiredDays 过期天数。
78+
* @param limit 最大清理数量。
79+
* @return 表示是否有数据被清理。
80+
*/
81+
boolean cleanInstances(int expiredDays, int limit);
7482
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/biz/service/SingleFlowRuntimeServiceImpl.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import modelengine.fitframework.annotation.Component;
2727
import modelengine.fitframework.inspection.Validation;
2828
import modelengine.fitframework.log.Logger;
29+
import modelengine.fitframework.transaction.Transactional;
2930

3031
import java.util.ArrayList;
3132
import java.util.Collections;
@@ -127,6 +128,27 @@ public void failAsyncJob(String flowDataId, JoberErrorInfo errorInfo, OperationC
127128
new WaterflowException(FLOW_EXECUTE_ASYNC_JOBER_FAILED, errorInfo), operationContext);
128129
}
129130

131+
@Override
132+
public boolean cleanInstances(int expiredDays, int limit) {
133+
List<String> traceIds = this.traceRepo.getExpiredTrace(expiredDays, limit);
134+
if (traceIds.isEmpty()) {
135+
return false;
136+
}
137+
this.deleteFlowContext(traceIds);
138+
return true;
139+
}
140+
141+
/**
142+
* 根据链路唯一标识列表删除链路信息和上下文数据。
143+
*
144+
* @param traceIds 表示流程链路唯一标识列表的 {@link List}{@code <}{@link String}{@code >}。
145+
*/
146+
@Transactional
147+
public void deleteFlowContext(List<String> traceIds) {
148+
this.repo.deleteByTraceIdList(traceIds);
149+
this.traceRepo.deleteByIdList(traceIds);
150+
}
151+
130152
private static boolean isAllowResumeOrFailAsyncJob(FlowContext<FlowData> context) {
131153
return context != null && FlowNodeStatus.PROCESSING.equals(context.getStatus());
132154
}

app-builder/waterflow/java/waterflow-service/src/main/java/modelengine/fit/waterflow/flowsengine/biz/service/scheduletasks/CleanContextSchedule.java

Lines changed: 0 additions & 76 deletions
This file was deleted.

app-builder/waterflow/java/waterflow-service/src/main/resources/application.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ jane:
1616
scheduleRate: 10000
1717
maxCount: 0
1818
isNeedFlowCallbackAdapt: false
19-
contextExpiredDays: 1
2019

2120
distributed-lock-provider: databaseDistributedLockProvider
2221

0 commit comments

Comments
 (0)