From 07bf6652202b85de28687302d6eab04bf38e3451 Mon Sep 17 00:00:00 2001 From: "liu.zixi" Date: Thu, 27 Nov 2025 15:26:53 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=AE=8C=E5=96=84=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E7=9B=91=E5=90=AC=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/xboe/config/ThreadPoolConfig.java | 18 ++ .../com/xboe/constants/CaseAiConstants.java | 4 + .../service/impl/CaseAiChatServiceImpl.java | 161 ++++++++++++------ 3 files changed, 129 insertions(+), 54 deletions(-) diff --git a/servers/boe-server-all/src/main/java/com/xboe/config/ThreadPoolConfig.java b/servers/boe-server-all/src/main/java/com/xboe/config/ThreadPoolConfig.java index 4fa37ae8..c420239f 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/config/ThreadPoolConfig.java +++ b/servers/boe-server-all/src/main/java/com/xboe/config/ThreadPoolConfig.java @@ -65,6 +65,24 @@ public class ThreadPoolConfig { return executor; } + /** + * 异步存会话数据线程池 + * @return + */ + @Bean(name = "esChatExecutor") + public ThreadPoolTaskExecutor esChatExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(500); + executor.setQueueCapacity(10); + executor.setThreadNamePrefix("es-chat-"); + executor.setKeepAliveSeconds(300); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.initialize(); + return executor; + } + @Bean(name = "customDispatcher") public Dispatcher customDispatcher(@Qualifier("eventStreamExecutor") ThreadPoolTaskExecutor eventStreamExecutor) { return new Dispatcher(eventStreamExecutor.getThreadPoolExecutor()); diff --git a/servers/boe-server-all/src/main/java/com/xboe/constants/CaseAiConstants.java b/servers/boe-server-all/src/main/java/com/xboe/constants/CaseAiConstants.java index 770c70d8..dcd6279d 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/constants/CaseAiConstants.java +++ b/servers/boe-server-all/src/main/java/com/xboe/constants/CaseAiConstants.java @@ -7,4 +7,8 @@ public class CaseAiConstants { public static final String CASE_DOC_UPLOAD_INTERFACE_NAME = "文档上传"; public static final String CASE_DOC_DELETE_INTERFACE_NAME = "文档删除"; + + public static final String CHAT_SYS_ERR_MSG = "服务繁忙,请稍后再试。"; + + public static final String CHAT_NET_ERR_MSG = "网络异常,请稍后再试。"; } diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java index 5d4aff3e..9c084941 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java @@ -2,6 +2,7 @@ package com.xboe.module.boecase.service.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.xboe.constants.CaseAiConstants; import com.xboe.core.CurrentUser; import com.xboe.core.orm.FieldFilters; import com.xboe.enums.CaseAiChatStatusEnum; @@ -22,13 +23,12 @@ import com.xboe.module.boecase.entity.AiChatConversationData; import com.xboe.module.boecase.vo.ConversationExcelVo; import com.xboe.system.organization.vo.OrgSimpleVo; import com.xboe.system.user.service.IUserService; -import lombok.Data; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import okhttp3.sse.EventSource; import okhttp3.sse.EventSourceListener; import okhttp3.sse.EventSources; -import org.apache.http.HttpEntity; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; @@ -40,25 +40,14 @@ import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.util.CellRangeAddress; import org.apache.poi.xssf.usermodel.XSSFWorkbook; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import org.springframework.util.StringUtils; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.servlet.http.HttpServletResponse; @@ -81,11 +70,13 @@ import java.util.concurrent.TimeUnit; @Slf4j(topic = "caseAiChatLogger") public class CaseAiChatServiceImpl implements ICaseAiChatService { - private static final String SYS_ERR_MSG = "服务繁忙,请稍后再试。"; - @Autowired private CaseAiProperties caseAiProperties; + @Autowired + @Qualifier("esChatExecutor") + private ThreadPoolTaskExecutor esChatExecutor; + @Autowired @Qualifier("customDispatcher") private Dispatcher dispatcher; @@ -122,7 +113,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { conversationId = getOrCreateConversationId(caseAiChatDto, currentUser); } catch (Exception e) { log.error("获取会话ID失败", e); - errMessage(sseEmitter, null, SYS_ERR_MSG); + errMessage(sseEmitter, null, CaseAiConstants.CHAT_SYS_ERR_MSG); sseEmitter.complete(); return sseEmitter; } @@ -170,18 +161,20 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { try { accessToken = aiAccessTokenService.getAccessToken(); if (org.apache.commons.lang3.StringUtils.isBlank(accessToken)) { - errMessage(sseEmitter, conversationId, SYS_ERR_MSG); + errMessage(sseEmitter, conversationId, CaseAiConstants.CHAT_SYS_ERR_MSG); + // 先响应给前端 sseEmitter.complete(); - conversationData.appendAnswer(SYS_ERR_MSG); - elasticSearchIndexService.createData(conversationData); + conversationData.appendAnswer(CaseAiConstants.CHAT_SYS_ERR_MSG); + saveConversationData(conversationData); return sseEmitter; } } catch (Exception e) { log.error("获取access_token失败", e); - errMessage(sseEmitter, conversationId, SYS_ERR_MSG); - conversationData.appendAnswer(SYS_ERR_MSG); - elasticSearchIndexService.createData(conversationData); + errMessage(sseEmitter, conversationId, CaseAiConstants.CHAT_SYS_ERR_MSG); + // 先响应给前端 sseEmitter.complete(); + conversationData.appendAnswer(CaseAiConstants.CHAT_SYS_ERR_MSG); + saveConversationData(conversationData); return sseEmitter; } String apiCode = caseAiProperties.getChatApiCode(); @@ -197,6 +190,45 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { EventSourceListener listener = new EventSourceListener() { @Override public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) { + // 检查contentType + String contentType = response.header("Content-Type"); + if (contentType == null || !contentType.contains("text/event-stream")) { + // 服务器返回的不是SSE流,需要额外处理 + log.error("调用接口 [{}] 返回的Content-Type不是text/event-stream,实际ContentType: {}", request.url(), contentType); + String sseContent; + try { + ResponseBody responseBody = response.body(); + if (responseBody == null) { + sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG; + } else { + String responseBodyStr = responseBody.string(); + log.error("调用 [{}] 返回值: {}", request.url(), responseBodyStr); + // 判断是否为json + if (contentType != null && contentType.contains("application/json")) { + JSONObject responseData = JSONObject.parseObject(responseBodyStr); + if (responseData.containsKey("message") && StringUtils.isNotBlank(responseData.getString("message"))) { + sseContent = responseData.getString("message"); + } else { + sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG; + } + } else { + sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG; + } + } + } catch (IOException e) { + log.error("解析接口响应失败", e); + // 处理失败的情况 + sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG; + } + + errMessage(sseEmitter, conversationId, sseContent); + sseEmitter.complete(); + conversationData.appendAnswer(sseContent); + saveConversationData(conversationData); + // 关闭eventSource + eventSource.cancel(); + return; + } log.info("调用接口 [{}] 接口开始监听", request.url()); // 将EventSource存储到Map中,以便后续可以中断 conversationEventSourceMap.put(conversationId, eventSource); @@ -206,7 +238,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { public void onClosed(@NotNull EventSource eventSource) { log.info("调用接口 [{}] 接口关闭", request.url()); // 对话完成,保存到ES - elasticSearchIndexService.createData(conversationData); + saveConversationData(conversationData); // 从Map中移除已完成的会话 conversationEventSourceMap.remove(conversationId); sseEmitter.complete(); @@ -267,42 +299,45 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Override public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable e, @Nullable Response response) { - if (e == null) { - sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获")); - return; + // 只要有异常,必打日志 + if (e != null) { + log.error("调用接口 [{}] 时发生错误,捕获到异常", request.url(), e); + } else { + log.error("调用接口 [{}] 时发生错误,未捕获到异常", request.url()); } - log.error("调用接口 [{}] 接口异常", request.url(), e); - if (isTimeoutException(e)) { - log.warn("接口调用超时,conversationId: {}", conversationId); - errMessage(sseEmitter, conversationId, SYS_ERR_MSG); - sseEmitter.complete(); - // 从Map中移除失败的会话 - conversationEventSourceMap.remove(conversationId); - - // 即使失败,也要将已有的对话数据保存到ES - conversationData.appendAnswer(SYS_ERR_MSG); - elasticSearchIndexService.createData(conversationData); - return; - } - - // 如果是 content-type 错误,尝试作为普通 HTTP 请求处理 - if (e instanceof IllegalStateException && e.getMessage() != null && e.getMessage().contains("Invalid content-type")) { - log.warn("服务器返回的 Content-Type 不是 text/event-stream,尝试作为普通 HTTP 请求处理"); - handleAsRegularHttpRequest(request, sseEmitter, conversationData); - // 从Map中移除失败的会话 - conversationEventSourceMap.remove(conversationId); - // 即使失败,也要将已有的对话数据保存到ES - conversationData.appendAnswer(SYS_ERR_MSG); - elasticSearchIndexService.createData(conversationData); - return; + String errorMessage = CaseAiConstants.CHAT_SYS_ERR_MSG; + // 优先处理错误响应 + if (response != null) { + try { + log.error("调用接口 [{}] 时发生错误,响应码: {}", request.url(), response.code()); + if (response.body() != null) { + String body = response.body().string(); + log.error("调用接口 [{}] 时的错误响应内容: {}", request.url(), body); + // 将错误内容发送至SseEmitter + if (StringUtils.contains(response.header("Content-Type"), "application/json")) { + // json解析 + JSONObject jsonData = JSONObject.parseObject(body); + if (jsonData.containsKey("message") && StringUtils.isNotBlank(jsonData.getString("message"))) { + errorMessage = jsonData.getString("message"); + } + } + } + } catch (IOException ex) { + log.error("解析异常请求时错误", ex); + } + } else if (e != null) { + if (isTimeoutException(e)) { + errorMessage = CaseAiConstants.CHAT_NET_ERR_MSG; + } } + errMessage(sseEmitter, conversationId, errorMessage); + sseEmitter.complete(); // 从Map中移除失败的会话 conversationEventSourceMap.remove(conversationId); - // 即使失败,也要将已有的对话数据保存到ES - elasticSearchIndexService.createData(conversationData); - sseEmitter.completeWithError(e); + conversationData.appendAnswer(errorMessage); + saveConversationData(conversationData); } }; @@ -446,9 +481,13 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { /** * 从 ES 数据中解析消息对象 + * 已迁移 + * @see IElasticSearchIndexService + * * @param sourceMap ES数据 * @return 消息对象 */ + @Deprecated private CaseAiMessageVo parseMessageFromES(Map sourceMap) { try { CaseAiMessageVo messageVo = new CaseAiMessageVo(); @@ -570,6 +609,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { /** * 处理文件引用(原方法,保留用于数据收集) */ + @Deprecated private void handleFileRefer(JSONObject responseData, AiChatConversationData conversationData) { try { JSONObject fileRefer = responseData.getJSONObject("fileRefer"); @@ -657,7 +697,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { /** * 当 SSE 失败时,作为普通 HTTP 请求处理 + * 不再使用 */ + @Deprecated private void handleAsRegularHttpRequest(Request request, SseEmitter sseEmitter, AiChatConversationData conversationData) { try { OkHttpClient client = new OkHttpClient.Builder() @@ -690,6 +732,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { } } + /** + * 发送错误信息 + */ private void errMessage(SseEmitter sseEmitter, String conversationId, String message) { JSONObject conversationData = new JSONObject(); conversationData.put("conversationId", conversationId); @@ -857,4 +902,12 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { return workbook; } + + /** + * 异步存储会话数据 + * @param conversationData + */ + private void saveConversationData(AiChatConversationData conversationData) { + esChatExecutor.execute(() -> elasticSearchIndexService.createData(conversationData)); + } }