From f35ab10bd20dd35f79bdbbd6a138a6d2d8e2acd7 Mon Sep 17 00:00:00 2001 From: "liu.zixi" Date: Thu, 9 Oct 2025 14:38:39 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=B9=E5=A4=84=E7=90=86=EF=BC=9AJobHandler?= =?UTF-8?q?=E5=BC=80=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/boecase/api/CaseUploadTaskApi.java | 33 +++++ .../async/CaseAiDocumentAsyncHandler.java | 3 +- .../service/ICaseKnowledgeService.java | 3 + .../impl/CaseKnowledgeServiceImpl.java | 60 ++++---- .../boecase/task/CaseDocumentLogTask.java | 9 +- .../module/boecase/task/CaseUploadTask.java | 133 +++++++++++++++++- 6 files changed, 207 insertions(+), 34 deletions(-) create mode 100644 servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseUploadTaskApi.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseUploadTaskApi.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseUploadTaskApi.java new file mode 100644 index 00000000..e8a28a6a --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/api/CaseUploadTaskApi.java @@ -0,0 +1,33 @@ +package com.xboe.module.boecase.api; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * 案例上传任务API + */ +@Slf4j +@RestController +@RequestMapping("/xboe/m/boe/caseUpload") +public class CaseUploadTaskApi { + + @Autowired + private StringRedisTemplate stringRedisTemplate; + + private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id"; + + /** + * 清除处理位置标记,使下次任务从头开始执行 + */ + @PostMapping("/reset") + public void resetLastProcessedId() { + stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY); + log.info("已清除上次处理位置标记"); + } +} \ No newline at end of file diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/async/CaseAiDocumentAsyncHandler.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/async/CaseAiDocumentAsyncHandler.java index 3357e8a7..4bc4a7f1 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/async/CaseAiDocumentAsyncHandler.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/async/CaseAiDocumentAsyncHandler.java @@ -59,8 +59,9 @@ public class CaseAiDocumentAsyncHandler { caseKnowledgeService.uploadCaseDocument(cases.getId()); break; } + log.info("处理案例成功,caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc()); } catch (Exception e) { - log.error("处理案例失败,caseId: {}, optType: {}", cases.getId(), optTypeEnum.getCode(), e); + log.error("处理案例失败,caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc(), e); } } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseKnowledgeService.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseKnowledgeService.java index f2159e81..9aceef4d 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseKnowledgeService.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseKnowledgeService.java @@ -1,5 +1,7 @@ package com.xboe.module.boecase.service; +import org.springframework.transaction.annotation.Transactional; + /** * 案例-知识库 */ @@ -42,5 +44,6 @@ public interface ICaseKnowledgeService { /** * 批量检查文件状态 */ + @Transactional(rollbackFor = Throwable.class) void batchCheckFileStatus(); } diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseKnowledgeServiceImpl.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseKnowledgeServiceImpl.java index b550e72c..869ae7dc 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseKnowledgeServiceImpl.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseKnowledgeServiceImpl.java @@ -2,6 +2,7 @@ package com.xboe.module.boecase.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; +import com.xboe.common.PageList; import com.xboe.common.utils.IDGenerator; import com.xboe.common.utils.StringUtil; import com.xboe.common.OrderCondition; @@ -537,13 +538,14 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService { public void batchCheckFileStatus() { log.info("开始批量检查文件状态"); - // 1. 查询CaseDocumentLog表中所有run_status等于0的数据 - List runningLogs = caseDocumentLogDao.getGenericDao() - .findList(CaseDocumentLog.class, + // 1. 查询CaseDocumentLog表中前10条run_status等于0的数据,并按创建时间升序排序 + PageList runningLogPage = caseDocumentLogDao.getGenericDao() + .findPage(1, 10, CaseDocumentLog.class, OrderCondition.asc("sysCreateTime"), FieldFilters.eq("runStatus", CaseDocumentLogRunStatusEnum.RUNNING.getCode())); + List runningLogs = runningLogPage.getList(); // 2. 如果没有符合条件的数据,完成 - if (runningLogs.isEmpty()) { + if (runningLogs == null || runningLogs.isEmpty()) { log.info("没有需要检查状态的文档,批量检查完成"); return; } @@ -572,33 +574,30 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService { } // 5. 调用三方接口检查状态 - try { - String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status"; - String kId = caseAiProperties.getCaseKnowledgeId(); - String taskIdsParam = String.join(",", taskIds); - String queryParams = "kId=" + URLEncoder.encode(kId, StandardCharsets.UTF_8.name()) + - "&taskIds=" + URLEncoder.encode(taskIdsParam, StandardCharsets.UTF_8.name()); - - try (CloseableHttpClient httpClient = HttpClients.createDefault()) { - HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams); - httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode()); - httpGet.setHeader("access_token", accessToken); - - try (CloseableHttpResponse response = httpClient.execute(httpGet)) { - int statusCode = response.getStatusLine().getStatusCode(); - String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); - - if (statusCode == 200) { - JSONObject result = JSON.parseObject(responseBody); - if (result.getBooleanValue("success")) { - // 6. 解析返回结果并更新状态 - processFileStatusResponse(result, runningLogs); - } else { - log.error("调用文件状态接口业务处理失败,response: {}", responseBody); - } + String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status"; + String kId = caseAiProperties.getCaseKnowledgeId(); + String taskIdsParam = String.join(",", taskIds); + String queryParams = "kId=" + kId + "&taskIds=" + taskIdsParam; + + try (CloseableHttpClient httpClient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams); + httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode()); + httpGet.setHeader("access_token", accessToken); + + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + int statusCode = response.getStatusLine().getStatusCode(); + String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + + if (statusCode == 200) { + JSONObject result = JSON.parseObject(responseBody); + if (result.getBooleanValue("success")) { + // 6. 解析返回结果并更新状态 + processFileStatusResponse(result, runningLogs); } else { - log.error("调用文件状态接口失败,status: {}, response: {}", statusCode, responseBody); + log.error("调用文件状态接口业务处理失败,response: {}", responseBody); } + } else { + log.error("调用文件状态接口失败,status: {}, response: {}", statusCode, responseBody); } } } catch (Exception e) { @@ -884,8 +883,9 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService { caseDocumentLogDao.save(caseLog); log.info("更新CaseDocumentLog成功,logId: {}, taskId: {}, fileStatus: {}", caseLog.getId(), caseLog.getTaskId(), fileStatus); + } else { + log.debug("无需更新CaseDocumentLog,taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus); } - } catch (Exception e) { log.error("更新日志状态异常,taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus, e); } diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseDocumentLogTask.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseDocumentLogTask.java index a6fafbe6..ffbada63 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseDocumentLogTask.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseDocumentLogTask.java @@ -13,8 +13,13 @@ public class CaseDocumentLogTask { @Autowired private ICaseKnowledgeService caseKnowledgeService; -// @XxlJob("batchCheckFileStatus") - public void batchCheckFileStatus() { + /** + * 批量查询文件状态并修改 + * 目前每次查看10条数据,批处理拟每10秒一次,每分钟可运行6次,60条数据 + * cron: 0/10 * * * * ? + */ + @XxlJob("batchCheckFileStatusJob") + public void batchCheckFileStatusJob() { log.info("开始批量查询文件状态"); caseKnowledgeService.batchCheckFileStatus(); log.info("结束批量查询文件状态"); diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java index 3239922b..6bc41b84 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/task/CaseUploadTask.java @@ -1,12 +1,143 @@ package com.xboe.module.boecase.task; +import com.xboe.enums.CaseDocumentLogOptTypeEnum; +import com.xboe.module.boecase.async.CaseAiDocumentAsyncHandler; +import com.xboe.module.boecase.dao.CaseDocumentLogDao; +import com.xboe.module.boecase.dao.CasesDao; +import com.xboe.module.boecase.entity.CaseDocumentLog; +import com.xboe.module.boecase.entity.Cases; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; /** * 旧案例上传 */ @Component @Slf4j +@RestController +@RequestMapping("/xboe/m/boe/caseUpload") public class CaseUploadTask { -} + + @Resource + private CasesDao casesDao; + + @Resource + private CaseDocumentLogDao caseDocumentLogDao; + + @Autowired + private CaseAiDocumentAsyncHandler caseAiDocumentAsyncHandler; + + @Autowired + private StringRedisTemplate stringRedisTemplate; + + private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id"; + + @PostMapping("/execute") + public void oldDataUploadJob() { + try { + log.info("开始执行旧案例上传任务"); + + // 从Redis获取上次处理的最后一条记录ID + String lastProcessedId = stringRedisTemplate.opsForValue().get(CASE_UPLOAD_LAST_ID_KEY); + log.info("上次处理的最后一条记录ID: {}", lastProcessedId); + + // 查询符合条件的案例数据 + List casesToProcess = findCasesToProcess(lastProcessedId); + log.info("查询到待处理案例数量: {}", casesToProcess.size()); + + if (casesToProcess.isEmpty()) { + log.info("没有需要处理的案例数据"); + return; + } + + // 批量检查这些案例是否已在CaseDocumentLog中存在记录,提升性能 + List caseIds = new ArrayList<>(); + for (Cases cases : casesToProcess) { + caseIds.add(cases.getId()); + } + + // 一次性查询所有相关案例的记录 + List existingLogs = caseDocumentLogDao.getGenericDao() + .findList(CaseDocumentLog.class, + com.xboe.core.orm.FieldFilters.in("caseId", caseIds)); + + // 过滤出未在CaseDocumentLog中存在的案例 + List casesList = new ArrayList<>(); + for (Cases cases : casesToProcess) { + boolean exists = false; + for (CaseDocumentLog log : existingLogs) { + if (cases.getId().equals(log.getCaseId())) { + exists = true; + break; + } + } + if (!exists) { + casesList.add(cases); + } + } + + log.info("过滤后需要处理的案例数量: {}", casesList.size()); + + if (!casesList.isEmpty()) { + // 调用异步处理方法 + caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.CREATE, casesList.toArray(new Cases[0])); + + // 将当前处理的最后一条数据ID存入Redis + String currentLastId = casesList.get(casesList.size() - 1).getId(); + stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId); + log.info("已处理案例,最后一条记录ID已更新为: {}", currentLastId); + } else { + log.info("没有新的案例需要处理"); + } + + log.info("旧案例上传任务执行完成"); + } catch (Exception e) { + log.error("执行旧案例上传任务时发生异常", e); + } + } + + /** + * 清除处理位置标记,使下次任务从头开始执行 + */ + @PostMapping("/reset") + public void resetLastProcessedId() { + stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY); + log.info("已清除上次处理位置标记"); + } + + /** + * 查询需要处理的案例数据 + * + * @param lastProcessedId 上次处理的最后一条记录ID + * @return 案例列表 + */ + private List findCasesToProcess(String lastProcessedId) { + com.xboe.core.orm.QueryBuilder queryBuilder = com.xboe.core.orm.QueryBuilder.from(Cases.class); + queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.eq("deleted", false)); + // 只处理有文件路径的案例 + queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.isNotNull("filePath")); + queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.notEq("filePath", "")); + + // 如果有上次处理的ID,则从该ID之后开始查询 + if (lastProcessedId != null && !lastProcessedId.isEmpty()) { + queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.gt("id", lastProcessedId)); + } + + // 按创建时间升序排序 + queryBuilder.addOrder(com.xboe.common.OrderCondition.asc("id")); + // 限制每次处理的数量,避免一次性处理太多数据 + queryBuilder.setPageSize(100); + + return casesDao.findList(queryBuilder.builder()); + } +} \ No newline at end of file