mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-11 20:06:51 +08:00
三方接口异步处理
This commit is contained in:
@@ -0,0 +1,47 @@
|
|||||||
|
package com.xboe.config;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线程池配置类
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
@Slf4j
|
||||||
|
public class ThreadPoolConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行AI文档接口的的线程池
|
||||||
|
* 策略:单线程等待队列
|
||||||
|
*/
|
||||||
|
@Bean(name = "aiDocExecutor")
|
||||||
|
public ThreadPoolTaskExecutor aiDocExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
// 设置核心线程数
|
||||||
|
int corePoolSize = Runtime.getRuntime().availableProcessors();
|
||||||
|
executor.setCorePoolSize(Math.max(4, corePoolSize));
|
||||||
|
// 设置最大线程数
|
||||||
|
executor.setMaxPoolSize(Math.max(16, corePoolSize * 2));
|
||||||
|
// 设置队列容量(确保任务排队)
|
||||||
|
executor.setQueueCapacity(100);
|
||||||
|
// keepalive
|
||||||
|
executor.setKeepAliveSeconds(30);
|
||||||
|
executor.setWaitForTasksToCompleteOnShutdown(true);
|
||||||
|
executor.setAwaitTerminationSeconds(30);
|
||||||
|
// 设置线程名称前缀
|
||||||
|
executor.setThreadNamePrefix("ai_doc_task-");
|
||||||
|
// 设置拒绝策略(当队列满时,由调用线程处理该任务)
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
// 初始化线程池
|
||||||
|
executor.initialize();
|
||||||
|
log.info("AI文档线程池初始化完成 - 核心线程: {}, 最大线程: {}, 队列容量: {}",
|
||||||
|
executor.getCorePoolSize(),
|
||||||
|
executor.getMaxPoolSize(),
|
||||||
|
executor.getQueueCapacity());
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -4,6 +4,7 @@ import com.xboe.core.api.ApiBaseController;
|
|||||||
import com.xboe.core.JsonResponse;
|
import com.xboe.core.JsonResponse;
|
||||||
import com.xboe.module.boecase.dto.CaseAiChatDto;
|
import com.xboe.module.boecase.dto.CaseAiChatDto;
|
||||||
import com.xboe.module.boecase.service.ICaseAiChatService;
|
import com.xboe.module.boecase.service.ICaseAiChatService;
|
||||||
|
import com.xboe.module.boecase.service.ICaseAiPermissionService;
|
||||||
import com.xboe.module.boecase.vo.CaseAiMessageVo;
|
import com.xboe.module.boecase.vo.CaseAiMessageVo;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
@@ -32,6 +33,9 @@ public class CaseAiChatApi extends ApiBaseController {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private ICaseAiChatService caseAiChatService;
|
private ICaseAiChatService caseAiChatService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ICaseAiPermissionService caseAiPermissionService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 聊天
|
* 聊天
|
||||||
* @param caseAiChatDto
|
* @param caseAiChatDto
|
||||||
@@ -63,4 +67,20 @@ public class CaseAiChatApi extends ApiBaseController {
|
|||||||
return error("查询失败", e.getMessage());
|
return error("查询失败", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断当前登录用户是否显示"案例专家"功能入口
|
||||||
|
* @return 是否显示功能入口
|
||||||
|
*/
|
||||||
|
@GetMapping("/show-entrance")
|
||||||
|
public JsonResponse<Boolean> showCaseAiEntrance() {
|
||||||
|
try {
|
||||||
|
String currentUserCode = getCurrent().getCode();
|
||||||
|
boolean shouldShow = caseAiPermissionService.shouldShowCaseAiEntrance(currentUserCode);
|
||||||
|
return success(shouldShow);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("判断案例专家功能入口显示权限异常", e);
|
||||||
|
return error("判断失败", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -85,13 +85,15 @@ public class CaseDocumentLogApi extends ApiBaseController {
|
|||||||
@PostMapping("/retry")
|
@PostMapping("/retry")
|
||||||
@AutoLog(module = "AI调用日志", action = "重试调用", info = "AI调用日志重试操作")
|
@AutoLog(module = "AI调用日志", action = "重试调用", info = "AI调用日志重试操作")
|
||||||
public JsonResponse<Boolean> retry(@RequestBody RetryRequest request) {
|
public JsonResponse<Boolean> retry(@RequestBody RetryRequest request) {
|
||||||
try {
|
// try {
|
||||||
boolean result = caseDocumentLogService.retryByLogId(request.getLogId());
|
// boolean result = caseDocumentLogService.retryByLogId(request.getLogId());
|
||||||
return success(result);
|
// return success(result);
|
||||||
} catch (Exception e) {
|
// } catch (Exception e) {
|
||||||
log.error("AI调用重试失败", e);
|
// log.error("AI调用重试失败", e);
|
||||||
return error("重试失败", e.getMessage());
|
// return error("重试失败", e.getMessage());
|
||||||
}
|
// }
|
||||||
|
// 先走挡板
|
||||||
|
return success(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -0,0 +1,66 @@
|
|||||||
|
package com.xboe.module.boecase.async;
|
||||||
|
|
||||||
|
import com.xboe.enums.CaseDocumentLogOptTypeEnum;
|
||||||
|
import com.xboe.module.boecase.entity.Cases;
|
||||||
|
import com.xboe.module.boecase.service.ICaseKnowledgeService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class CaseAiDocumentAsyncHandler {
|
||||||
|
|
||||||
|
private final AtomicInteger currentTaskCount = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@Qualifier("aiDocExecutor")
|
||||||
|
private ThreadPoolTaskExecutor aiDocExecutor;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ICaseKnowledgeService caseKnowledgeService;
|
||||||
|
|
||||||
|
public void process(CaseDocumentLogOptTypeEnum optTypeEnum, Cases... caseList) {
|
||||||
|
for (Cases cases : caseList) {
|
||||||
|
// 控制并发数量
|
||||||
|
while (currentTaskCount.get() >= 15) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currentTaskCount.incrementAndGet();
|
||||||
|
|
||||||
|
aiDocExecutor.submit(() -> {
|
||||||
|
processCases(cases, optTypeEnum);
|
||||||
|
currentTaskCount.decrementAndGet();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processCases(Cases cases, CaseDocumentLogOptTypeEnum optTypeEnum) {
|
||||||
|
try {
|
||||||
|
switch (optTypeEnum) {
|
||||||
|
case UPDATE:
|
||||||
|
caseKnowledgeService.updateCaseDocument(cases.getId());
|
||||||
|
break;
|
||||||
|
case DELETE:
|
||||||
|
caseKnowledgeService.deleteCaseDocument(cases.getId());
|
||||||
|
break;
|
||||||
|
case CREATE:
|
||||||
|
default:
|
||||||
|
caseKnowledgeService.uploadCaseDocument(cases.getId());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("处理案例失败,caseId: {}, optType: {}", cases.getId(), optTypeEnum.getCode(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -3,6 +3,8 @@ package com.xboe.module.boecase.properties;
|
|||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 案例专家AI相关配置项
|
* 案例专家AI相关配置项
|
||||||
*/
|
*/
|
||||||
@@ -39,4 +41,14 @@ public class CaseAiProperties {
|
|||||||
* 文档上传回调接口地址
|
* 文档上传回调接口地址
|
||||||
*/
|
*/
|
||||||
private String fileUploadCallbackUrl;
|
private String fileUploadCallbackUrl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 是否启用白名单
|
||||||
|
*/
|
||||||
|
private boolean useWhiteList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 白名单用户列表
|
||||||
|
*/
|
||||||
|
private List<String> whiteUserCodeList;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
package com.xboe.module.boecase.service;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 案例AI权限服务接口
|
||||||
|
*/
|
||||||
|
public interface ICaseAiPermissionService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断指定用户是否显示"案例专家"功能入口
|
||||||
|
* @param userCode 用户编码
|
||||||
|
* @return 是否显示功能入口
|
||||||
|
*/
|
||||||
|
boolean shouldShowCaseAiEntrance(String userCode);
|
||||||
|
}
|
||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package com.xboe.module.boecase.service.impl;
|
||||||
|
|
||||||
|
import com.xboe.module.boecase.properties.CaseAiProperties;
|
||||||
|
import com.xboe.module.boecase.service.ICaseAiPermissionService;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 案例AI权限服务实现类
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@Transactional
|
||||||
|
public class CaseAiPermissionServiceImpl implements ICaseAiPermissionService {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private CaseAiProperties caseAiProperties;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 判断指定用户是否显示"案例专家"功能入口
|
||||||
|
* @param userCode 用户编码
|
||||||
|
* @return 是否显示功能入口
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean shouldShowCaseAiEntrance(String userCode) {
|
||||||
|
log.debug("判断用户[{}]是否显示案例专家功能入口", userCode);
|
||||||
|
|
||||||
|
// 如果不启用白名单,直接返回true
|
||||||
|
if (!caseAiProperties.isUseWhiteList()) {
|
||||||
|
log.debug("未启用白名单,所有用户都显示功能入口");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 启用白名单时,判断当前用户是否在白名单中
|
||||||
|
boolean isInWhiteList = caseAiProperties.getWhiteUserCodeList() != null
|
||||||
|
&& caseAiProperties.getWhiteUserCodeList().contains(userCode);
|
||||||
|
|
||||||
|
log.debug("用户[{}]{}在白名单中", userCode, isInWhiteList ? "" : "不");
|
||||||
|
return isInWhiteList;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ import com.xboe.module.boecase.vo.CaseDocumentLogVo;
|
|||||||
import com.xboe.enums.CaseDocumentLogOptTypeEnum;
|
import com.xboe.enums.CaseDocumentLogOptTypeEnum;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.BeanUtils;
|
import org.springframework.beans.BeanUtils;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
@@ -39,6 +40,9 @@ public class CaseDocumentLogServiceImpl implements ICaseDocumentLogService {
|
|||||||
@Resource
|
@Resource
|
||||||
private ICaseKnowledgeService caseKnowledgeService;
|
private ICaseKnowledgeService caseKnowledgeService;
|
||||||
|
|
||||||
|
@Resource(name = "aiDocExecutor")
|
||||||
|
private ThreadPoolTaskExecutor aiDocExecutor;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PageList<CaseDocumentLogVo> pageQuery(int pageIndex, int pageSize, CaseDocumentLogQueryDto queryDto) {
|
public PageList<CaseDocumentLogVo> pageQuery(int pageIndex, int pageSize, CaseDocumentLogQueryDto queryDto) {
|
||||||
// 构建查询条件
|
// 构建查询条件
|
||||||
@@ -170,56 +174,64 @@ public class CaseDocumentLogServiceImpl implements ICaseDocumentLogService {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("开始重试AI调用,原始日志ID: {}, 案例标题: {}, 操作类型: {}",
|
log.info("开始异步重试AI调用,原始日志ID: {}, 案例标题: {}, 操作类型: {}",
|
||||||
logId, originalLog.getCaseTitle(), originalLog.getOptType());
|
logId, originalLog.getCaseTitle(), originalLog.getOptType());
|
||||||
|
|
||||||
// 2. 执行AI调用重试逻辑
|
// 2. 使用线程池异步执行AI调用重试逻辑
|
||||||
boolean retrySuccess = false;
|
|
||||||
|
|
||||||
try {
|
|
||||||
// 根据操作类型调用对应的接口方法
|
|
||||||
String optType = originalLog.getOptType();
|
String optType = originalLog.getOptType();
|
||||||
String caseId = originalLog.getCaseId();
|
String caseId = originalLog.getCaseId();
|
||||||
|
|
||||||
|
aiDocExecutor.execute(() -> executeRetryLogic(optType, caseId));
|
||||||
|
|
||||||
|
// 立即返回true表示重试请求已接受,具体结果通过日志异步处理
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行AI调用重试逻辑
|
||||||
|
* @param optType 操作类型
|
||||||
|
* @param caseId 案例ID
|
||||||
|
*/
|
||||||
|
private void executeRetryLogic(String optType, String caseId) {
|
||||||
|
boolean retrySuccess = false;
|
||||||
|
|
||||||
|
try {
|
||||||
if (StringUtil.isBlank(caseId)) {
|
if (StringUtil.isBlank(caseId)) {
|
||||||
throw new IllegalArgumentException("案例ID不能为空");
|
throw new IllegalArgumentException("案例ID不能为空");
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("正在执行AI调用重试,操作类型: {}, caseId: {}", optType, caseId);
|
log.info("[异步任务] 正在执行AI调用重试,操作类型: {}, caseId: {}", optType, caseId);
|
||||||
|
|
||||||
// 根据操作类型执行对应的方法(这些方法内部会自动创建日志记录)
|
// 根据操作类型执行对应的方法(这些方法内部会自动创建日志记录)
|
||||||
if (CaseDocumentLogOptTypeEnum.CREATE.getCode().equals(optType)) {
|
if (CaseDocumentLogOptTypeEnum.CREATE.getCode().equals(optType)) {
|
||||||
// 上传案例文档
|
// 上传案例文档
|
||||||
retrySuccess = caseKnowledgeService.uploadCaseDocument(caseId);
|
retrySuccess = caseKnowledgeService.uploadCaseDocument(caseId);
|
||||||
log.info("执行上传案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
log.info("[异步任务] 执行上传案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
||||||
|
|
||||||
} else if (CaseDocumentLogOptTypeEnum.DELETE.getCode().equals(optType)) {
|
} else if (CaseDocumentLogOptTypeEnum.DELETE.getCode().equals(optType)) {
|
||||||
// 删除案例文档
|
// 删除案例文档
|
||||||
retrySuccess = caseKnowledgeService.deleteCaseDocument(caseId);
|
retrySuccess = caseKnowledgeService.deleteCaseDocument(caseId);
|
||||||
log.info("执行删除案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
log.info("[异步任务] 执行删除案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
||||||
|
|
||||||
} else if (CaseDocumentLogOptTypeEnum.UPDATE.getCode().equals(optType)) {
|
} else if (CaseDocumentLogOptTypeEnum.UPDATE.getCode().equals(optType)) {
|
||||||
// 更新案例文档
|
// 更新案例文档
|
||||||
retrySuccess = caseKnowledgeService.updateCaseDocument(caseId);
|
retrySuccess = caseKnowledgeService.updateCaseDocument(caseId);
|
||||||
log.info("执行更新案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
log.info("[异步任务] 执行更新案例文档重试,caseId: {}, 结果: {}", caseId, retrySuccess);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("不支持的操作类型: " + optType);
|
throw new IllegalArgumentException("不支持的操作类型: " + optType);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (retrySuccess) {
|
if (retrySuccess) {
|
||||||
log.info("AI调用重试成功,操作类型: {}, caseId: {}", optType, caseId);
|
log.info("[异步任务] AI调用重试成功,操作类型: {}, caseId: {}", optType, caseId);
|
||||||
} else {
|
} else {
|
||||||
log.warn("AI调用重试失败,操作类型: {}, caseId: {}", optType, caseId);
|
log.warn("[异步任务] AI调用重试失败,操作类型: {}, caseId: {}", optType, caseId);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("AI调用重试异常,操作类型: {}, caseId: {}",
|
log.error("[异步任务] AI调用重试异常,操作类型: {}, caseId: {}",
|
||||||
originalLog.getOptType(), originalLog.getCaseId(), e);
|
optType, caseId, e);
|
||||||
retrySuccess = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return retrySuccess;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user