Skip to content

Commit 31b4835

Browse files
mymlfdc310
andauthored
fix: fix n8n streaming support issue (#1787)
* fix: fix n8n streaming support issue Add streaming support detection and proper message type handling for n8n service API runner. Previously, when streaming was enabled, n8n integration would fail due to incorrect message type usage. 1. Added streaming capability detection by checking adapter's is_stream_output_supported method 2. Implemented conditional message generation using MessageChunk for streaming mode and Message for non-streaming mode 3. Added proper error handling for adapters that don't support streaming detection * fix: add n8n webhook streaming model ,Optimized the streaming output when calling n8n. --------- Co-authored-by: Dong_master <[email protected]>
1 parent ba7cf69 commit 31b4835

File tree

1 file changed

+84
-24
lines changed

1 file changed

+84
-24
lines changed

src/langbot/pkg/provider/runners/n8nsvapi.py

Lines changed: 84 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,33 @@ async def _preprocess_user_message(self, query: pipeline_query.Query) -> str:
6868

6969
return plain_text
7070

71+
async def _process_stream_response(self, response: aiohttp.ClientResponse) -> typing.AsyncGenerator[
72+
provider_message.Message, None]:
73+
"""处理流式响应"""
74+
full_content = ""
75+
message_idx = 0
76+
is_final = False
77+
async for chunk in response.content.iter_chunked(1024):
78+
if not chunk:
79+
continue
80+
81+
try:
82+
data = json.loads(chunk)
83+
if data.get('type') == 'item' and 'content' in data:
84+
message_idx += 1
85+
content = data['content']
86+
full_content += content
87+
elif data.get('type') == 'end':
88+
is_final = True
89+
if is_final or message_idx % 8 == 0:
90+
yield provider_message.MessageChunk(
91+
role='assistant',
92+
content=full_content,
93+
is_final=is_final,
94+
)
95+
except json.JSONDecodeError:
96+
self.ap.logger.warning(f"Failed to parse final JSON line: {response.text()}")
97+
7198
async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
7299
"""调用n8n webhook"""
73100
# 生成会话ID(如果不存在)
@@ -80,6 +107,7 @@ async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenera
80107
# 准备请求数据
81108
payload = {
82109
# 基本消息内容
110+
'chatInput' :plain_text, # 考虑到之前用户直接用的message model这里添加新键
83111
'message': plain_text,
84112
'user_message_text': plain_text,
85113
'conversation_id': query.session.using_conversation.uuid,
@@ -91,6 +119,11 @@ async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenera
91119
# 添加所有变量到payload
92120
payload.update(query.variables)
93121

122+
try:
123+
is_stream = await query.adapter.is_stream_output_supported()
124+
except AttributeError:
125+
is_stream = False
126+
94127
try:
95128
# 准备请求头和认证信息
96129
headers = {}
@@ -126,35 +159,62 @@ async def _call_webhook(self, query: pipeline_query.Query) -> typing.AsyncGenera
126159

127160
# 调用webhook
128161
async with aiohttp.ClientSession() as session:
129-
async with session.post(
130-
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
131-
) as response:
132-
if response.status != 200:
133-
error_text = await response.text()
134-
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
135-
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
136-
137-
# 解析响应
138-
response_data = await response.json()
139-
self.ap.logger.debug(f'n8n webhook response: {response_data}')
140-
141-
# 从响应中提取输出
142-
if self.output_key in response_data:
143-
output_content = response_data[self.output_key]
162+
if is_stream:
163+
# 流式请求
164+
async with session.post(
165+
self.webhook_url,
166+
json=payload,
167+
headers=headers,
168+
auth=auth,
169+
timeout=self.timeout
170+
) as response:
171+
if response.status != 200:
172+
error_text = await response.text()
173+
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
174+
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
175+
176+
# 处理流式响应
177+
async for chunk in self._process_stream_response(response):
178+
yield chunk
144179
else:
145-
# 如果没有指定的输出键,则使用整个响应
146-
output_content = json.dumps(response_data, ensure_ascii=False)
147-
148-
# 返回消息
149-
yield provider_message.Message(
150-
role='assistant',
151-
content=output_content,
152-
)
180+
async with session.post(
181+
self.webhook_url,
182+
json=payload,
183+
headers=headers,
184+
auth=auth,
185+
timeout=self.timeout
186+
) as response:
187+
try:
188+
async for chunk in self._process_stream_response(response):
189+
output_content = chunk.content if chunk.is_final else ''
190+
except:
191+
# 非流式请求(保持原有逻辑)
192+
if response.status != 200:
193+
error_text = await response.text()
194+
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
195+
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
196+
197+
# 解析响应
198+
response_data = await response.json()
199+
self.ap.logger.debug(f'n8n webhook response: {response_data}')
200+
201+
# 从响应中提取输出
202+
if self.output_key in response_data:
203+
output_content = response_data[self.output_key]
204+
else:
205+
# 如果没有指定的输出键,则使用整个响应
206+
output_content = json.dumps(response_data, ensure_ascii=False)
207+
208+
# 返回消息
209+
yield provider_message.Message(
210+
role='assistant',
211+
content=output_content,
212+
)
153213
except Exception as e:
154214
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
155215
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
156216

157217
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
158218
"""运行请求"""
159219
async for msg in self._call_webhook(query):
160-
yield msg
220+
yield msg

0 commit comments

Comments
 (0)