Skip to content

Commit

Permalink
4.8.10 workflow perf (#2596)
Browse files Browse the repository at this point in the history
* perf: run plugin variables init

* perf: init free plan

* perf: dataset data ui

* perf: workflow theme

* perf: plugin input modal ui

* perf: workflow dispatch

* fix: account ui

* feat: 4810 doc
  • Loading branch information
c121914yu authored Sep 3, 2024
1 parent 5ebe001 commit 761e35c
Show file tree
Hide file tree
Showing 19 changed files with 214 additions and 178 deletions.
56 changes: 29 additions & 27 deletions docSite/content/zh-cn/docs/development/upgrading/4810.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,30 +55,32 @@ curl --location --request POST 'https://{{host}}/api/admin/initv4810' \
6. 新增 - 工作流版本支持重命名
7. 新增 - 应用调用迁移成单独节点,同时可以传递全局变量和用户的文件。
8. 新增 - 插件增加使用说明配置。
9. 商业版新增 - 飞书机器人接入
10. 商业版新增 - 公众号接入接入
11. 商业版新增 - 自助开票申请
12. 商业版新增 - SSO 定制
13. 优化 - SSE 响应优化。
14. 优化 - 无 SSL 证书情况下,优化复制。
15. 优化 - 单选框打开后自动滚动到选中的位置。
16. 优化 - 知识库集合禁用,目录禁用会递归修改其下所有 children 的禁用状态。
17. 优化 - 节点选择,避免切换 tab 时候,path 加载报错。
18. 优化 - 最新 React Markdown 组件,支持 Base64 图片。
19. 优化 - 知识库列表 UI。
20. 优化 - 知识库详情页 UI。
21. 优化 - 支持无网络配置情况下运行。
22. 优化 - 部分全局变量,增加数据类型约束。
23. 修复 - 全局变量 key 可能重复。
24. 修复 - Prompt 模式调用工具,stream=false 模式下,会携带 0: 开头标记。
25. 修复 - 对话日志鉴权问题:仅为 APP 管理员的用户,无法查看对话日志详情。
26. 修复 - 选择 Milvus 部署时,无法导出知识库。
27. 修复 - 创建 APP 副本,无法复制系统配置。
28. 修复 - 图片识别模式下,自动解析图片链接正则不够严谨问题。
29. 修复 - 内容提取的数据类型与输出数据类型未一致。
30. 修复 - 工作流运行时间统计错误。
31. 修复 - stream 模式下,工具调用有可能出现 undefined
32. 修复 - 全局变量在 API 中无法持久化。
33. 修复 - OpenAPI,detail=false模式下,不应该返回 tool 调用结果,仅返回文字。(可解决 cow 不适配问题)
34. 修复 - 知识库标签重复加载。
35. 修复 - Debug 模式下,循环调用边问题。
9. 新增 - 工作流导出导入,支持直接导出和导入 JSON 文件,便于交流。
10. 商业版新增 - 飞书机器人接入
11. 商业版新增 - 公众号接入接入
12. 商业版新增 - 自助开票申请
13. 商业版新增 - SSO 定制
14. 优化 - 工作流循环校验,避免 skip 循环空转。同时支持分支完全并发执行。
15. 优化 - SSE 响应优化。
16. 优化 - 无 SSL 证书情况下,优化复制。
17. 优化 - 单选框打开后自动滚动到选中的位置。
18. 优化 - 知识库集合禁用,目录禁用会递归修改其下所有 children 的禁用状态。
19. 优化 - 节点选择,避免切换 tab 时候,path 加载报错。
20. 优化 - 最新 React Markdown 组件,支持 Base64 图片。
21. 优化 - 知识库列表 UI。
22. 优化 - 知识库详情页 UI。
23. 优化 - 支持无网络配置情况下运行。
24. 优化 - 部分全局变量,增加数据类型约束。
25. 修复 - 全局变量 key 可能重复。
26. 修复 - Prompt 模式调用工具,stream=false 模式下,会携带 0: 开头标记。
27. 修复 - 对话日志鉴权问题:仅为 APP 管理员的用户,无法查看对话日志详情。
28. 修复 - 选择 Milvus 部署时,无法导出知识库。
29. 修复 - 创建 APP 副本,无法复制系统配置。
30. 修复 - 图片识别模式下,自动解析图片链接正则不够严谨问题。
31. 修复 - 内容提取的数据类型与输出数据类型未一致。
32. 修复 - 工作流运行时间统计错误。
33. 修复 - stream 模式下,工具调用有可能出现 undefined
34. 修复 - 全局变量在 API 中无法持久化。
35. 修复 - OpenAPI,detail=false模式下,不应该返回 tool 调用结果,仅返回文字。(可解决 cow 不适配问题)
36. 修复 - 知识库标签重复加载。
37. 修复 - Debug 模式下,循环调用边问题。
4 changes: 2 additions & 2 deletions packages/global/core/workflow/runtime/type.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export type ChatDispatchProps = {
res?: NextApiResponse;
requestOrigin?: string;
mode: 'test' | 'chat' | 'debug';
teamId: string;
tmbId: string;
teamId: string; // App teamId
tmbId: string; // App tmbId
user: UserModelSchema;
app: AppDetailType | AppSchema;
chatId?: string;
Expand Down
172 changes: 106 additions & 66 deletions packages/service/core/workflow/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ import {
InteractiveNodeResponseItemType,
UserSelectInteractive
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { dispatchRunAppNode } from './agent/runApp';
import { dispatchRunAppNode } from './plugin/runApp';

const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
Expand Down Expand Up @@ -186,7 +186,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
function nodeOutput(
node: RuntimeNodeItemType,
result: Record<string, any> = {}
): RuntimeNodeItemType[] {
): {
nextStepActiveNodes: RuntimeNodeItemType[];
nextStepSkipNodes: RuntimeNodeItemType[];
} {
pushStore(node, result);

// Assign the output value to the next node
Expand All @@ -211,16 +214,32 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
});

const nextStepNodes = runtimeNodes.filter((node) => {
return targetEdges.some((item) => item.target === node.nodeId);
const nextStepActiveNodes: RuntimeNodeItemType[] = [];
const nextStepSkipNodes: RuntimeNodeItemType[] = [];
runtimeNodes.forEach((node) => {
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) {
nextStepActiveNodes.push(node);
}
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped')) {
nextStepSkipNodes.push(node);
}
});

if (props.mode === 'debug') {
debugNextStepRunNodes = debugNextStepRunNodes.concat(nextStepNodes);
return [];
debugNextStepRunNodes = debugNextStepRunNodes.concat([
...nextStepActiveNodes,
...nextStepSkipNodes
]);
return {
nextStepActiveNodes: [],
nextStepSkipNodes: []
};
}

return nextStepNodes;
return {
nextStepActiveNodes,
nextStepSkipNodes
};
}

/* Have interactive result, computed edges and node outputs */
Expand Down Expand Up @@ -281,69 +300,82 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
});
}
/* Check node run/skip or wait */
function checkNodeCanRun(nodes: RuntimeNodeItemType[] = []): Promise<any> {
return Promise.all(
nodes.map(async (node) => {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
async function checkNodeCanRun(
node: RuntimeNodeItemType,
skippedNodeIdList = new Set<string>()
): Promise<RuntimeNodeItemType[]> {
if (res?.closed || props.maxRunTimes <= 0) return [];
// Thread avoidance
await surrenderProcess();

if (res?.closed || props.maxRunTimes <= 0) return;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });

addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
// Get node run status by edges
const status = checkNodeRunStatus({
node,
runtimeEdges
});
const nodeRunResult = await (() => {
if (status === 'run') {
props.maxRunTimes--;
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip' && !skippedNodeIdList.has(node.nodeId)) {
props.maxRunTimes -= 0.1;
skippedNodeIdList.add(node.nodeId);
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
})();

// Thread avoidance
await surrenderProcess();
if (!nodeRunResult) return [];

if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
// Update the node output at the end of the run and get the next nodes
let { nextStepActiveNodes, nextStepSkipNodes } = nodeOutput(
nodeRunResult.node,
nodeRunResult.result
);
// Remove repeat nodes(Make sure that the node is only executed once)
nextStepActiveNodes = nextStepActiveNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
nextStepSkipNodes = nextStepSkipNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);

return;
})
).then((result) => {
props.maxRunTimes--;

const flat = result.flat().filter(Boolean) as unknown as {
node: RuntimeNodeItemType;
runStatus: 'run' | 'skip';
result: Record<string, any>;
}[];
// If there are no running nodes, the workflow is complete
if (flat.length === 0) return;

// Update the node output at the end of the run and get the next nodes
const nextNodes = flat.map((item) => nodeOutput(item.node, item.result)).flat();
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [nodeRunResult.node.nodeId],
interactiveResponse
})
);
return [];
}

