fix: 完善接口监听逻辑

This commit is contained in:
liu.zixi
2025-11-27 15:26:53 +08:00
parent 8a3899dfd1
commit 07bf665220
3 changed files with 129 additions and 54 deletions

View File

@@ -65,6 +65,24 @@ public class ThreadPoolConfig {
return executor;
}
/**
* 异步存会话数据线程池
* @return
*/
@Bean(name = "esChatExecutor")
public ThreadPoolTaskExecutor esChatExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(500);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("es-chat-");
executor.setKeepAliveSeconds(300);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean(name = "customDispatcher")
public Dispatcher customDispatcher(@Qualifier("eventStreamExecutor") ThreadPoolTaskExecutor eventStreamExecutor) {
return new Dispatcher(eventStreamExecutor.getThreadPoolExecutor());

View File

@@ -7,4 +7,8 @@ public class CaseAiConstants {
public static final String CASE_DOC_UPLOAD_INTERFACE_NAME = "文档上传";
public static final String CASE_DOC_DELETE_INTERFACE_NAME = "文档删除";
public static final String CHAT_SYS_ERR_MSG = "服务繁忙,请稍后再试。";
public static final String CHAT_NET_ERR_MSG = "网络异常,请稍后再试。";
}

View File

@@ -2,6 +2,7 @@ package com.xboe.module.boecase.service.impl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xboe.constants.CaseAiConstants;
import com.xboe.core.CurrentUser;
import com.xboe.core.orm.FieldFilters;
import com.xboe.enums.CaseAiChatStatusEnum;
@@ -22,13 +23,12 @@ import com.xboe.module.boecase.entity.AiChatConversationData;
import com.xboe.module.boecase.vo.ConversationExcelVo;
import com.xboe.system.organization.vo.OrgSimpleVo;
import com.xboe.system.user.service.IUserService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.apache.http.HttpEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@@ -40,25 +40,14 @@ import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.util.CellRangeAddress;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.servlet.http.HttpServletResponse;
@@ -81,11 +70,13 @@ import java.util.concurrent.TimeUnit;
@Slf4j(topic = "caseAiChatLogger")
public class CaseAiChatServiceImpl implements ICaseAiChatService {
private static final String SYS_ERR_MSG = "服务繁忙,请稍后再试。";
@Autowired
private CaseAiProperties caseAiProperties;
@Autowired
@Qualifier("esChatExecutor")
private ThreadPoolTaskExecutor esChatExecutor;
@Autowired
@Qualifier("customDispatcher")
private Dispatcher dispatcher;
@@ -122,7 +113,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
conversationId = getOrCreateConversationId(caseAiChatDto, currentUser);
} catch (Exception e) {
log.error("获取会话ID失败", e);
errMessage(sseEmitter, null, SYS_ERR_MSG);
errMessage(sseEmitter, null, CaseAiConstants.CHAT_SYS_ERR_MSG);
sseEmitter.complete();
return sseEmitter;
}
@@ -170,18 +161,20 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
try {
accessToken = aiAccessTokenService.getAccessToken();
if (org.apache.commons.lang3.StringUtils.isBlank(accessToken)) {
errMessage(sseEmitter, conversationId, SYS_ERR_MSG);
errMessage(sseEmitter, conversationId, CaseAiConstants.CHAT_SYS_ERR_MSG);
// 先响应给前端
sseEmitter.complete();
conversationData.appendAnswer(SYS_ERR_MSG);
elasticSearchIndexService.createData(conversationData);
conversationData.appendAnswer(CaseAiConstants.CHAT_SYS_ERR_MSG);
saveConversationData(conversationData);
return sseEmitter;
}
} catch (Exception e) {
log.error("获取access_token失败", e);
errMessage(sseEmitter, conversationId, SYS_ERR_MSG);
conversationData.appendAnswer(SYS_ERR_MSG);
elasticSearchIndexService.createData(conversationData);
errMessage(sseEmitter, conversationId, CaseAiConstants.CHAT_SYS_ERR_MSG);
// 先响应给前端
sseEmitter.complete();
conversationData.appendAnswer(CaseAiConstants.CHAT_SYS_ERR_MSG);
saveConversationData(conversationData);
return sseEmitter;
}
String apiCode = caseAiProperties.getChatApiCode();
@@ -197,6 +190,45 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
EventSourceListener listener = new EventSourceListener() {
@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
// 检查contentType
String contentType = response.header("Content-Type");
if (contentType == null || !contentType.contains("text/event-stream")) {
// 服务器返回的不是SSE流需要额外处理
log.error("调用接口 [{}] 返回的Content-Type不是text/event-stream实际ContentType: {}", request.url(), contentType);
String sseContent;
try {
ResponseBody responseBody = response.body();
if (responseBody == null) {
sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG;
} else {
String responseBodyStr = responseBody.string();
log.error("调用 [{}] 返回值: {}", request.url(), responseBodyStr);
// 判断是否为json
if (contentType != null && contentType.contains("application/json")) {
JSONObject responseData = JSONObject.parseObject(responseBodyStr);
if (responseData.containsKey("message") && StringUtils.isNotBlank(responseData.getString("message"))) {
sseContent = responseData.getString("message");
} else {
sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG;
}
} else {
sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG;
}
}
} catch (IOException e) {
log.error("解析接口响应失败", e);
// 处理失败的情况
sseContent = CaseAiConstants.CHAT_SYS_ERR_MSG;
}
errMessage(sseEmitter, conversationId, sseContent);
sseEmitter.complete();
conversationData.appendAnswer(sseContent);
saveConversationData(conversationData);
// 关闭eventSource
eventSource.cancel();
return;
}
log.info("调用接口 [{}] 接口开始监听", request.url());
// 将EventSource存储到Map中以便后续可以中断
conversationEventSourceMap.put(conversationId, eventSource);
@@ -206,7 +238,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
public void onClosed(@NotNull EventSource eventSource) {
log.info("调用接口 [{}] 接口关闭", request.url());
// 对话完成保存到ES
elasticSearchIndexService.createData(conversationData);
saveConversationData(conversationData);
// 从Map中移除已完成的会话
conversationEventSourceMap.remove(conversationId);
sseEmitter.complete();
@@ -267,42 +299,45 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Override
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable e, @Nullable Response response) {
if (e == null) {
sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获"));
return;
// 只要有异常,必打日志
if (e != null) {
log.error("调用接口 [{}] 时发生错误,捕获到异常", request.url(), e);
} else {
log.error("调用接口 [{}] 时发生错误,未捕获到异常", request.url());
}
log.error("调用接口 [{}] 接口异常", request.url(), e);
if (isTimeoutException(e)) {
log.warn("接口调用超时conversationId: {}", conversationId);
errMessage(sseEmitter, conversationId, SYS_ERR_MSG);
sseEmitter.complete();
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
// 即使失败也要将已有的对话数据保存到ES
conversationData.appendAnswer(SYS_ERR_MSG);
elasticSearchIndexService.createData(conversationData);
return;
}
// 如果是 content-type 错误,尝试作为普通 HTTP 请求处理
if (e instanceof IllegalStateException && e.getMessage() != null && e.getMessage().contains("Invalid content-type")) {
log.warn("服务器返回的 Content-Type 不是 text/event-stream尝试作为普通 HTTP 请求处理");
handleAsRegularHttpRequest(request, sseEmitter, conversationData);
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
// 即使失败也要将已有的对话数据保存到ES
conversationData.appendAnswer(SYS_ERR_MSG);
elasticSearchIndexService.createData(conversationData);
return;
String errorMessage = CaseAiConstants.CHAT_SYS_ERR_MSG;
// 优先处理错误响应
if (response != null) {
try {
log.error("调用接口 [{}] 时发生错误,响应码: {}", request.url(), response.code());
if (response.body() != null) {
String body = response.body().string();
log.error("调用接口 [{}] 时的错误响应内容: {}", request.url(), body);
// 将错误内容发送至SseEmitter
if (StringUtils.contains(response.header("Content-Type"), "application/json")) {
// json解析
JSONObject jsonData = JSONObject.parseObject(body);
if (jsonData.containsKey("message") && StringUtils.isNotBlank(jsonData.getString("message"))) {
errorMessage = jsonData.getString("message");
}
}
}
} catch (IOException ex) {
log.error("解析异常请求时错误", ex);
}
} else if (e != null) {
if (isTimeoutException(e)) {
errorMessage = CaseAiConstants.CHAT_NET_ERR_MSG;
}
}
errMessage(sseEmitter, conversationId, errorMessage);
sseEmitter.complete();
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
// 即使失败也要将已有的对话数据保存到ES
elasticSearchIndexService.createData(conversationData);
sseEmitter.completeWithError(e);
conversationData.appendAnswer(errorMessage);
saveConversationData(conversationData);
}
};
@@ -446,9 +481,13 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
/**
* 从 ES 数据中解析消息对象
* 已迁移
* @see IElasticSearchIndexService
*
* @param sourceMap ES数据
* @return 消息对象
*/
@Deprecated
private CaseAiMessageVo parseMessageFromES(Map<String, Object> sourceMap) {
try {
CaseAiMessageVo messageVo = new CaseAiMessageVo();
@@ -570,6 +609,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
/**
* 处理文件引用(原方法,保留用于数据收集)
*/
@Deprecated
private void handleFileRefer(JSONObject responseData, AiChatConversationData conversationData) {
try {
JSONObject fileRefer = responseData.getJSONObject("fileRefer");
@@ -657,7 +697,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
/**
* 当 SSE 失败时,作为普通 HTTP 请求处理
* 不再使用
*/
@Deprecated
private void handleAsRegularHttpRequest(Request request, SseEmitter sseEmitter, AiChatConversationData conversationData) {
try {
OkHttpClient client = new OkHttpClient.Builder()
@@ -690,6 +732,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
}
}
/**
* 发送错误信息
*/
private void errMessage(SseEmitter sseEmitter, String conversationId, String message) {
JSONObject conversationData = new JSONObject();
conversationData.put("conversationId", conversationId);
@@ -857,4 +902,12 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
return workbook;
}
/**
* 异步存储会话数据
* @param conversationData
*/
private void saveConversationData(AiChatConversationData conversationData) {
esChatExecutor.execute(() -> elasticSearchIndexService.createData(conversationData));
}
}