From e8b31f4216b206547fc3a477568334e13226fb47 Mon Sep 17 00:00:00 2001 From: "liu.zixi" Date: Mon, 24 Nov 2025 15:36:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E8=B6=85=E6=97=B6=E5=BC=82=E5=B8=B8?= =?UTF-8?q?=E5=A4=84=E7=90=86=E3=80=81=E6=89=B9=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/boecase/api/CaseAiChatApi.java | 20 ++++ .../boecase/service/ICaseAiChatService.java | 8 +- .../service/impl/CaseAiChatServiceImpl.java | 102 ++++++++++++++++-- .../module/boecase/task/CaseUploadTask.java | 60 ++++++++--- 4 files changed, 166 insertions(+), 24 deletions(-) diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseAiChatApi.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseAiChatApi.java index 64f57387..e71c6aae 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseAiChatApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseAiChatApi.java @@ -64,6 +64,26 @@ public class CaseAiChatApi extends ApiBaseController { return caseAiChatService.chat(caseAiChatDto, getCurrent()); } + /** + * 停止当前聊天输出 + * @param conversationId 会话ID + * @return 是否成功停止 + */ + @PostMapping("/stop") + public JsonResponse stopChat(@RequestParam String conversationId) { + try { + boolean result = caseAiChatService.stopChatOutput(conversationId); + if (result) { + return success(true, "成功停止输出"); + } else { + return success(false, "未找到对应的会话或会话已结束"); + } + } catch (Exception e) { + log.error("停止聊天输出异常", e); + return error("停止输出失败", e.getMessage()); + } + } + /** * 根据conversationId查看会话内消息记录 * @param conversationId 会话ID diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java index 6f840458..9bedf359 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java @@ -43,4 +43,10 @@ public interface ICaseAiChatService { * @param endTime 结束时间 */ void downloadConversationExcel(LocalDateTime startTime, LocalDateTime endTime); -} \ No newline at end of file + + /** + * 停止当前聊天输出 + * @param conversationId 会话ID + * @return 是否成功停止 + */ + boolean stopChatOutput(String conversationId); \ No newline at end of file diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java index d79fd098..a6712e1b 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java @@ -64,6 +64,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -71,6 +72,7 @@ import java.nio.file.Paths; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @EnableConfigurationProperties({CaseAiProperties.class}) @@ -78,6 +80,8 @@ import java.util.concurrent.TimeUnit; @Slf4j public class CaseAiChatServiceImpl implements ICaseAiChatService { + private static final String SYS_ERR_MSG = "服务繁忙,请稍后再试。"; + @Autowired private CaseAiProperties caseAiProperties; @@ -102,6 +106,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Autowired private CasesDao casesDao; + + // 用于存储会话ID与EventSource的映射关系,以便能够中断特定会话 + private final Map conversationEventSourceMap = new ConcurrentHashMap<>(); @Override @Transactional @@ -114,7 +121,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { conversationId = getOrCreateConversationId(caseAiChatDto, currentUser); } catch (Exception e) { log.error("获取会话ID失败", e); - errMessage(sseEmitter, "服务繁忙,请稍后再试。"); + errMessage(sseEmitter, SYS_ERR_MSG); sseEmitter.complete(); return sseEmitter; } @@ -156,7 +163,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { accessToken = aiAccessTokenService.getAccessToken(); } catch (Exception e) { log.error("获取access_token失败", e); - errMessage(sseEmitter, "服务繁忙,请稍后再试。"); + errMessage(sseEmitter, SYS_ERR_MSG); sseEmitter.complete(); return sseEmitter; } @@ -181,6 +188,8 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Override public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { log.info("调用接口 [{}] 接口开始监听", request.url()); + // 将EventSource存储到Map中,以便后续可以中断 + conversationEventSourceMap.put(conversationId, eventSource); } @Override @@ -188,6 +197,8 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { log.info("调用接口 [{}] 接口关闭", request.url()); // 对话完成,保存到ES elasticSearchIndexService.createData(conversationData); + // 从Map中移除已完成的会话 + conversationEventSourceMap.remove(conversationId); sseEmitter.complete(); } @@ -246,20 +257,40 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Override public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable e, @Nullable Response response) { + if (e == null) { + sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获")); + return; + } log.error("调用接口 [{}] 接口异常", request.url(), e); + if (isTimeoutException(e)) { + log.warn("接口调用超时,conversationId: {}", conversationId); + errMessage(sseEmitter, SYS_ERR_MSG); + sseEmitter.complete(); + // 从Map中移除失败的会话 + conversationEventSourceMap.remove(conversationId); + + // 即使失败,也要将已有的对话数据保存到ES + elasticSearchIndexService.createData(conversationData); + return; + } // 如果是 content-type 错误,尝试作为普通 HTTP 请求处理 if (e instanceof IllegalStateException && e.getMessage() != null && e.getMessage().contains("Invalid content-type")) { log.warn("服务器返回的 Content-Type 不是 text/event-stream,尝试作为普通 HTTP 请求处理"); handleAsRegularHttpRequest(request, sseEmitter, conversationData); + // 从Map中移除失败的会话 + conversationEventSourceMap.remove(conversationId); + // 即使失败,也要将已有的对话数据保存到ES + elasticSearchIndexService.createData(conversationData); return; } + + sseEmitter.completeWithError(e); + // 从Map中移除失败的会话 + conversationEventSourceMap.remove(conversationId); - if (e != null) { - sseEmitter.completeWithError(e); - } else { - sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获")); - } + // 即使失败,也要将已有的对话数据保存到ES + elasticSearchIndexService.createData(conversationData); } }; @@ -726,4 +757,61 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { sseEmitter.completeWithError(e); } } + + @Override + public boolean stopChatOutput(String conversationId) { + EventSource eventSource = conversationEventSourceMap.get(conversationId); + if (eventSource != null) { + try { + // 取消事件源,中断连接 + eventSource.cancel(); + // 注意:cancel()会触发onFailure回调,在onFailure中会清理资源 + log.info("成功发送停止会话 {} 的指令", conversationId); + return true; + } catch (Exception e) { + log.error("停止会话 {} 输出时发生异常", conversationId, e); + // 即使出现异常,也从Map中移除,避免内存泄漏 + conversationEventSourceMap.remove(conversationId); + return false; + } + } else { + log.warn("未找到会话 {} 对应的事件源,可能已经完成或不存在", conversationId); + return false; + } + } + + /** + * 判断Throwable是否为超时类异常 + * @param e + * @return + */ + private boolean isTimeoutException(@Nullable Throwable e) { + if (e == null) { + return false; + } + + // ConnectException SocketTimeoutException + if (e instanceof java.net.ConnectException || e instanceof java.net.SocketTimeoutException) { + return true; + } + + // 可能是包装后的异常,递归检查 cause + Throwable cause = e.getCause(); + while (cause != null) { + if (cause instanceof java.net.ConnectException || cause instanceof java.net.SocketTimeoutException) { + return true; + } + cause = cause.getCause(); + } + + // 有些情况下 OkHttp 会抛出 IOException 并包含 "timeout" 字样 + if (e instanceof java.io.IOException) { + String msg = e.getMessage(); + if (msg != null && msg.toLowerCase().contains("timeout")) { + return true; + } + } + + return false; + } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java index e8d18120..b888099b 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java @@ -22,6 +22,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * 旧案例上传 @@ -46,6 +47,7 @@ public class CaseUploadTask { @XxlJob("oldDataUploadJob") public void oldDataUploadJob() { + String currentLastId = null; try { // log.info("开始执行旧案例上传任务"); @@ -61,6 +63,7 @@ public class CaseUploadTask { // log.info("没有需要处理的案例数据"); return; } + currentLastId = casesToProcess.get(casesToProcess.size() - 1).getId(); // 批量检查这些案例是否已在CaseDocumentLog中存在记录,提升性能 List caseIds = new ArrayList<>(); @@ -76,18 +79,37 @@ public class CaseUploadTask { // 过滤出未在CaseDocumentLog中存在的案例 List casesList = new ArrayList<>(); for (Cases cases : casesToProcess) { - boolean exists = false; - for (CaseDocumentLog log : existingLogs) { - if (cases.getId().equals(log.getCaseId()) - && StringUtils.equals(log.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME) - && Objects.equals(log.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode()) - && Objects.equals(log.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode()) - && Objects.equals(log.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode())) { - exists = true; - break; - } - } - if (!exists) { +// boolean exists = false; +// for (CaseDocumentLog log : existingLogs) { +// if (cases.getId().equals(log.getCaseId()) +// && StringUtils.equals(log.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME) +// && Objects.equals(log.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode()) +// && Objects.equals(log.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode()) +// && Objects.equals(log.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode())) { +// exists = true; +// break; +// } +// } +// if (!exists) { +// casesList.add(cases); +// } + List thisCaseLogs = existingLogs.stream() + .filter(log -> cases.getId().equals(log.getCaseId())) + .collect(Collectors.toList()); + if (thisCaseLogs == null || thisCaseLogs.isEmpty()) { + casesList.add(cases); + } else if (thisCaseLogs.stream() + .noneMatch(caseLog -> { + // 1. 是否存在已上传完成的案例 + boolean hasCompleted = StringUtils.equals(caseLog.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME) + && Objects.equals(caseLog.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode()) + && Objects.equals(caseLog.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode()) + && Objects.equals(caseLog.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode()); + // 2. 是否存在上传中的案例 + boolean hasUploading = StringUtils.equals(caseLog.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME) + && Objects.equals(caseLog.getRunStatus(), CaseDocumentLogRunStatusEnum.RUNNING.getCode()); + return hasCompleted || hasUploading; + })) { casesList.add(cases); } } @@ -98,17 +120,18 @@ public class CaseUploadTask { // 调用异步处理方法 caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.CREATE, casesList.toArray(new Cases[0])); - // 将当前处理的最后一条数据ID存入Redis - String currentLastId = casesList.get(casesList.size() - 1).getId(); - stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId); - log.info("已处理案例,最后一条记录ID已更新为: {}", currentLastId); } else { log.info("没有新的案例需要处理"); } + // 将当前处理的最后一条数据ID存入Redis // log.info("旧案例上传任务执行完成"); } catch (Exception e) { log.error("执行旧案例上传任务时发生异常", e); + } finally { + if (currentLastId != null) { + fixOnLastCase(currentLastId); + } } } @@ -137,4 +160,9 @@ public class CaseUploadTask { return casesDao.findList(queryBuilder.builder()); } + + private void fixOnLastCase(String currentLastId) { + stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId); + log.info("已处理案例,最后一条记录ID已更新为: {}", currentLastId); + } } \ No newline at end of file