feat: 增加重新上传的批处理

This commit is contained in:
liu.zixi
2025-12-01 19:44:01 +08:00
parent 49d3ad5999
commit 38c2784f51
3 changed files with 73 additions and 11 deletions

View File

@@ -1,5 +1,6 @@
package com.xboe.module.boecase.api; package com.xboe.module.boecase.api;
import com.xboe.module.boecase.task.CaseUploadTask;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
@@ -20,14 +21,21 @@ public class CaseUploadTaskApi {
@Autowired @Autowired
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
/** /**
* 清除处理位置标记,使下次任务从头开始执行 * 清除处理位置标记,使下次任务从头开始执行
*/ */
@PostMapping("/reset") @PostMapping("/reset")
public void resetLastProcessedId() { public void resetLastProcessedId() {
stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY); stringRedisTemplate.delete(CaseUploadTask.CASE_UPLOAD_LAST_ID_KEY);
log.info("已清除上次处理位置标记");
}
/**
* 清除处理位置标记,使下次任务从头开始执行
*/
@PostMapping("/reload/reset")
public void resetReloadProcessedId() {
stringRedisTemplate.delete(CaseUploadTask.CASE_RELOAD_LAST_ID_KEY);
log.info("已清除上次处理位置标记"); log.info("已清除上次处理位置标记");
} }
} }

View File

@@ -652,7 +652,8 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
.findList(CaseDocumentLog.class, 1, .findList(CaseDocumentLog.class, 1,
OrderCondition.desc("sysCreateTime"), OrderCondition.desc("sysCreateTime"),
FieldFilters.eq("caseId", caseId), FieldFilters.eq("caseId", caseId),
FieldFilters.eq("requestUrl", CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME)); FieldFilters.eq("requestUrl", CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME),
FieldFilters.eq("caseStatus", CaseDocumentLogCaseStatusEnum.SUCCESS.getCode()));
if (logList.isEmpty()) { if (logList.isEmpty()) {
log.info("删除案例文档失败未找到相关的日志记录caseId: {}", caseId); log.info("删除案例文档失败未找到相关的日志记录caseId: {}", caseId);

View File

@@ -1,6 +1,9 @@
package com.xboe.module.boecase.task; package com.xboe.module.boecase.task;
import com.xboe.common.OrderCondition;
import com.xboe.constants.CaseAiConstants; import com.xboe.constants.CaseAiConstants;
import com.xboe.core.orm.FieldFilters;
import com.xboe.core.orm.QueryBuilder;
import com.xboe.enums.CaseDocumentLogCaseStatusEnum; import com.xboe.enums.CaseDocumentLogCaseStatusEnum;
import com.xboe.enums.CaseDocumentLogOptStatusEnum; import com.xboe.enums.CaseDocumentLogOptStatusEnum;
import com.xboe.enums.CaseDocumentLogOptTypeEnum; import com.xboe.enums.CaseDocumentLogOptTypeEnum;
@@ -43,7 +46,39 @@ public class CaseUploadTask {
@Autowired @Autowired
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id"; public static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
public static final String CASE_RELOAD_LAST_ID_KEY = "case:reload:last:id";
@XxlJob("reloadJob")
public void reloadJob() {
String currentLastId = null;
try {
// 从Redis获取上次处理的最后一条记录ID
String lastProcessedId = stringRedisTemplate.opsForValue().get(CASE_RELOAD_LAST_ID_KEY);
// 查询需要重新加载的案例
List<CaseDocumentLog> logsToReload = listToReload(lastProcessedId);
if (logsToReload.isEmpty()) {
return;
}
currentLastId = logsToReload.get(logsToReload.size() - 1).getId();
for (CaseDocumentLog log : logsToReload) {
String caseId = log.getCaseId();
Cases cases = casesDao.get(caseId);
if (cases != null && StringUtils.isNotBlank(cases.getFilePath())) {
// 更新
caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.UPDATE, cases);
}
}
} catch (Exception e) {
log.error("[reload]执行重新上传任务时发生异常", e);
} finally {
if (currentLastId != null) {
stringRedisTemplate.opsForValue().set(CASE_RELOAD_LAST_ID_KEY, currentLastId);
log.info("[reload] 已重新上传案例最后一条caseDocumentLogId 已更新为: {}", currentLastId);
}
}
}
@XxlJob("oldDataUploadJob") @XxlJob("oldDataUploadJob")
public void oldDataUploadJob() { public void oldDataUploadJob() {
@@ -142,25 +177,43 @@ public class CaseUploadTask {
* @return 案例列表 * @return 案例列表
*/ */
private List<Cases> findCasesToProcess(String lastProcessedId) { private List<Cases> findCasesToProcess(String lastProcessedId) {
com.xboe.core.orm.QueryBuilder queryBuilder = com.xboe.core.orm.QueryBuilder.from(Cases.class); QueryBuilder queryBuilder = QueryBuilder.from(Cases.class);
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.eq("deleted", false)); queryBuilder.addFilter(FieldFilters.eq("deleted", false));
// 只处理有文件路径的案例 // 只处理有文件路径的案例
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.isNotNull("filePath")); queryBuilder.addFilter(FieldFilters.isNotNull("filePath"));
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.ne("filePath", "")); queryBuilder.addFilter(FieldFilters.ne("filePath", ""));
// 如果有上次处理的ID则从该ID之后开始查询 // 如果有上次处理的ID则从该ID之后开始查询
if (lastProcessedId != null && !lastProcessedId.isEmpty()) { if (lastProcessedId != null && !lastProcessedId.isEmpty()) {
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.gt("id", lastProcessedId)); queryBuilder.addFilter(FieldFilters.gt("id", lastProcessedId));
} }
// 按创建时间升序排序 // 按创建时间升序排序
queryBuilder.addOrder(com.xboe.common.OrderCondition.asc("id")); queryBuilder.addOrder(OrderCondition.asc("id"));
// 限制每次处理的数量,避免一次性处理太多数据 // 限制每次处理的数量,避免一次性处理太多数据
queryBuilder.setPageSize(100); queryBuilder.setPageSize(100);
return casesDao.findList(queryBuilder.builder()); return casesDao.findList(queryBuilder.builder());
} }
/**
* 获取需要重新加载的案例
* @param lastProcessedId
* @return
*/
private List<CaseDocumentLog> listToReload(String lastProcessedId) {
QueryBuilder queryBuilder = QueryBuilder.from(Cases.class);
queryBuilder.addFilter(FieldFilters.eq("deleted", false));
queryBuilder.addFilter(FieldFilters.eq("requestUrl", CaseAiConstants.CASE_DOC_UPLOAD_INTERFACE_NAME));
queryBuilder.addFilter(FieldFilters.eq("caseStatus", CaseDocumentLogCaseStatusEnum.SUCCESS.getCode()));
if (lastProcessedId != null && !lastProcessedId.isEmpty()) {
queryBuilder.addFilter(FieldFilters.gt("id", lastProcessedId));
}
queryBuilder.addOrder(OrderCondition.asc("id"));
queryBuilder.setPageSize(100);
return caseDocumentLogDao.findList(queryBuilder.builder());
}
private void fixOnLastCase(String currentLastId) { private void fixOnLastCase(String currentLastId) {
stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId); stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId);
log.info("已处理案例最后一条记录ID已更新为: {}", currentLastId); log.info("已处理案例最后一条记录ID已更新为: {}", currentLastId);