fix: 超时异常处理、批处理逻辑修正

This commit is contained in:
liu.zixi
2025-11-24 15:36:28 +08:00
committed by joshen
parent e83f3adb94
commit e8b31f4216
4 changed files with 166 additions and 24 deletions

View File

@@ -64,6 +64,26 @@ public class CaseAiChatApi extends ApiBaseController {
return caseAiChatService.chat(caseAiChatDto, getCurrent());
}
/**
* 停止当前聊天输出
* @param conversationId 会话ID
* @return 是否成功停止
*/
@PostMapping("/stop")
public JsonResponse<Boolean> stopChat(@RequestParam String conversationId) {
try {
boolean result = caseAiChatService.stopChatOutput(conversationId);
if (result) {
return success(true, "成功停止输出");
} else {
return success(false, "未找到对应的会话或会话已结束");
}
} catch (Exception e) {
log.error("停止聊天输出异常", e);
return error("停止输出失败", e.getMessage());
}
}
/**
* 根据conversationId查看会话内消息记录
* @param conversationId 会话ID

View File

@@ -43,4 +43,10 @@ public interface ICaseAiChatService {
* @param endTime 结束时间
*/
void downloadConversationExcel(LocalDateTime startTime, LocalDateTime endTime);
}
/**
* 停止当前聊天输出
* @param conversationId 会话ID
* @return 是否成功停止
*/
boolean stopChatOutput(String conversationId);

View File

@@ -64,6 +64,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -71,6 +72,7 @@ import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@EnableConfigurationProperties({CaseAiProperties.class})
@@ -78,6 +80,8 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class CaseAiChatServiceImpl implements ICaseAiChatService {
private static final String SYS_ERR_MSG = "服务繁忙,请稍后再试。";
@Autowired
private CaseAiProperties caseAiProperties;
@@ -102,6 +106,9 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Autowired
private CasesDao casesDao;
// 用于存储会话ID与EventSource的映射关系以便能够中断特定会话
private final Map<String, EventSource> conversationEventSourceMap = new ConcurrentHashMap<>();
@Override
@Transactional
@@ -114,7 +121,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
conversationId = getOrCreateConversationId(caseAiChatDto, currentUser);
} catch (Exception e) {
log.error("获取会话ID失败", e);
errMessage(sseEmitter, "服务繁忙,请稍后再试。");
errMessage(sseEmitter, SYS_ERR_MSG);
sseEmitter.complete();
return sseEmitter;
}
@@ -156,7 +163,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
accessToken = aiAccessTokenService.getAccessToken();
} catch (Exception e) {
log.error("获取access_token失败", e);
errMessage(sseEmitter, "服务繁忙,请稍后再试。");
errMessage(sseEmitter, SYS_ERR_MSG);
sseEmitter.complete();
return sseEmitter;
}
@@ -181,6 +188,8 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
log.info("调用接口 [{}] 接口开始监听", request.url());
// 将EventSource存储到Map中以便后续可以中断
conversationEventSourceMap.put(conversationId, eventSource);
}
@Override
@@ -188,6 +197,8 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
log.info("调用接口 [{}] 接口关闭", request.url());
// 对话完成保存到ES
elasticSearchIndexService.createData(conversationData);
// 从Map中移除已完成的会话
conversationEventSourceMap.remove(conversationId);
sseEmitter.complete();
}
@@ -246,20 +257,40 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Override
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable e, @Nullable Response response) {
if (e == null) {
sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获"));
return;
}
log.error("调用接口 [{}] 接口异常", request.url(), e);
if (isTimeoutException(e)) {
log.warn("接口调用超时conversationId: {}", conversationId);
errMessage(sseEmitter, SYS_ERR_MSG);
sseEmitter.complete();
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
// 即使失败也要将已有的对话数据保存到ES
elasticSearchIndexService.createData(conversationData);
return;
}
// 如果是 content-type 错误,尝试作为普通 HTTP 请求处理
if (e instanceof IllegalStateException && e.getMessage() != null && e.getMessage().contains("Invalid content-type")) {
log.warn("服务器返回的 Content-Type 不是 text/event-stream尝试作为普通 HTTP 请求处理");
handleAsRegularHttpRequest(request, sseEmitter, conversationData);
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
// 即使失败也要将已有的对话数据保存到ES
elasticSearchIndexService.createData(conversationData);
return;
}
sseEmitter.completeWithError(e);
// 从Map中移除失败的会话
conversationEventSourceMap.remove(conversationId);
if (e != null) {
sseEmitter.completeWithError(e);
} else {
sseEmitter.completeWithError(new RuntimeException("调用接口异常, 异常未捕获"));
}
// 即使失败也要将已有的对话数据保存到ES
elasticSearchIndexService.createData(conversationData);
}
};
@@ -726,4 +757,61 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
sseEmitter.completeWithError(e);
}
}
@Override
public boolean stopChatOutput(String conversationId) {
EventSource eventSource = conversationEventSourceMap.get(conversationId);
if (eventSource != null) {
try {
// 取消事件源,中断连接
eventSource.cancel();
// 注意cancel()会触发onFailure回调在onFailure中会清理资源
log.info("成功发送停止会话 {} 的指令", conversationId);
return true;
} catch (Exception e) {
log.error("停止会话 {} 输出时发生异常", conversationId, e);
// 即使出现异常也从Map中移除避免内存泄漏
conversationEventSourceMap.remove(conversationId);
return false;
}
} else {
log.warn("未找到会话 {} 对应的事件源,可能已经完成或不存在", conversationId);
return false;
}
}
/**
* 判断Throwable是否为超时类异常
* @param e
* @return
*/
private boolean isTimeoutException(@Nullable Throwable e) {
if (e == null) {
return false;
}
// ConnectException SocketTimeoutException
if (e instanceof java.net.ConnectException || e instanceof java.net.SocketTimeoutException) {
return true;
}
// 可能是包装后的异常,递归检查 cause
Throwable cause = e.getCause();
while (cause != null) {
if (cause instanceof java.net.ConnectException || cause instanceof java.net.SocketTimeoutException) {
return true;
}
cause = cause.getCause();
}
// 有些情况下 OkHttp 会抛出 IOException 并包含 "timeout" 字样
if (e instanceof java.io.IOException) {
String msg = e.getMessage();
if (msg != null && msg.toLowerCase().contains("timeout")) {
return true;
}
}
return false;
}
}

