11package io .github .timemachinelab .controller ;
22
3+ import io .github .timemachinelab .core .session .application .ConversationService ;
4+ import io .github .timemachinelab .core .session .application .MessageProcessingService ;
5+ import io .github .timemachinelab .core .session .application .SessionManagementService ;
6+ import io .github .timemachinelab .core .session .domain .entity .ConversationSession ;
7+ import io .github .timemachinelab .core .session .infrastructure .ai .QuestionGenerationOperation ;
8+ import io .github .timemachinelab .core .session .infrastructure .web .dto .UnifiedAnswerRequest ;
9+ import io .github .timemachinelab .core .session .infrastructure .web .dto .MessageResponse ;
310import io .github .timemachinelab .entity .req .RetryRequest ;
411import io .github .timemachinelab .entity .resp .ApiResult ;
512import io .github .timemachinelab .entity .resp .RetryResponse ;
613import lombok .extern .slf4j .Slf4j ;
14+ import org .springframework .http .MediaType ;
715import org .springframework .http .ResponseEntity ;
816import org .springframework .validation .annotation .Validated ;
917import org .springframework .web .bind .annotation .*;
18+ import org .springframework .web .servlet .mvc .method .annotation .SseEmitter ;
1019
20+ import javax .annotation .Resource ;
1121import javax .validation .Valid ;
22+ import java .io .IOException ;
23+ import java .util .Map ;
24+ import java .util .UUID ;
25+ import java .util .concurrent .ConcurrentHashMap ;
1226
1327/**
1428 * 用户交互控制器
2337@ Validated
2438public class UserInteractionController {
2539
40+ @ Resource
41+ private ConversationService conversationService ;
42+ @ Resource
43+ private MessageProcessingService messageProcessingService ;
44+ @ Resource
45+ private SessionManagementService sessionManagementService ;
46+ private final Map <String , SseEmitter > sseEmitters = new ConcurrentHashMap <>();
47+
48+ /**
49+ * 建立SSE连接
50+ */
51+ @ GetMapping (value = "/sse" , produces = MediaType .TEXT_EVENT_STREAM_VALUE )
52+ public SseEmitter streamConversation (@ RequestParam (required = false ) String sessionId ) {
53+ log .info ("建立SSE连接 - 会话ID: {}" , sessionId );
54+
55+ if (sessionId == null || sessionId .isEmpty ()) {
56+ sessionId = UUID .randomUUID ().toString ();
57+ }
58+ SseEmitter emitter = new SseEmitter (Long .MAX_VALUE );
59+ sseEmitters .put (sessionId , emitter );
60+
61+ // 连接建立时发送欢迎消息
62+ try {
63+ emitter .send (SseEmitter .event ()
64+ .name ("connected" )
65+ .data ("SSE连接已建立,会话ID: " + sessionId ));
66+ } catch (IOException e ) {
67+ log .error ("发送欢迎消息失败: {}" , e .getMessage ());
68+ }
69+
70+ // 设置连接事件处理
71+ String finalSessionId = sessionId ;
72+ emitter .onCompletion (() -> {
73+ log .info ("SSE连接完成: {}" , finalSessionId );
74+ });
75+
76+ emitter .onTimeout (() -> {
77+ log .info ("SSE连接超时: {}" , finalSessionId );
78+ sseEmitters .remove (finalSessionId );
79+ });
80+
81+ emitter .onError ((ex ) -> {
82+ log .error ("SSE连接错误: {} - {}" , finalSessionId , ex .getMessage ());
83+ sseEmitters .remove (finalSessionId );
84+ });
85+
86+ return emitter ;
87+ }
88+
2689 /**
2790 * 重试接口
2891 *
@@ -34,7 +97,9 @@ public ResponseEntity<ApiResult<RetryResponse>> retry(@Valid @RequestBody RetryR
3497 try {
3598 log .info ("收到重试请求 - nodeId: {}, sessionId: {}, whyretry: {}" ,
3699 request .getNodeId (), request .getSessionId (), request .getWhyretry ());
37-
100+
101+
102+
38103 // 构建响应数据
39104 RetryResponse response = RetryResponse .builder ()
40105 .nodeId (request .getNodeId ())
@@ -53,4 +118,90 @@ public ResponseEntity<ApiResult<RetryResponse>> retry(@Valid @RequestBody RetryR
53118 return ResponseEntity .badRequest ().body (ApiResult .serverError ("重试请求处理失败: " + e .getMessage ()));
54119 }
55120 }
121+
122+ /**
123+ * 处理统一答案请求
124+ * 支持单选、多选、输入框、表单等多种问题类型的回答
125+ */
126+ @ PostMapping ("/message" )
127+ public ResponseEntity <String > processAnswer (@ Validated @ RequestBody UnifiedAnswerRequest request ) {
128+ try {
129+ log .info ("接收到答案请求 - 会话ID: {}, 节点ID: {}, 问题类型: {}" ,
130+ request .getSessionId (),
131+ request .getNodeId (),
132+ request .getQuestionType ());
133+
134+ // 1. 会话管理和验证
135+ String userId = request .getUserId ();
136+
137+ ConversationSession session = sessionManagementService .getOrCreateSession (userId , request .getSessionId ());
138+
139+ // 2. 验证nodeId是否属于该会话
140+ if (request .getNodeId () != null && !sessionManagementService .validateNodeId (session .getSessionId (), request .getNodeId ())) {
141+ log .warn ("无效的节点ID - 会话: {}, 节点: {}" , session .getSessionId (), request .getNodeId ());
142+ return ResponseEntity .badRequest ().body ("无效的节点ID" );
143+ }
144+
145+ // 3. 验证答案格式
146+ if (!messageProcessingService .validateAnswer (request )) {
147+ log .warn ("答案格式验证失败: {}" , request );
148+ return ResponseEntity .badRequest ().body ("答案格式不正确" );
149+ }
150+
151+ // 4. 处理答案并转换为消息
152+ String processedMessage = messageProcessingService .preprocessMessage (
153+ null , // 没有额外的原始消息
154+ request ,
155+ session
156+ );
157+
158+ // 5. 发送处理后的消息给AI服务
159+ conversationService .processUserMessage (
160+ session .getUserId (),
161+ processedMessage ,
162+ response -> sendSseMessage (session .getSessionId (), response )
163+ );
164+
165+ return ResponseEntity .ok ("答案处理成功" );
166+
167+ } catch (Exception e ) {
168+ log .error ("处理答案失败 - 会话ID: {}, 错误: {}" , request .getSessionId (), e .getMessage (), e );
169+ return ResponseEntity .internalServerError ().body ("答案处理失败: " + e .getMessage ());
170+ }
171+ }
172+
173+ /**
174+ * 通过SSE发送消息给客户端
175+ *
176+ * @param sessionId 会话ID
177+ * @param response 消息响应对象
178+ */
179+ private void sendSseMessage (String sessionId , QuestionGenerationOperation .QuestionGenerationResponse response ) {
180+ SseEmitter emitter = sseEmitters .get (sessionId );
181+ if (emitter != null ) {
182+ try {
183+ emitter .send (SseEmitter .event ()
184+ .name ("message" )
185+ .data (response ));
186+ log .info ("SSE消息发送成功 - 会话: {}, 消息: {}" , sessionId , response );
187+ } catch (IOException e ) {
188+ log .error ("SSE消息发送失败 - 会话: {}, 错误: {}" , sessionId , e .getMessage ());
189+ sseEmitters .remove (sessionId );
190+ }
191+ } else {
192+ log .warn ("SSE连接不存在 - 会话: {}" , sessionId );
193+ }
194+ }
195+
196+ /**
197+ * 获取SSE连接状态
198+ */
199+ @ GetMapping ("/sse-status" )
200+ public ResponseEntity <Map <String , Object >> getSseStatus () {
201+ Map <String , Object > status = new ConcurrentHashMap <>();
202+ status .put ("connectedSessions" , sseEmitters .keySet ());
203+ status .put ("totalConnections" , sseEmitters .size ());
204+ status .put ("timestamp" , System .currentTimeMillis ());
205+ return ResponseEntity .ok (status );
206+ }
56207}
0 commit comments