mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-14 05:16:49 +08:00
批处理:JobHandler开发
This commit is contained in:
@@ -0,0 +1,33 @@
|
|||||||
|
package com.xboe.module.boecase.api;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 案例上传任务API
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/xboe/m/boe/caseUpload")
|
||||||
|
public class CaseUploadTaskApi {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
|
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清除处理位置标记,使下次任务从头开始执行
|
||||||
|
*/
|
||||||
|
@PostMapping("/reset")
|
||||||
|
public void resetLastProcessedId() {
|
||||||
|
stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY);
|
||||||
|
log.info("已清除上次处理位置标记");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -59,8 +59,9 @@ public class CaseAiDocumentAsyncHandler {
|
|||||||
caseKnowledgeService.uploadCaseDocument(cases.getId());
|
caseKnowledgeService.uploadCaseDocument(cases.getId());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
log.info("处理案例成功,caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc());
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("处理案例失败,caseId: {}, optType: {}", cases.getId(), optTypeEnum.getCode(), e);
|
log.error("处理案例失败,caseId: {}, 操作类型: {}", cases.getId(), optTypeEnum.getDesc(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,7 @@
|
|||||||
package com.xboe.module.boecase.service;
|
package com.xboe.module.boecase.service;
|
||||||
|
|
||||||
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 案例-知识库
|
* 案例-知识库
|
||||||
*/
|
*/
|
||||||
@@ -42,5 +44,6 @@ public interface ICaseKnowledgeService {
|
|||||||
/**
|
/**
|
||||||
* 批量检查文件状态
|
* 批量检查文件状态
|
||||||
*/
|
*/
|
||||||
|
@Transactional(rollbackFor = Throwable.class)
|
||||||
void batchCheckFileStatus();
|
void batchCheckFileStatus();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package com.xboe.module.boecase.service.impl;
|
|||||||
|
|
||||||
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSON;
|
||||||
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.xboe.common.PageList;
|
||||||
import com.xboe.common.utils.IDGenerator;
|
import com.xboe.common.utils.IDGenerator;
|
||||||
import com.xboe.common.utils.StringUtil;
|
import com.xboe.common.utils.StringUtil;
|
||||||
import com.xboe.common.OrderCondition;
|
import com.xboe.common.OrderCondition;
|
||||||
@@ -537,13 +538,14 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
|
|||||||
public void batchCheckFileStatus() {
|
public void batchCheckFileStatus() {
|
||||||
log.info("开始批量检查文件状态");
|
log.info("开始批量检查文件状态");
|
||||||
|
|
||||||
// 1. 查询CaseDocumentLog表中所有run_status等于0的数据
|
// 1. 查询CaseDocumentLog表中前10条run_status等于0的数据,并按创建时间升序排序
|
||||||
List<CaseDocumentLog> runningLogs = caseDocumentLogDao.getGenericDao()
|
PageList<CaseDocumentLog> runningLogPage = caseDocumentLogDao.getGenericDao()
|
||||||
.findList(CaseDocumentLog.class,
|
.findPage(1, 10, CaseDocumentLog.class, OrderCondition.asc("sysCreateTime"),
|
||||||
FieldFilters.eq("runStatus", CaseDocumentLogRunStatusEnum.RUNNING.getCode()));
|
FieldFilters.eq("runStatus", CaseDocumentLogRunStatusEnum.RUNNING.getCode()));
|
||||||
|
List<CaseDocumentLog> runningLogs = runningLogPage.getList();
|
||||||
|
|
||||||
// 2. 如果没有符合条件的数据,完成
|
// 2. 如果没有符合条件的数据,完成
|
||||||
if (runningLogs.isEmpty()) {
|
if (runningLogs == null || runningLogs.isEmpty()) {
|
||||||
log.info("没有需要检查状态的文档,批量检查完成");
|
log.info("没有需要检查状态的文档,批量检查完成");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -572,33 +574,30 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 5. 调用三方接口检查状态
|
// 5. 调用三方接口检查状态
|
||||||
try {
|
String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status";
|
||||||
String statusUrl = caseAiProperties.getBaseUrl() + "/apigateway/knowledge/v1/file/status";
|
String kId = caseAiProperties.getCaseKnowledgeId();
|
||||||
String kId = caseAiProperties.getCaseKnowledgeId();
|
String taskIdsParam = String.join(",", taskIds);
|
||||||
String taskIdsParam = String.join(",", taskIds);
|
String queryParams = "kId=" + kId + "&taskIds=" + taskIdsParam;
|
||||||
String queryParams = "kId=" + URLEncoder.encode(kId, StandardCharsets.UTF_8.name()) +
|
|
||||||
"&taskIds=" + URLEncoder.encode(taskIdsParam, StandardCharsets.UTF_8.name());
|
|
||||||
|
|
||||||
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
|
try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
|
||||||
HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams);
|
HttpGet httpGet = new HttpGet(statusUrl + "?" + queryParams);
|
||||||
httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode());
|
httpGet.setHeader("X-AI-ApiCode", caseAiProperties.getAiApiCode());
|
||||||
httpGet.setHeader("access_token", accessToken);
|
httpGet.setHeader("access_token", accessToken);
|
||||||
|
|
||||||
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
|
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
|
||||||
int statusCode = response.getStatusLine().getStatusCode();
|
int statusCode = response.getStatusLine().getStatusCode();
|
||||||
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
||||||
|
|
||||||
if (statusCode == 200) {
|
if (statusCode == 200) {
|
||||||
JSONObject result = JSON.parseObject(responseBody);
|
JSONObject result = JSON.parseObject(responseBody);
|
||||||
if (result.getBooleanValue("success")) {
|
if (result.getBooleanValue("success")) {
|
||||||
// 6. 解析返回结果并更新状态
|
// 6. 解析返回结果并更新状态
|
||||||
processFileStatusResponse(result, runningLogs);
|
processFileStatusResponse(result, runningLogs);
|
||||||
} else {
|
|
||||||
log.error("调用文件状态接口业务处理失败,response: {}", responseBody);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
log.error("调用文件状态接口失败,status: {}, response: {}", statusCode, responseBody);
|
log.error("调用文件状态接口业务处理失败,response: {}", responseBody);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
log.error("调用文件状态接口失败,status: {}, response: {}", statusCode, responseBody);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@@ -884,8 +883,9 @@ public class CaseKnowledgeServiceImpl implements ICaseKnowledgeService {
|
|||||||
caseDocumentLogDao.save(caseLog);
|
caseDocumentLogDao.save(caseLog);
|
||||||
log.info("更新CaseDocumentLog成功,logId: {}, taskId: {}, fileStatus: {}",
|
log.info("更新CaseDocumentLog成功,logId: {}, taskId: {}, fileStatus: {}",
|
||||||
caseLog.getId(), caseLog.getTaskId(), fileStatus);
|
caseLog.getId(), caseLog.getTaskId(), fileStatus);
|
||||||
|
} else {
|
||||||
|
log.debug("无需更新CaseDocumentLog,taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("更新日志状态异常,taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus, e);
|
log.error("更新日志状态异常,taskId: {}, fileStatus: {}", caseLog.getTaskId(), fileStatus, e);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,8 +13,13 @@ public class CaseDocumentLogTask {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private ICaseKnowledgeService caseKnowledgeService;
|
private ICaseKnowledgeService caseKnowledgeService;
|
||||||
|
|
||||||
// @XxlJob("batchCheckFileStatus")
|
/**
|
||||||
public void batchCheckFileStatus() {
|
* 批量查询文件状态并修改
|
||||||
|
* 目前每次查看10条数据,批处理拟每10秒一次,每分钟可运行6次,60条数据
|
||||||
|
* cron: 0/10 * * * * ?
|
||||||
|
*/
|
||||||
|
@XxlJob("batchCheckFileStatusJob")
|
||||||
|
public void batchCheckFileStatusJob() {
|
||||||
log.info("开始批量查询文件状态");
|
log.info("开始批量查询文件状态");
|
||||||
caseKnowledgeService.batchCheckFileStatus();
|
caseKnowledgeService.batchCheckFileStatus();
|
||||||
log.info("结束批量查询文件状态");
|
log.info("结束批量查询文件状态");
|
||||||
|
|||||||
@@ -1,12 +1,143 @@
|
|||||||
package com.xboe.module.boecase.task;
|
package com.xboe.module.boecase.task;
|
||||||
|
|
||||||
|
import com.xboe.enums.CaseDocumentLogOptTypeEnum;
|
||||||
|
import com.xboe.module.boecase.async.CaseAiDocumentAsyncHandler;
|
||||||
|
import com.xboe.module.boecase.dao.CaseDocumentLogDao;
|
||||||
|
import com.xboe.module.boecase.dao.CasesDao;
|
||||||
|
import com.xboe.module.boecase.entity.CaseDocumentLog;
|
||||||
|
import com.xboe.module.boecase.entity.Cases;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
import org.springframework.web.bind.annotation.PostMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RequestMapping;
|
||||||
|
import org.springframework.web.bind.annotation.RestController;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 旧案例上传
|
* 旧案例上传
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
@RestController
|
||||||
|
@RequestMapping("/xboe/m/boe/caseUpload")
|
||||||
public class CaseUploadTask {
|
public class CaseUploadTask {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CasesDao casesDao;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CaseDocumentLogDao caseDocumentLogDao;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private CaseAiDocumentAsyncHandler caseAiDocumentAsyncHandler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private StringRedisTemplate stringRedisTemplate;
|
||||||
|
|
||||||
|
private static final String CASE_UPLOAD_LAST_ID_KEY = "case:upload:last:id";
|
||||||
|
|
||||||
|
@PostMapping("/execute")
|
||||||
|
public void oldDataUploadJob() {
|
||||||
|
try {
|
||||||
|
log.info("开始执行旧案例上传任务");
|
||||||
|
|
||||||
|
// 从Redis获取上次处理的最后一条记录ID
|
||||||
|
String lastProcessedId = stringRedisTemplate.opsForValue().get(CASE_UPLOAD_LAST_ID_KEY);
|
||||||
|
log.info("上次处理的最后一条记录ID: {}", lastProcessedId);
|
||||||
|
|
||||||
|
// 查询符合条件的案例数据
|
||||||
|
List<Cases> casesToProcess = findCasesToProcess(lastProcessedId);
|
||||||
|
log.info("查询到待处理案例数量: {}", casesToProcess.size());
|
||||||
|
|
||||||
|
if (casesToProcess.isEmpty()) {
|
||||||
|
log.info("没有需要处理的案例数据");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 批量检查这些案例是否已在CaseDocumentLog中存在记录,提升性能
|
||||||
|
List<String> caseIds = new ArrayList<>();
|
||||||
|
for (Cases cases : casesToProcess) {
|
||||||
|
caseIds.add(cases.getId());
|
||||||
|
}
|
||||||
|
|
||||||
|
// 一次性查询所有相关案例的记录
|
||||||
|
List<CaseDocumentLog> existingLogs = caseDocumentLogDao.getGenericDao()
|
||||||
|
.findList(CaseDocumentLog.class,
|
||||||
|
com.xboe.core.orm.FieldFilters.in("caseId", caseIds));
|
||||||
|
|
||||||
|
// 过滤出未在CaseDocumentLog中存在的案例
|
||||||
|
List<Cases> casesList = new ArrayList<>();
|
||||||
|
for (Cases cases : casesToProcess) {
|
||||||
|
boolean exists = false;
|
||||||
|
for (CaseDocumentLog log : existingLogs) {
|
||||||
|
if (cases.getId().equals(log.getCaseId())) {
|
||||||
|
exists = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!exists) {
|
||||||
|
casesList.add(cases);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("过滤后需要处理的案例数量: {}", casesList.size());
|
||||||
|
|
||||||
|
if (!casesList.isEmpty()) {
|
||||||
|
// 调用异步处理方法
|
||||||
|
caseAiDocumentAsyncHandler.process(CaseDocumentLogOptTypeEnum.CREATE, casesList.toArray(new Cases[0]));
|
||||||
|
|
||||||
|
// 将当前处理的最后一条数据ID存入Redis
|
||||||
|
String currentLastId = casesList.get(casesList.size() - 1).getId();
|
||||||
|
stringRedisTemplate.opsForValue().set(CASE_UPLOAD_LAST_ID_KEY, currentLastId);
|
||||||
|
log.info("已处理案例,最后一条记录ID已更新为: {}", currentLastId);
|
||||||
|
} else {
|
||||||
|
log.info("没有新的案例需要处理");
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("旧案例上传任务执行完成");
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("执行旧案例上传任务时发生异常", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 清除处理位置标记,使下次任务从头开始执行
|
||||||
|
*/
|
||||||
|
@PostMapping("/reset")
|
||||||
|
public void resetLastProcessedId() {
|
||||||
|
stringRedisTemplate.delete(CASE_UPLOAD_LAST_ID_KEY);
|
||||||
|
log.info("已清除上次处理位置标记");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 查询需要处理的案例数据
|
||||||
|
*
|
||||||
|
* @param lastProcessedId 上次处理的最后一条记录ID
|
||||||
|
* @return 案例列表
|
||||||
|
*/
|
||||||
|
private List<Cases> findCasesToProcess(String lastProcessedId) {
|
||||||
|
com.xboe.core.orm.QueryBuilder queryBuilder = com.xboe.core.orm.QueryBuilder.from(Cases.class);
|
||||||
|
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.eq("deleted", false));
|
||||||
|
// 只处理有文件路径的案例
|
||||||
|
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.isNotNull("filePath"));
|
||||||
|
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.notEq("filePath", ""));
|
||||||
|
|
||||||
|
// 如果有上次处理的ID,则从该ID之后开始查询
|
||||||
|
if (lastProcessedId != null && !lastProcessedId.isEmpty()) {
|
||||||
|
queryBuilder.addFilter(com.xboe.core.orm.FieldFilters.gt("id", lastProcessedId));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 按创建时间升序排序
|
||||||
|
queryBuilder.addOrder(com.xboe.common.OrderCondition.asc("id"));
|
||||||
|
// 限制每次处理的数量,避免一次性处理太多数据
|
||||||
|
queryBuilder.setPageSize(100);
|
||||||
|
|
||||||
|
return casesDao.findList(queryBuilder.builder());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user