批处理:JobHandler开发

This commit is contained in:
liu.zixi
2025-10-09 14:38:39 +08:00
parent 53b6a0203f
commit f0235d5294
6 changed files with 207 additions and 34 deletions

View File

@@ -0,0 +1,33 @@
package com.xboe.module.boecase.api;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 案例上传任务API
*/
@Slf4j
@RestController
@RequestMapping("/xboe/m/boe/caseUpload")
public class CaseUploadTaskApi {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
/**
* 清除处理位置标记,使下次任务从头开始执行
*/
@PostMapping("/reset")
public void resetLastProcessedId() {
stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY);
log.info("已清除上次处理位置标记");
}
}

View File

@@ -59,8 +59,9 @@ public class CaseAiDocumentAsyncHandler {
caseKnowledgeService.uploadCaseDocument(cases.getId());
break;
}
log.info("处理案例成功caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc());
} catch (Exception e) {
log.error("处理案例失败caseId: {}, optType: {}", cases.getId(), optTypeEnum.getCode(), e);
log.error("处理案例失败caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc(), e);
}
}
}

View File

@@ -1,5 +1,7 @@
package com.xboe.module.boecase.service;
import org.springframework.transaction.annotation.Transactional;
/**
* 案例-知识库
*/
@@ -42,5 +44,6 @@ public interface ICaseKnowledgeService {
/**
* 批量检查文件状态
*/
@Transactional(rollbackFor = Throwable.class)
void batchCheckFileStatus();
}

View File