View File

@@ -22,6 +22,7 @@ import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* 旧案例上传
@@ -46,6 +47,7 @@ public class CaseUploadTask {
@XxlJob("oldDataUploadJob")
public void oldDataUploadJob() {
String currentLastId = null;
try {
// log.info("开始执行旧案例上传任务");
@@ -61,6 +63,7 @@ public class CaseUploadTask {
// log.info("没有需要处理的案例数据");
return;
}
currentLastId = casesToProcess.get(casesToProcess.size() - 1).getId();
// 批量检查这些案例是否已在CaseDocumentLog中存在记录提升性能
List<String> caseIds = new ArrayList<>();
@@ -76,18 +79,37 @@ public class CaseUploadTask {
// 过滤出未在CaseDocumentLog中存在的案例
List<Cases> casesList = new ArrayList<>();
for (Cases cases : casesToProcess) {
boolean exists = false;
for (CaseDocumentLog log : existingLogs) {
if (cases.getId().equals(log.getCaseId())
&& StringUtils.equals(log.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME)
&& Objects.equals(log.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode())
&& Objects.equals(log.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode())
&& Objects.equals(log.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode())) {
exists = true;
break;
}
}
if (!exists) {
// boolean exists = false;
// for (CaseDocumentLog log : existingLogs) {
// if (cases.getId().equals(log.getCaseId())
// && StringUtils.equals(log.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME)
// && Objects.equals(log.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode())
// && Objects.equals(log.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode())
// && Objects.equals(log.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode())) {
// exists = true;
// break;
// }
// }
// if (!exists) {
// casesList.add(cases);
// }
List<CaseDocumentLog> thisCaseLogs = existingLogs.stream()
.filter(log -> cases.getId().equals(log.getCaseId()))
.collect(Collectors.toList());
if (thisCaseLogs == null || thisCaseLogs.isEmpty()) {
casesList.add(cases);
} else if (thisCaseLogs.stream()
.noneMatch(caseLog -> {
// 1. 是否存在已上传完成的案例
boolean hasCompleted = StringUtils.equals(caseLog.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME)
&& Objects.equals(caseLog.getRunStatus(), CaseDocumentLogRunStatusEnum.COMPLETED.getCode())
&& Objects.equals(caseLog.getOptStatus(), CaseDocumentLogOptStatusEnum.SUCCESS.getCode())
&& Objects.equals(caseLog.getRunStatus(), CaseDocumentLogCaseStatusEnum.SUCCESS.getCode());
// 2. 是否存在上传中的案例
boolean hasUploading = StringUtils.equals(caseLog.getRequestUrl(), CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME)
&& Objects.equals(caseLog.getRunStatus(), CaseDocumentLogRunStatusEnum.RUNNING.getCode());
return hasCompleted || hasUploading;
})) {
casesList.add(cases);
}
}
@@ -98,17 +120,18 @@ public class CaseUploadTask {
// 调用异步处理方法
caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.CREATE, casesList.toArray(new Cases[0]));
// 将当前处理的最后一条数据ID存入Redis
String currentLastId = casesList.get(casesList.size() - 1).getId();
stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId);
log.info("已处理案例最后一条记录ID已更新为: {}", currentLastId);
} else {
log.info("没有新的案例需要处理");
}
// 将当前处理的最后一条数据ID存入Redis
// log.info("旧案例上传任务执行完成");
} catch (Exception e) {
log.error("执行旧案例上传任务时发生异常", e);
} finally {
if (currentLastId != null) {
fixOnLastCase(currentLastId);
}
}
}
@@ -137,4 +160,9 @@ public class CaseUploadTask {
return casesDao.findList(queryBuilder.builder());
}
private void fixOnLastCase(String currentLastId) {
stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId);
log.info("已处理案例最后一条记录ID已更新为: {}", currentLastId);
}
}