App run node update #2542

Merged
merged 6 commits on Aug 27, 2024
4 changes: 3 additions & 1 deletion docSite/content/zh-cn/docs/development/upgrading/4811.md
@@ -18,4 +18,6 @@ weight: 813
1.
2. New - Plugin custom inputs support radio (single-select) options
3. New - Plugin outputs can mark specific fields as tool-call results
4. New - Plugins support configuring usage guides, global variables, and file inputs
4. New - Plugins support configuring usage guides, global variables, and file inputs
5. Improved - SSE response code.
6. Improved - Copy now works in non-HTTPS environments (unless textarea-based copy is also unsupported); see the fallback sketch below
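
Changelog item 6 describes a clipboard fallback for non-HTTPS pages. The frontend change itself is not part of the rendered diff, so the following is only a minimal sketch of the usual textarea + execCommand fallback that the item refers to; all names are illustrative.

```ts
// Minimal sketch of a non-HTTPS copy fallback (illustrative; not code from this PR).
export const copyText = async (text: string) => {
  if (navigator.clipboard && window.isSecureContext) {
    // The Clipboard API is only available in secure contexts (HTTPS / localhost).
    await navigator.clipboard.writeText(text);
    return;
  }
  // Fallback: a temporary textarea plus execCommand('copy') still works over plain HTTP,
  // unless the browser blocks textarea-based copying as well (changelog item 6's caveat).
  const textarea = document.createElement('textarea');
  textarea.value = text;
  textarea.style.position = 'fixed';
  textarea.style.opacity = '0';
  document.body.appendChild(textarea);
  textarea.select();
  document.execCommand('copy');
  document.body.removeChild(textarea);
};
```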
1 change: 1 addition & 0 deletions packages/global/core/workflow/constants.ts
@@ -128,6 +128,7 @@ export enum NodeInputKeyEnum {

// read files
fileUrlList = 'fileUrlList',

// user select
userSelectOptions = 'userSelectOptions'
}
1 change: 1 addition & 0 deletions packages/global/core/workflow/node/constant.ts
@@ -106,6 +106,7 @@ export enum FlowNodeTypeEnum {
contentExtract = 'contentExtract',
httpRequest468 = 'httpRequest468',
runApp = 'app',
appModule = 'appModule',
pluginModule = 'pluginModule',
pluginInput = 'pluginInput',
pluginOutput = 'pluginOutput',
3 changes: 2 additions & 1 deletion packages/global/core/workflow/runtime/type.d.ts
@@ -19,6 +19,7 @@ import { RuntimeNodeItemType } from '../runtime/type';
import { RuntimeEdgeItemType } from './edge';
import { ReadFileNodeResponse } from '../template/system/readFiles/type';
import { UserSelectOptionType } from '../template/system/userSelect/type';
import { WorkflowResponseType } from '../../../../service/core/workflow/dispatch/type';

/* workflow props */
export type ChatDispatchProps = {
@@ -36,9 +37,9 @@ export type ChatDispatchProps = {
query: UserChatItemValueItemType[]; // trigger query
chatConfig: AppSchema['chatConfig'];
stream: boolean;
detail: boolean; // response detail
maxRunTimes: number;
isToolCall?: boolean;
workflowStreamResponse?: WorkflowResponseType;
};

export type ModuleDispatchProps<T> = ChatDispatchProps & {
4 changes: 2 additions & 2 deletions packages/global/core/workflow/runtime/utils.ts
@@ -236,7 +236,7 @@ export const textAdaptGptResponse = ({
finish_reason?: null | 'stop';
extraData?: Object;
}) => {
return JSON.stringify({
return {
...extraData,
id: '',
object: '',
@@ -252,7 +252,7 @@ export const textAdaptGptResponse = ({
finish_reason
}
]
});
};
};

/* Update runtimeNode's outputs with interactive data from history */
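
textAdaptGptResponse now returns a plain object instead of a pre-stringified payload; JSON serialization happens once in the central SSE writer added later in this PR (getWorkflowResponseWrite in packages/service/core/workflow/dispatch/utils.ts). A minimal sketch of the intended pairing, assuming the writer has been injected as workflowStreamResponse (import paths are illustrative):

```ts
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import type { WorkflowResponseType } from '@fastgpt/service/core/workflow/dispatch/type'; // path is an assumption

// A node emits an answer chunk: the payload stays a plain object here and
// getWorkflowResponseWrite is the single place that calls JSON.stringify on it.
export const emitAnswerChunk = (
  workflowStreamResponse: WorkflowResponseType | undefined,
  text: string
) => {
  workflowStreamResponse?.({
    event: SseResponseEventEnum.answer,
    data: textAdaptGptResponse({ text })
  });
};
```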
8 changes: 5 additions & 3 deletions packages/global/core/workflow/template/constants.ts
@@ -16,6 +16,7 @@ import { RunAppModule } from './system/runApp/index';
import { PluginInputModule } from './system/pluginInput';
import { PluginOutputModule } from './system/pluginOutput';
import { RunPluginModule } from './system/runPlugin';
import { RunAppPluginModule } from './system/runAppPlugin';
import { AiQueryExtension } from './system/queryExtension';

import type { FlowNodeTemplateType } from '../type/node';
@@ -44,8 +45,8 @@ const systemNodes: FlowNodeTemplateType[] = [
LafModule,
IfElseNode,
VariableUpdateNode,
CodeNode,
RunAppModule
CodeNode
// RunAppModule
];
/* app flow module templates */
export const appSystemModuleTemplates: FlowNodeTemplateType[] = [
@@ -70,5 +71,6 @@ export const moduleTemplatesFlat: FlowNodeTemplateType[] = [
)
),
EmptyNode,
RunPluginModule
RunPluginModule,
RunAppPluginModule
];
9 changes: 9 additions & 0 deletions packages/global/core/workflow/template/input.ts
@@ -73,3 +73,12 @@ export const Input_Template_Text_Quote: FlowNodeInputItemType = {
description: i18nT('app:document_quote_tip'),
valueType: WorkflowIOValueTypeEnum.string
};
export const Input_Template_File_Link: FlowNodeInputItemType = {
key: NodeInputKeyEnum.fileUrlList,
renderTypeList: [FlowNodeInputTypeEnum.reference],
required: true,
label: i18nT('app:workflow.user_file_input'),
debugLabel: i18nT('app:workflow.user_file_input'),
description: i18nT('app:workflow.user_file_input_desc'),
valueType: WorkflowIOValueTypeEnum.arrayString
};
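
The new Input_Template_File_Link is meant to be spread into an app node's inputs when file selection is enabled; the PR keeps that branch commented out in appData2FlowNodeIO (see packages/global/core/workflow/utils.ts below). A sketch of how it would be enabled, mirroring the commented-out code; `variableInput` stands for the variable-derived inputs built in that function:

```ts
// Sketch mirroring the commented-out branch in appData2FlowNodeIO.
const showFileLink =
  chatConfig?.fileSelectConfig?.canSelectFile || chatConfig?.fileSelectConfig?.canSelectImg;

const inputs = [
  Input_Template_History,
  Input_Template_UserChatInput,
  ...(showFileLink ? [Input_Template_File_Link] : []), // only exposed when file upload is allowed
  ...variableInput
];
```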
19 changes: 19 additions & 0 deletions packages/global/core/workflow/template/system/runAppPlugin.ts
@@ -0,0 +1,19 @@
import { FlowNodeTemplateTypeEnum } from '../../constants';
import { FlowNodeTypeEnum } from '../../node/constant';
import { FlowNodeTemplateType } from '../../type/node';
import { getHandleConfig } from '../utils';

export const RunAppPluginModule: FlowNodeTemplateType = {
id: FlowNodeTypeEnum.appModule,
templateType: FlowNodeTemplateTypeEnum.other,
flowNodeType: FlowNodeTypeEnum.appModule,
sourceHandle: getHandleConfig(true, true, true, true),
targetHandle: getHandleConfig(true, true, true, true),
intro: '',
name: '',
showStatus: false,
isTool: false,
version: '481',
inputs: [], // [{key:'pluginId'},...]
outputs: []
};
@@ -22,5 +22,3 @@ type UserSelectInteractive = {
};

export type InteractiveNodeResponseItemType = InteractiveBasicType & UserSelectInteractive;

export type UserInteractiveType = UserSelectInteractive;
93 changes: 88 additions & 5 deletions packages/global/core/workflow/utils.ts
@@ -1,10 +1,16 @@
import { FlowNodeInputTypeEnum, FlowNodeOutputTypeEnum, FlowNodeTypeEnum } from './node/constant';
import {
chatHistoryValueDesc,
FlowNodeInputTypeEnum,
FlowNodeOutputTypeEnum,
FlowNodeTypeEnum
} from './node/constant';
import {
WorkflowIOValueTypeEnum,
NodeInputKeyEnum,
VariableInputEnum,
variableMap,
VARIABLE_NODE_ID
VARIABLE_NODE_ID,
NodeOutputKeyEnum
} from './constants';
import { FlowNodeInputItemType, FlowNodeOutputItemType, ReferenceValueProps } from './type/io.d';
import { StoreNodeItemType } from './type/node';
@@ -25,6 +31,7 @@ import {
import { IfElseResultEnum } from './template/system/ifElse/constant';
import { RuntimeNodeItemType } from './runtime/type';
import { getReferenceVariableValue } from './runtime/utils';
import { Input_Template_History, Input_Template_UserChatInput } from './template/input';

export const getHandleId = (nodeId: string, type: 'source' | 'target', key: string) => {
return `${nodeId}-${type}-${key}`;
@@ -147,9 +154,11 @@ export const getModuleInputUiField = (input: FlowNodeInputItemType) => {
return {};
};

export const pluginData2FlowNodeIO = (
nodes: StoreNodeItemType[]
): {
export const pluginData2FlowNodeIO = ({
nodes
}: {
nodes: StoreNodeItemType[];
}): {
inputs: FlowNodeInputItemType[];
outputs: FlowNodeOutputItemType[];
} => {
@@ -180,6 +189,80 @@ export const pluginData2FlowNodeIO = (
};
};

export const appData2FlowNodeIO = ({
chatConfig
}: {
chatConfig?: AppChatConfigType;
}): {
inputs: FlowNodeInputItemType[];
outputs: FlowNodeOutputItemType[];
} => {
const variableInput = !chatConfig?.variables
? []
: chatConfig.variables.map((item) => {
const renderTypeMap = {
[VariableInputEnum.input]: [FlowNodeInputTypeEnum.input, FlowNodeInputTypeEnum.reference],
[VariableInputEnum.textarea]: [
FlowNodeInputTypeEnum.textarea,
FlowNodeInputTypeEnum.reference
],
[VariableInputEnum.select]: [FlowNodeInputTypeEnum.select],
[VariableInputEnum.custom]: [
FlowNodeInputTypeEnum.input,
FlowNodeInputTypeEnum.reference
],
default: [FlowNodeInputTypeEnum.reference]
};

return {
key: item.key,
renderTypeList: renderTypeMap[item.type] || renderTypeMap.default,
label: item.label,
debugLabel: item.label,
description: '',
valueType: WorkflowIOValueTypeEnum.any,
required: item.required,
list: item.enums.map((enumItem) => ({
label: enumItem.value,
value: enumItem.value
}))
};
});

// const showFileLink =
// chatConfig?.fileSelectConfig?.canSelectFile || chatConfig?.fileSelectConfig?.canSelectImg;

return {
inputs: [
Input_Template_History,
Input_Template_UserChatInput,
// ...(showFileLink ? [Input_Template_File_Link] : []),
...variableInput
],
outputs: [
{
id: NodeOutputKeyEnum.history,
key: NodeOutputKeyEnum.history,
required: true,
label: 'core.module.output.label.New context',
description: 'core.module.output.description.New context',
valueType: WorkflowIOValueTypeEnum.chatHistory,
valueDesc: chatHistoryValueDesc,
type: FlowNodeOutputTypeEnum.static
},
{
id: NodeOutputKeyEnum.answerText,
key: NodeOutputKeyEnum.answerText,
required: false,
label: 'core.module.output.label.Ai response content',
description: 'core.module.output.description.Ai response content',
valueType: WorkflowIOValueTypeEnum.string,
type: FlowNodeOutputTypeEnum.static
}
]
};
};

export const formatEditorVariablePickerIcon = (
variables: { key: string; label: string; type?: `${VariableInputEnum}`; required?: boolean }[]
): EditorVariablePickerType[] => {
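
A short usage sketch of the new appData2FlowNodeIO helper with a hypothetical chatConfig, showing how app-level variables become node inputs and which outputs the app node exposes:

```ts
import { VariableInputEnum } from '@fastgpt/global/core/workflow/constants';
import { appData2FlowNodeIO } from '@fastgpt/global/core/workflow/utils';

// Hypothetical chatConfig for illustration only.
const { inputs, outputs } = appData2FlowNodeIO({
  chatConfig: {
    variables: [
      {
        key: 'language',
        label: 'Language',
        type: VariableInputEnum.select,
        required: true,
        enums: [{ value: 'en' }, { value: 'zh' }]
      }
    ]
  } as any // AppChatConfigType; kept loose for the sketch
});

// inputs  -> [Input_Template_History, Input_Template_UserChatInput, { key: 'language', renderTypeList: [select], ... }]
// outputs -> [history (chatHistory), answerText (string)]
```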
17 changes: 11 additions & 6 deletions packages/service/core/app/plugin/controller.ts
@@ -1,6 +1,6 @@
import { FlowNodeTemplateType } from '@fastgpt/global/core/workflow/type/node.d';
import { FlowNodeTypeEnum, defaultNodeVersion } from '@fastgpt/global/core/workflow/node/constant';
import { pluginData2FlowNodeIO } from '@fastgpt/global/core/workflow/utils';
import { appData2FlowNodeIO, pluginData2FlowNodeIO } from '@fastgpt/global/core/workflow/utils';
import { PluginSourceEnum } from '@fastgpt/global/core/plugin/constants';
import type { PluginRuntimeType } from '@fastgpt/global/core/workflow/runtime/type';
import { FlowNodeTemplateTypeEnum } from '@fastgpt/global/core/workflow/constants';
@@ -52,10 +52,10 @@ const getPluginTemplateById = async (
showStatus: true,
workflow: {
nodes: item.modules,
edges: item.edges
edges: item.edges,
chatConfig: item.chatConfig
},
templateType: FlowNodeTemplateTypeEnum.teamApp,
isTool: true,
version: item?.pluginData?.nodeVersion || defaultNodeVersion,
originCost: 0,
currentCost: 0
@@ -71,22 +71,27 @@ const getPluginTemplateById = async (
/* format plugin modules to plugin preview module */
export async function getPluginPreviewNode({ id }: { id: string }): Promise<FlowNodeTemplateType> {
const plugin = await getPluginTemplateById(id);
const isPlugin = !!plugin.workflow.nodes.find(
(node) => node.flowNodeType === FlowNodeTypeEnum.pluginInput
);

return {
id: getNanoid(),
pluginId: plugin.id,
templateType: plugin.templateType,
flowNodeType: FlowNodeTypeEnum.pluginModule,
flowNodeType: isPlugin ? FlowNodeTypeEnum.pluginModule : FlowNodeTypeEnum.appModule,
avatar: plugin.avatar,
name: plugin.name,
intro: plugin.intro,
inputExplanationUrl: plugin.inputExplanationUrl,
showStatus: plugin.showStatus,
isTool: plugin.isTool,
isTool: isPlugin,
version: plugin.version,
sourceHandle: getHandleConfig(true, true, true, true),
targetHandle: getHandleConfig(true, true, true, true),
...pluginData2FlowNodeIO(plugin.workflow.nodes)
...(isPlugin
? pluginData2FlowNodeIO({ nodes: plugin.workflow.nodes })
: appData2FlowNodeIO({ chatConfig: plugin.workflow.chatConfig }))
};
}

@@ -1,85 +1,88 @@
// @ts-nocheck
import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { SelectAppItemType } from '@fastgpt/global/core/workflow/template/system/runApp/type';
import { dispatchWorkFlowV1 } from '../index';
import { MongoApp } from '../../../../core/app/schema';
import { responseWrite } from '../../../../common/response';
import { dispatchWorkFlow } from '../index';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import {
getWorkflowEntryNodeIds,
initWorkflowEdgeStatus,
storeNodes2RuntimeNodes,
textAdaptGptResponse
} from '@fastgpt/global/core/workflow/runtime/utils';
import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { getHistories, setEntryEntries } from '../utils';
import { getHistories } from '../utils';
import { chatValue2RuntimePrompt, runtimePrompt2ChatsValue } from '@fastgpt/global/core/chat/adapt';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { authAppByTmbId } from '../../../../support/permission/app/auth';
import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant';

type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.userChatInput]: string;
[NodeInputKeyEnum.history]?: ChatItemType[] | number;
app: SelectAppItemType;
[NodeInputKeyEnum.fileUrlList]?: string[];
}>;
type Response = DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]: string;
[NodeOutputKeyEnum.history]: ChatItemType[];
}>;

export const dispatchAppRequest = async (props: Props): Promise<Response> => {
export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
const {
res,
teamId,
stream,
detail,
app: workflowApp,
histories,
inputFiles,
params: { userChatInput, history, app }
query,
node: { pluginId },
workflowStreamResponse,
params
} = props;
let start = Date.now();

const { userChatInput, history, ...variables } = params;
if (!userChatInput) {
return Promise.reject('Input is empty');
}
if (!pluginId) {
return Promise.reject('pluginId is empty');
}

const appData = await MongoApp.findOne({
_id: app.id,
teamId
// Auth the app by tmbId (not the requesting user, but the workflow's user)
const { app: appData } = await authAppByTmbId({
appId: pluginId,
tmbId: workflowApp.tmbId,
per: ReadPermissionVal
});

if (!appData) {
return Promise.reject('App not found');
}

if (stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.answer : undefined,
data: textAdaptGptResponse({
text: '\n'
})
});
}
// Automatically prepend a newline to the answer stream
workflowStreamResponse?.({
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: '\n'
})
});

const chatHistories = getHistories(history, histories);
const { files } = chatValue2RuntimePrompt(query);

const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlowV1({
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
...props,
appId: app.id,
modules: setEntryEntries(appData.modules),
runtimeModules: undefined, // must reset
app: appData,
runtimeNodes: storeNodes2RuntimeNodes(
appData.modules,
getWorkflowEntryNodeIds(appData.modules)
),
runtimeEdges: initWorkflowEdgeStatus(appData.edges),
histories: chatHistories,
inputFiles,
startParams: {
userChatInput
}
query: runtimePrompt2ChatsValue({
files,
text: userChatInput
}),
variables: variables
});

const completeMessages = chatHistories.concat([
{
obj: ChatRoleEnum.Human,
value: runtimePrompt2ChatsValue({
files: inputFiles,
text: userChatInput
})
value: query
},
{
obj: ChatRoleEnum.AI,
121 changes: 56 additions & 65 deletions packages/service/core/workflow/dispatch/agent/runTool/functionCall.ts
@@ -11,18 +11,14 @@ import {
ChatCompletionAssistantMessageParam
} from '@fastgpt/global/core/ai/type.d';
import { NextApiResponse } from 'next';
import {
responseWrite,
responseWriteController,
responseWriteNodeStatus
} from '../../../../../common/response';
import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
import { dispatchWorkFlow } from '../../index';
import { DispatchToolModuleProps, RunToolResponse, ToolNodeItemType } from './type.d';
import json5 from 'json5';
import { DispatchFlowResponse } from '../../type';
import { DispatchFlowResponse, WorkflowResponseType } from '../../type';
import { countGptMessagesTokens } from '../../../../../common/string/tiktoken/index';
import { getNanoid, sliceStrStartEnd } from '@fastgpt/global/common/string/tools';
import { AIChatItemType } from '@fastgpt/global/core/chat/type';
@@ -50,9 +46,9 @@ export const runToolWithFunctionCall = async (
res,
requestOrigin,
runtimeNodes,
detail = false,
node,
stream,
workflowStreamResponse,
params: { temperature = 0, maxToken = 4000, aiChatVision }
} = props;
const assistantResponses = response?.assistantResponses || [];
@@ -143,9 +139,9 @@ export const runToolWithFunctionCall = async (
if (res && stream) {
return streamResponse({
res,
detail,
toolNodes,
stream: aiResponse
stream: aiResponse,
workflowStreamResponse
});
} else {
const result = aiResponse as ChatCompletion;
@@ -216,21 +212,18 @@ export const runToolWithFunctionCall = async (
content: stringToolResponse
};

if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolResponse,
data: JSON.stringify({
tool: {
id: tool.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: tool.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
}
});

return {
toolRunResponse,
@@ -260,12 +253,14 @@ export const runToolWithFunctionCall = async (
];
// console.log(tokens, 'tool');

if (stream && detail) {
responseWriteNodeStatus({
res,
// Run tool status
workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
});
}
}
});

// tool assistant
const toolAssistants = toolsRunResponse
@@ -337,14 +332,14 @@ export const runToolWithFunctionCall = async (

async function streamResponse({
res,
detail,
toolNodes,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
toolNodes: ToolNodeItemType[];
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -367,9 +362,9 @@ async function streamResponse({
const content = responseChoice?.content || '';
textAnswer += content;

responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
@@ -397,22 +392,20 @@ async function streamResponse({
toolAvatar: toolNode.avatar
});

if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolCall,
data: JSON.stringify({
tool: {
id: functionId,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: functionCall.name,
params: functionCall.arguments,
response: ''
}
})
});
}
workflowStreamResponse?.({
write,
event: SseResponseEventEnum.toolCall,
data: {
tool: {
id: functionId,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: functionCall.name,
params: functionCall.arguments,
response: ''
}
}
});
}

continue;
@@ -424,21 +417,19 @@ async function streamResponse({
if (currentTool) {
currentTool.arguments += arg;

if (detail) {
responseWrite({
write,
event: SseResponseEventEnum.toolParams,
data: JSON.stringify({
tool: {
id: functionId,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
})
});
}
workflowStreamResponse?.({
write,
event: SseResponseEventEnum.toolParams,
data: {
tool: {
id: functionId,
toolName: '',
toolAvatar: '',
params: arg,
response: ''
}
}
});
}
}
}
100 changes: 46 additions & 54 deletions packages/service/core/workflow/dispatch/agent/runTool/promptCall.ts
@@ -8,11 +8,7 @@ import {
ChatCompletionAssistantMessageParam
} from '@fastgpt/global/core/ai/type';
import { NextApiResponse } from 'next';
import {
responseWrite,
responseWriteController,
responseWriteNodeStatus
} from '../../../../../common/response';
import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
@@ -30,6 +26,7 @@ import { AIChatItemType } from '@fastgpt/global/core/chat/type';
import { GPTMessages2Chats } from '@fastgpt/global/core/chat/adapt';
import { updateToolInputValue } from './utils';
import { computedMaxToken, computedTemperature } from '../../../../ai/utils';
import { WorkflowResponseType } from '../../type';

type FunctionCallCompletion = {
id: string;
@@ -56,9 +53,9 @@ export const runToolWithPromptCall = async (
res,
requestOrigin,
runtimeNodes,
detail = false,
node,
stream,
workflowStreamResponse,
params: { temperature = 0, maxToken = 4000, aiChatVision }
} = props;
const assistantResponses = response?.assistantResponses || [];
@@ -143,9 +140,9 @@ export const runToolWithPromptCall = async (
if (res && stream) {
const { answer } = await streamResponse({
res,
detail,
toolNodes,
stream: aiResponse
stream: aiResponse,
workflowStreamResponse
});

return answer;
@@ -159,9 +156,8 @@ export const runToolWithPromptCall = async (
const { answer: replaceAnswer, toolJson } = parseAnswer(answer);
// No tools
if (!toolJson) {
if (replaceAnswer === ERROR_TEXT && stream && detail) {
responseWrite({
res,
if (replaceAnswer === ERROR_TEXT) {
workflowStreamResponse?.({
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: replaceAnswer
@@ -206,22 +202,19 @@ export const runToolWithPromptCall = async (
})();

// SSE response to client
if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolCall,
data: JSON.stringify({
tool: {
id: toolJson.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolJson.name,
params: toolJson.arguments,
response: ''
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolCall,
data: {
tool: {
id: toolJson.id,
toolName: toolNode.name,
toolAvatar: toolNode.avatar,
functionName: toolJson.name,
params: toolJson.arguments,
response: ''
}
}
});

const moduleRunResponse = await dispatchWorkFlow({
...props,
@@ -245,34 +238,33 @@ export const runToolWithPromptCall = async (
return moduleRunResponse.toolResponses ? String(moduleRunResponse.toolResponses) : 'none';
})();

if (stream && detail) {
responseWrite({
res,
event: SseResponseEventEnum.toolResponse,
data: JSON.stringify({
tool: {
id: toolJson.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.toolResponse,
data: {
tool: {
id: toolJson.id,
toolName: '',
toolAvatar: '',
params: '',
response: sliceStrStartEnd(stringToolResponse, 500, 500)
}
}
});

return {
moduleRunResponse,
toolResponsePrompt: stringToolResponse
};
})();

if (stream && detail) {
responseWriteNodeStatus({
res,
// Run tool status
workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
});
}
}
});

// Merge the tool-call results and store them in functionCall format.
const assistantToolMsgParams: ChatCompletionAssistantMessageParam = {
@@ -340,13 +332,13 @@ ANSWER: `;

async function streamResponse({
res,
detail,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
toolNodes: ToolNodeItemType[];
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -370,9 +362,9 @@ async function streamResponse({
textAnswer += content;

if (startResponseWrite) {
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
@@ -384,9 +376,9 @@ async function streamResponse({
// find first : index
const firstIndex = textAnswer.indexOf(':');
textAnswer = textAnswer.substring(firstIndex + 1).trim();
responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: textAnswer
})
417 changes: 208 additions & 209 deletions packages/service/core/workflow/dispatch/agent/runTool/toolChoice.ts

Large diffs are not rendered by default.

19 changes: 10 additions & 9 deletions packages/service/core/workflow/dispatch/chat/oneapi.ts
@@ -31,7 +31,7 @@ import {
import type { AIChatNodeProps } from '@fastgpt/global/core/workflow/runtime/type.d';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { responseWrite, responseWriteController } from '../../../../common/response';
import { responseWriteController } from '../../../../common/response';
import { getLLMModel, ModelTypeEnum } from '../../../ai/model';
import type { SearchDataResponseItemType } from '@fastgpt/global/core/dataset/type';
import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
@@ -41,6 +41,7 @@ import { filterSearchResultsByMaxChars } from '../../utils';
import { getHistoryPreview } from '@fastgpt/global/core/chat/utils';
import { addLog } from '../../../../common/system/log';
import { computedMaxToken, computedTemperature } from '../../../ai/utils';
import { WorkflowResponseType } from '../type';

export type ChatProps = ModuleDispatchProps<
AIChatNodeProps & {
@@ -60,11 +61,11 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
res,
requestOrigin,
stream = false,
detail = false,
user,
histories,
node: { name },
query,
workflowStreamResponse,
params: {
model,
temperature = 0,
@@ -179,8 +180,8 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
// sse response
const { answer } = await streamResponse({
res,
detail,
stream: response
stream: response,
workflowStreamResponse
});

if (!answer) {
@@ -340,12 +341,12 @@ async function getChatMessages({

async function streamResponse({
res,
detail,
stream
stream,
workflowStreamResponse
}: {
res: NextApiResponse;
detail: boolean;
stream: StreamChatType;
workflowStreamResponse?: WorkflowResponseType;
}) {
const write = responseWriteController({
res,
@@ -360,9 +361,9 @@ async function streamResponse({
const content = part.choices?.[0]?.delta?.content || '';
answer += content;

responseWrite({
workflowStreamResponse?.({
write,
event: detail ? SseResponseEventEnum.answer : undefined,
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: content
})
51 changes: 14 additions & 37 deletions packages/service/core/workflow/dispatch/index.ts
@@ -1,4 +1,3 @@
import { NextApiResponse } from 'next';
import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import {
DispatchNodeResponseKeyEnum,
@@ -21,7 +20,6 @@ import {
FlowNodeTypeEnum
} from '@fastgpt/global/core/workflow/node/constant';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import { responseWrite, responseWriteNodeStatus } from '../../../common/response';
import { getSystemTime } from '@fastgpt/global/common/time/timezone';
import { replaceVariableLabel } from '@fastgpt/global/core/workflow/utils';

@@ -41,8 +39,7 @@ import { dispatchPluginOutput } from './plugin/runOutput';
import { removeSystemVariable, valueTypeFormat } from './utils';
import {
filterWorkflowEdges,
checkNodeRunStatus,
getLastInteractiveValue
checkNodeRunStatus
} from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import { dispatchRunTools } from './agent/runTool/index';
@@ -62,12 +59,11 @@ import { dispatchTextEditor } from './tools/textEditor';
import { dispatchCustomFeedback } from './tools/customFeedback';
import { dispatchReadFiles } from './tools/readFiles';
import { dispatchUserSelect } from './interactive/userSelect';
import { FlowNodeOutputItemType } from '@fastgpt/global/core/workflow/type/io';
import {
InteractiveNodeResponseItemType,
UserInteractiveType,
UserSelectInteractive
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { dispatchRunAppNode } from './agent/runAppModule';

const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
@@ -79,6 +75,7 @@ const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.contentExtract]: dispatchContentExtract,
[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request,
[FlowNodeTypeEnum.runApp]: dispatchAppRequest,
[FlowNodeTypeEnum.appModule]: dispatchRunAppNode,
[FlowNodeTypeEnum.pluginModule]: dispatchRunPlugin,
[FlowNodeTypeEnum.pluginInput]: dispatchPluginInput,
[FlowNodeTypeEnum.pluginOutput]: dispatchPluginOutput,
@@ -115,7 +112,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
variables = {},
user,
stream = false,
detail = false,
...props
} = data;

@@ -261,13 +257,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nodeOutputs
};

if (stream && res) {
responseWrite({
res,
event: SseResponseEventEnum.interactive,
data: JSON.stringify({ interactive: interactiveResult })
});
}
props.workflowStreamResponse?.({
event: SseResponseEventEnum.interactive,
data: { interactive: interactiveResult }
});

return {
type: ChatItemValueTypeEnum.interactive,
@@ -401,11 +394,13 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
async function nodeRunWithActive(node: RuntimeNodeItemType) {
// push run status messages
if (res && stream && detail && node.showStatus) {
responseStatus({
res,
name: node.name,
status: 'running'
if (node.showStatus) {
props.workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeStatus,
data: {
status: 'running',
name: node.name
}
});
}
const startTime = Date.now();
@@ -420,7 +415,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
histories,
user,
stream,
detail,
node,
runtimeNodes,
runtimeEdges,
@@ -510,23 +504,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
};
}

/* sse response module status */
export function responseStatus({
res,
status,
name
}: {
res: NextApiResponse;
status?: 'running' | 'finish';
name?: string;
}) {
if (!name) return;
responseWriteNodeStatus({
res,
name
});
}

/* get system variable */
export function getSystemVariable({
user,
@@ -14,7 +14,6 @@ import type {
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { updateUserSelectedResult } from '../../../chat/controller';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { responseWrite } from '../../../../common/response';
import { chatValue2RuntimePrompt } from '@fastgpt/global/core/chat/adapt';

type Props = ModuleDispatchProps<{
@@ -29,10 +28,7 @@ type UserSelectResponse = DispatchNodeResultType<{

export const dispatchUserSelect = async (props: Props): Promise<UserSelectResponse> => {
const {
res,
detail,
histories,
stream,
workflowStreamResponse,
app: { _id: appId },
chatId,
node: { nodeId, isEntry },
@@ -43,10 +39,9 @@ export const dispatchUserSelect = async (props: Props): Promise<UserSelectRespon
// Interactive node is not the entry node, return interactive result
if (!isEntry) {
const answerText = description ? `\n${description}` : undefined;
if (res && stream && answerText) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
if (answerText) {
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: answerText
})
20 changes: 7 additions & 13 deletions packages/service/core/workflow/dispatch/tools/answer.ts
@@ -2,7 +2,6 @@ import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
@@ -16,24 +15,19 @@ export type AnswerResponse = DispatchNodeResultType<{

export const dispatchAnswer = (props: Record<string, any>): AnswerResponse => {
const {
res,
detail,
stream,
workflowStreamResponse,
params: { text = '' }
} = props as AnswerProps;

const formatText = typeof text === 'string' ? text : JSON.stringify(text, null, 2);
const responseText = `\n${formatText}`;

if (res && stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
data: textAdaptGptResponse({
text: responseText
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: responseText
})
});

return {
[NodeOutputKeyEnum.answerText]: responseText,
@@ -6,7 +6,6 @@ import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/
import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { addCustomFeedbacks } from '../../../chat/controller';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';

type Props = ModuleDispatchProps<{
@@ -16,12 +15,11 @@ type Response = DispatchNodeResultType<{}>;

export const dispatchCustomFeedback = (props: Record<string, any>): Response => {
const {
res,
app: { _id: appId },
chatId,
responseChatItemId: chatItemId,
stream,
detail,
workflowStreamResponse,
params: { system_textareaInput: feedbackText = '' }
} = props as Props;

@@ -36,9 +34,8 @@ export const dispatchCustomFeedback = (props: Record<string, any>): Response =>

if (stream) {
if (!chatId || !chatItemId) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: `\n\n**自定义反馈成功: (仅调试模式下展示该内容)**: "${feedbackText}"\n\n`
})
12 changes: 4 additions & 8 deletions packages/service/core/workflow/dispatch/tools/http468.ts
@@ -14,7 +14,6 @@ import { SERVICE_LOCAL_HOST } from '../../../../common/system/tools';
import { addLog } from '../../../../common/system/log';
import { DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
import { getErrText } from '@fastgpt/global/common/error/utils';
import { responseWrite } from '../../../../common/response';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { getSystemPluginCb } from '../../../../../plugins/register';

@@ -43,15 +42,13 @@ const UNDEFINED_SIGN = 'UNDEFINED_SIGN';

export const dispatchHttp468Request = async (props: HttpRequestProps): Promise<HttpResponse> => {
let {
res,
detail,
app: { _id: appId },
chatId,
stream,
responseChatItemId,
variables,
node: { outputs },
histories,
workflowStreamResponse,
params: {
system_httpMethod: httpMethod = 'POST',
system_httpReqUrl: httpReqUrl,
@@ -158,10 +155,9 @@ export const dispatchHttp468Request = async (props: HttpRequestProps): Promise<H
results[key] = valueTypeFormat(formatResponse[key], output.valueType);
}

if (stream && typeof formatResponse[NodeOutputKeyEnum.answerText] === 'string') {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
if (typeof formatResponse[NodeOutputKeyEnum.answerText] === 'string') {
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: formatResponse[NodeOutputKeyEnum.answerText]
})
20 changes: 7 additions & 13 deletions packages/service/core/workflow/dispatch/tools/runApp.ts
@@ -2,7 +2,6 @@ import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { SelectAppItemType } from '@fastgpt/global/core/workflow/template/system/runApp/type';
import { dispatchWorkFlow } from '../index';
import { responseWrite } from '../../../../common/response';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
@@ -31,10 +30,8 @@ type Response = DispatchNodeResultType<{

export const dispatchAppRequest = async (props: Props): Promise<Response> => {
const {
res,
app: workflowApp,
stream,
detail,
workflowStreamResponse,
histories,
query,
params: { userChatInput, history, app }
@@ -51,15 +48,12 @@ export const dispatchAppRequest = async (props: Props): Promise<Response> => {
per: ReadPermissionVal
});

if (res && stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.answer : undefined,
data: textAdaptGptResponse({
text: '\n'
})
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.fastAnswer,
data: textAdaptGptResponse({
text: '\n'
})
});

const chatHistories = getHistories(history, histories);
const { files } = chatValue2RuntimePrompt(query);
14 changes: 5 additions & 9 deletions packages/service/core/workflow/dispatch/tools/runUpdateVar.ts
@@ -8,15 +8,14 @@ import { getReferenceVariableValue } from '@fastgpt/global/core/workflow/runtime
import { TUpdateListItem } from '@fastgpt/global/core/workflow/template/system/variableUpdate/type';
import { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { removeSystemVariable, valueTypeFormat } from '../utils';
import { responseWrite } from '../../../../common/response';

type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.updateList]: TUpdateListItem[];
}>;
type Response = DispatchNodeResultType<{}>;

export const dispatchUpdateVariable = async (props: Props): Promise<Response> => {
const { res, detail, stream, params, variables, runtimeNodes } = props;
const { params, variables, runtimeNodes, workflowStreamResponse } = props;

const { updateList } = params;
updateList.forEach((item) => {
@@ -54,13 +53,10 @@ export const dispatchUpdateVariable = async (props: Props): Promise<Response> =>
}
});

if (detail && stream) {
responseWrite({
res,
event: SseResponseEventEnum.updateVariables,
data: JSON.stringify(removeSystemVariable(variables))
});
}
workflowStreamResponse?.({
event: SseResponseEventEnum.updateVariables,
data: removeSystemVariable(variables)
});

return {
[DispatchNodeResponseKeyEnum.nodeResponse]: {
17 changes: 16 additions & 1 deletion packages/service/core/workflow/dispatch/type.d.ts
@@ -4,7 +4,10 @@ import {
ChatItemValueItemType,
ToolRunResponseItemType
} from '@fastgpt/global/core/chat/type';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import { RuntimeNodeItemType } from '@fastgpt/global/core/workflow/runtime/type';
import { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/type/edge';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
@@ -21,3 +24,15 @@ export type DispatchFlowResponse = {
[DispatchNodeResponseKeyEnum.assistantResponses]: AIChatItemValueItemType[];
newVariables: Record<string, string>;
};

export type WorkflowResponseType = ({
write,
event,
data,
stream
}: {
write?: ((text: string) => void) | undefined;
event: SseResponseEventEnum;
data: Record<string, any>;
stream?: boolean | undefined;
}) => void;
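
Any function matching WorkflowResponseType can be injected as workflowStreamResponse, which makes dispatchers testable without a live HTTP response. A small sketch, assuming you want to log instead of stream (the import path to this declaration file is illustrative):

```ts
import type { WorkflowResponseType } from './type'; // illustrative path to this declaration file

// Records every event a node would have streamed to the client.
const logStreamResponse: WorkflowResponseType = ({ event, data }) => {
  console.log(`[sse:${event}]`, JSON.stringify(data));
};

// A dispatcher can then be exercised in tests without a NextApiResponse:
// await dispatchWorkFlow({ ...props, workflowStreamResponse: logStreamResponse });
```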
50 changes: 50 additions & 0 deletions packages/service/core/workflow/dispatch/utils.ts
@@ -6,6 +6,56 @@ import {
NodeOutputKeyEnum
} from '@fastgpt/global/core/workflow/constants';
import { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/runtime/type';
import { responseWrite } from '../../../common/response';
import { NextApiResponse } from 'next';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';

export const getWorkflowResponseWrite = ({
res,
detail,
streamResponse,
id
}: {
res?: NextApiResponse;
detail: boolean;
streamResponse: boolean;
id: string;
}) => {
return ({
write,
event,
data,
stream
}: {
write?: (text: string) => void;
event: SseResponseEventEnum;
data: Record<string, any>;
stream?: boolean; // Force set stream response for this write
}) => {
const useStreamResponse = stream ?? streamResponse;

if (!res || res.closed || !useStreamResponse) return;

const detailEvent = [
SseResponseEventEnum.error,
SseResponseEventEnum.flowNodeStatus,
SseResponseEventEnum.flowResponses,
SseResponseEventEnum.interactive,
SseResponseEventEnum.toolCall,
SseResponseEventEnum.toolParams,
SseResponseEventEnum.toolResponse,
SseResponseEventEnum.updateVariables
];
if (!detail && detailEvent.includes(event)) return;

responseWrite({
res,
write,
event: detail ? event : undefined,
data: JSON.stringify(data)
});
};
};

export const filterToolNodeIdByEdges = ({
nodeId,
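
getWorkflowResponseWrite centralizes the old per-node `stream && detail` checks: it skips writing when the response is closed or streaming is off, and drops detail-only events (node status, tool calls, interactive, variable updates) when `detail` is false. The wiring into the chat entry point is not visible in this diff, so the sketch below is an assumption about how the writer is created and handed to dispatchWorkFlow:

```ts
// Sketch (assumed wiring; not shown in this diff).
const workflowStreamResponse = getWorkflowResponseWrite({
  res,                    // NextApiResponse of the chat request (optional)
  detail: true,           // expose detail-only events such as flowNodeStatus and toolCall
  streamResponse: stream, // default on/off switch; a single write can override it via `stream`
  id: chatId              // hypothetical identifier for this response
});

await dispatchWorkFlow({
  ...props,
  stream,
  workflowStreamResponse // nodes call this instead of responseWrite directly
});
```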
321 changes: 0 additions & 321 deletions packages/service/core/workflow/dispatchV1/agent/classifyQuestion.ts

This file was deleted.

384 changes: 0 additions & 384 deletions packages/service/core/workflow/dispatchV1/agent/extract.ts

This file was deleted.

This file was deleted.

This file was deleted.

158 changes: 0 additions & 158 deletions packages/service/core/workflow/dispatchV1/agent/runTool/index.ts

This file was deleted.

388 changes: 0 additions & 388 deletions packages/service/core/workflow/dispatchV1/agent/runTool/promptCall.ts

This file was deleted.

This file was deleted.

413 changes: 0 additions & 413 deletions packages/service/core/workflow/dispatchV1/agent/runTool/toolChoice.ts

This file was deleted.

28 changes: 0 additions & 28 deletions packages/service/core/workflow/dispatchV1/agent/runTool/type.d.ts

This file was deleted.

396 changes: 0 additions & 396 deletions packages/service/core/workflow/dispatchV1/chat/oneapi.ts

This file was deleted.

35 changes: 0 additions & 35 deletions packages/service/core/workflow/dispatchV1/dataset/concat.ts

This file was deleted.

165 changes: 0 additions & 165 deletions packages/service/core/workflow/dispatchV1/dataset/search.ts

This file was deleted.