@@ -2,6 +2,7 @@ package com.xboe.module.boecase.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xboe.common.PageList;
import com.xboe.common.utils.IDGenerator;
import com.xboe.common.utils.StringUtil;
import com.xboe.common.OrderCondition;
@@ -537,13 +538,14 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
public void batchCheckFileStatus() {
log.info("开始批量检查文件状态");
// 1. 查询CaseDocumentLog表中所有run_status等于0的数据
List<CaseDocumentLog> runningLogs = caseDocumentLogDao.getGenericDao()
.findList(CaseDocumentLog.class,
// 1. 查询CaseDocumentLog表中前10条run_status等于0的数据,并按创建时间升序排序
PageList<CaseDocumentLog> runningLogPage = caseDocumentLogDao.getGenericDao()
.findPage(1, 10, CaseDocumentLog.class, OrderCondition.asc("sysCreateTime"),
FieldFilters.eq("runStatus", CaseDocumentLogRunStatusEnum.RUNNING.getCode()));
List<CaseDocumentLog> runningLogs = runningLogPage.getList();
// 2. 如果没有符合条件的数据,完成
if (runningLogs.isEmpty()) {
if (runningLogs == null || runningLogs.isEmpty()) {
log.info("没有需要检查状态的文档,批量检查完成");
return;
}
@@ -572,33 +574,30 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
}
// 5. 调用三方接口检查状态
try {
String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status";
String kId = caseAiProperties.getCaseKnowledgeId();
String taskIdsParam = String.join(",", taskIds);
String queryParams = "kId=" + URLEncoder.encode(kId, StandardCharsets.UTF_8.name()) +
"&taskIds=" + URLEncoder.encode(taskIdsParam, StandardCharsets.UTF_8.name());
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams);
httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode());
httpGet.setHeader("access_token", accessToken);
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
if (statusCode == 200) {
JSONObject result = JSON.parseObject(responseBody);
if (result.getBooleanValue("success")) {
// 6. 解析返回结果并更新状态
processFileStatusResponse(result, runningLogs);
} else {
log.error("调用文件状态接口业务处理失败response: {}", responseBody);
}
String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status";
String kId = caseAiProperties.getCaseKnowledgeId();
String taskIdsParam = String.join(",", taskIds);
String queryParams = "kId=" + kId + "&taskIds=" + taskIdsParam;
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams);
httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode());
httpGet.setHeader("access_token", accessToken);
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
if (statusCode == 200) {
JSONObject result = JSON.parseObject(responseBody);
if (result.getBooleanValue("success")) {
// 6. 解析返回结果并更新状态
processFileStatusResponse(result, runningLogs);
} else {
log.error("调用文件状态接口失败status: {}, response: {}", statusCode, responseBody);
log.error("调用文件状态接口业务处理失败response: {}", responseBody);
}
} else {
log.error("调用文件状态接口失败status: {}, response: {}", statusCode, responseBody);
}
}
} catch (Exception e) {
@@ -884,8 +883,9 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
caseDocumentLogDao.save(caseLog);
log.info("更新CaseDocumentLog成功logId: {}, taskId: {}, fileStatus: {}",
caseLog.getId(), caseLog.getTaskId(), fileStatus);
} else {
log.debug("无需更新CaseDocumentLogtaskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus);
}
} catch (Exception e) {
log.error("更新日志状态异常taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus, e);
}

View File

@@ -13,8 +13,13 @@ public class CaseDocumentLogTask {
@Autowired
private ICaseKnowledgeService caseKnowledgeService;
// @XxlJob("batchCheckFileStatus")
public void batchCheckFileStatus() {
/**
* 批量查询文件状态并修改
* 目前每次查看10条数据批处理拟每10秒一次每分钟可运行6次60条数据
* cron: 0/10 * * * * ?
*/
@XxlJob("batchCheckFileStatusJob")
public void batchCheckFileStatusJob() {
log.info("开始批量查询文件状态");
caseKnowledgeService.batchCheckFileStatus();
log.info("结束批量查询文件状态");

View File

@@ -1,12 +1,143 @@
package com.xboe.module.boecase.task;
import com.xboe.enums.CaseDocumentLogOptTypeEnum;
import com.xboe.module.boecase.async.CaseAiDocumentAsyncHandler;
import com.xboe.module.boecase.dao.CaseDocumentLogDao;
import com.xboe.module.boecase.dao.CasesDao;
import com.xboe.module.boecase.entity.CaseDocumentLog;
import com.xboe.module.boecase.entity.Cases;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
/**
* 旧案例上传
*/
@Component
@Slf4j
@RestController
@RequestMapping("/xboe/m/boe/caseUpload")
public class CaseUploadTask {
}
@Resource
private CasesDao casesDao;
@Resource
private CaseDocumentLogDao caseDocumentLogDao;
@Autowired
private CaseAiDocumentAsyncHandler caseAiDocumentAsyncHandler;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
@PostMapping("/execute")
public void oldDataUploadJob() {
try {
log.info("开始执行旧案例上传任务");
// 从Redis获取上次处理的最后一条记录ID
String lastProcessedId = stringRedisTemplate.opsForValue().get(CASE_UPLOAD_LAST_ID_KEY);
log.info("上次处理的最后一条记录ID: {}", lastProcessedId);
// 查询符合条件的案例数据
List<Cases> casesToProcess = findCasesToProcess(lastProcessedId);
log.info("查询到待处理案例数量: {}", casesToProcess.size());
if (casesToProcess.isEmpty()) {
log.info("没有需要处理的案例数据");
return;
}
// 批量检查这些案例是否已在CaseDocumentLog中存在记录提升性能
List<String> caseIds = new ArrayList<>();
for (Cases cases : casesToProcess) {
caseIds.add(cases.getId());
}
// 一次性查询所有相关案例的记录
List<CaseDocumentLog> existingLogs = caseDocumentLogDao.getGenericDao()
.findList(CaseDocumentLog.class,
com.xboe.core.orm.FieldFilters.in("caseId", caseIds));
// 过滤出未在CaseDocumentLog中存在的案例
List<Cases> casesList = new ArrayList<>();
for (Cases cases : casesToProcess) {
boolean exists = false;
for (CaseDocumentLog log : existingLogs) {
if (cases.getId().equals(log.getCaseId())) {
exists = true;
break;
}
}
if (!exists) {
casesList.add(cases);
}
}
log.info("过滤后需要处理的案例数量: {}", casesList.size());
if (!casesList.isEmpty()) {
// 调用异步处理方法
caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.CREATE, casesList.toArray(new Cases[0]));
// 将当前处理的最后一条数据ID存入Redis
String currentLastId = casesList.get(casesList.size() - 1).getId();
stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId);
log.info("已处理案例最后一条记录ID已更新为: {}", currentLastId);
} else {
log.info("没有新的案例需要处理");
}
log.info("旧案例上传任务执行完成");
} catch (Exception e) {
log.error("执行旧案例上传任务时发生异常", e);
}
}
/**
* 清除处理位置标记,使下次任务从头开始执行
*/
@PostMapping("/reset")
public void resetLastProcessedId() {
stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY);
log.info("已清除上次处理位置标记");
}
/**
* 查询需要处理的案例数据
*
* @param lastProcessedId 上次处理的最后一条记录ID
* @return 案例列表
*/
private List<Cases> findCasesToProcess(String lastProcessedId) {
com.xboe.core.orm.QueryBuilder queryBuilder = com.xboe.core.orm.QueryBuilder.from(Cases.class);
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.eq("deleted", false));
// 只处理有文件路径的案例
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.isNotNull("filePath"));
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.notEq("filePath", ""));
// 如果有上次处理的ID则从该ID之后开始查询
if (lastProcessedId != null && !lastProcessedId.isEmpty()) {
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.gt("id", lastProcessedId));
}
// 按创建时间升序排序
queryBuilder.addOrder(com.xboe.common.OrderCondition.asc("id"));
// 限制每次处理的数量,避免一次性处理太多数据
queryBuilder.setPageSize(100);
return casesDao.findList(queryBuilder.builder());
}
}