// In the current version, only one interactive node is allowed at the same time
const haveInteractiveResponse = flat
.map((response) => {
const interactiveResponse = response.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [response.node.nodeId],
interactiveResponse
})
);
return 1;
}
})
.filter(Boolean);
if (haveInteractiveResponse.length > 0) return;
// Run next nodes(先运行 run 的,再运行 skip 的)
const nextStepActiveNodesResults = (
await Promise.all(nextStepActiveNodes.map((node) => checkNodeCanRun(node)))
).flat();

return checkNodeCanRun(filterNextNodes);
});
// 如果已经 active 运行过,不再执行 skip(active 中有闭环)
nextStepSkipNodes = nextStepSkipNodes.filter(
(node) => !nextStepActiveNodesResults.some((item) => item.nodeId === node.nodeId)
);

const nextStepSkipNodesResults = (
await Promise.all(nextStepSkipNodes.map((node) => checkNodeCanRun(node, skippedNodeIdList)))
).flat();

return [
...nextStepActiveNodes,
...nextStepSkipNodes,
...nextStepActiveNodesResults,
...nextStepSkipNodesResults
];
}
/* Inject data into module input */
function getNodeRunParams(node: RuntimeNodeItemType) {
Expand Down Expand Up @@ -396,7 +428,11 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons

return params;
}
async function nodeRunWithActive(node: RuntimeNodeItemType) {
async function nodeRunWithActive(node: RuntimeNodeItemType): Promise<{
node: RuntimeNodeItemType;
runStatus: 'run';
result: Record<string, any>;
}> {
// push run status messages
if (node.showStatus) {
props.workflowStreamResponse?.({
Expand Down Expand Up @@ -465,8 +501,12 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
};
}
async function nodeRunWithSkip(node: RuntimeNodeItemType) {
// 其后所有target的节点,都设置为skip
async function nodeRunWithSkip(node: RuntimeNodeItemType): Promise<{
node: RuntimeNodeItemType;
runStatus: 'skip';
result: Record<string, any>;
}> {
// Set target edges status to skipped
const targetEdges = runtimeEdges.filter((item) => item.source === node.nodeId);
nodeRunAfterHook(node);

Expand All @@ -486,7 +526,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// runtimeNodes.forEach((item) => {
// item.isEntry = false;
// });
await checkNodeCanRun(entryNodes);
await Promise.all(entryNodes.map((node) => checkNodeCanRun(node)));

// focus try to run pluginOutput
const pluginOutputModule = runtimeNodes.find(
Expand Down
6 changes: 5 additions & 1 deletion packages/service/core/workflow/dispatch/plugin/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi

const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
...props,
variables: filterSystemVariables(props.variables),

variables: {
...filterSystemVariables(props.variables),
appId: String(plugin.id)
},
runtimeNodes,
runtimeEdges: initWorkflowEdgeStatus(plugin.edges)
});
Expand Down
50 changes: 28 additions & 22 deletions packages/service/support/wallet/sub/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export const getTeamStandPlan = async ({ teamId }: { teamId: string }) => {
};
};

export const initTeamStandardPlan2Free = async ({
export const initTeamFreePlan = async ({
teamId,
session
}: {
Expand All @@ -59,23 +59,28 @@ export const initTeamStandardPlan2Free = async ({
}) => {
const freePoints = global?.subPlans?.standard?.[StandardSubLevelEnum.free]?.totalPoints || 100;

const teamStandardSub = await MongoTeamSub.findOne({ teamId, type: SubTypeEnum.standard });

if (teamStandardSub) {
teamStandardSub.currentMode = SubModeEnum.month;
teamStandardSub.nextMode = SubModeEnum.month;
teamStandardSub.startTime = new Date();
teamStandardSub.expiredTime = addMonths(new Date(), 1);

teamStandardSub.currentSubLevel = StandardSubLevelEnum.free;
teamStandardSub.nextSubLevel = StandardSubLevelEnum.free;

teamStandardSub.totalPoints = freePoints;
teamStandardSub.surplusPoints =
teamStandardSub.surplusPoints && teamStandardSub.surplusPoints < 0
? teamStandardSub.surplusPoints + freePoints
const freePlan = await MongoTeamSub.findOne({
teamId,
type: SubTypeEnum.standard,
currentSubLevel: StandardSubLevelEnum.free
});

// Reset one month free plan
if (freePlan) {
freePlan.currentMode = SubModeEnum.month;
freePlan.nextMode = SubModeEnum.month;
freePlan.startTime = new Date();
freePlan.expiredTime = addMonths(new Date(), 1);

freePlan.currentSubLevel = StandardSubLevelEnum.free;
freePlan.nextSubLevel = StandardSubLevelEnum.free;

freePlan.totalPoints = freePoints;
freePlan.surplusPoints =
freePlan.surplusPoints && freePlan.surplusPoints < 0
? freePlan.surplusPoints + freePoints
: freePoints;
return teamStandardSub.save({ session });
return freePlan.save({ session });
}

return MongoTeamSub.create(
Expand Down Expand Up @@ -123,13 +128,14 @@ export const getTeamPlanStatus = async ({

// Free user, first login after expiration. The free subscription plan will be reset
if (
standardPlan &&
standardPlan.expiredTime &&
standardPlan.currentSubLevel === StandardSubLevelEnum.free &&
dayjs(standardPlan.expiredTime).isBefore(new Date())
(standardPlan &&
standardPlan.expiredTime &&
standardPlan.currentSubLevel === StandardSubLevelEnum.free &&
dayjs(standardPlan.expiredTime).isBefore(new Date())) ||
teamStandardPlans.length === 0
) {
console.log('Init free stand plan', { teamId });
await initTeamStandardPlan2Free({ teamId });
await initTeamFreePlan({ teamId });
return getTeamPlanStatus({ teamId });
}

Expand Down
Loading

0 comments on commit 761e35c

Please sign in to comment.