Compare commits

...

4 Commits

Author SHA1 Message Date
yang
6c750d2304 es数据同步 2024-12-19 16:39:36 +08:00
yang
aab50a0ff0 es数据同步 2024-12-18 15:26:17 +08:00
yang
9866618110 es数据同步 2024-12-18 13:39:21 +08:00
yang
be18f7477b es数据同步 2024-12-17 10:53:57 +08:00
2 changed files with 28 additions and 14 deletions

View File

@@ -43,7 +43,12 @@ public class PhpOnlineStudyRecordScheduledTasks {
@Resource @Resource
RestHighLevelClient restHighLevelClient; RestHighLevelClient restHighLevelClient;
// todo 定时、分批、数据库名 /**
* 定时同步PHP数据库数据到es中计划定时20分钟执行一次。具体实现是
* 第一步查询最近半小时内完成任一课件的课程学习获取的信息有课程ID学员ID学习状态、学习进度。
* 第二步根据上述得到的课程ID学员ID查询es中的未完成的记录。
* 第三步比对两者学习进度如果不一致以数据库学习记录为准修改ES
*/
@XxlJob("phpOnlineStudyRecordSyncEsTask") @XxlJob("phpOnlineStudyRecordSyncEsTask")
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
@@ -101,6 +106,9 @@ public class PhpOnlineStudyRecordScheduledTasks {
// 获取ES中的数据 // 获取ES中的数据
List<CourseStudyDto> esDataList = getEsData(batch); List<CourseStudyDto> esDataList = getEsData(batch);
if (CollUtil.isEmpty(esDataList)){
continue;
}
// 更新ES数据 // 更新ES数据
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream() List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
@@ -130,12 +138,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
} }
// 调用批量更新方法 // 调用批量更新方法
// toBeUpdatedEs(toBeUpdatedEs, indexName); toBeUpdatedEs(toBeUpdatedEs);
} }
return tempResultList; return tempResultList;
} }
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException { private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
log.info("待处理的数据:{}", finishedCourseList);
SearchRequest searchRequest = new SearchRequest("new_study_resource"); SearchRequest searchRequest = new SearchRequest("new_study_resource");
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
@@ -151,11 +160,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
); );
} }
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS)); if (!boolQuery.hasClauses()) {
searchRequest.source(sourceBuilder); return null;
if (boolQuery.hasClauses()) {
sourceBuilder.query(boolQuery);
} }
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(boolQuery)
.timeout(new TimeValue(60, TimeUnit.SECONDS))
.size(finishedCourseList.size() + 10);
searchRequest.source(sourceBuilder); searchRequest.source(sourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
@@ -175,7 +186,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
return courseStudyDtoList; return courseStudyDtoList;
} }
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) { private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData) {
if (CollUtil.isEmpty(toBeUpdatedEsData)) { if (CollUtil.isEmpty(toBeUpdatedEsData)) {
return; return;
} }
@@ -186,7 +197,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
docMap.put("progress", courseStudyDto.getProgress()); docMap.put("progress", courseStudyDto.getProgress());
// 创建更新请求并传入单一的docMap // 创建更新请求并传入单一的docMap
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId()) UpdateRequest updateRequest = new UpdateRequest("new_study_resource", courseStudyDto.getId())
.doc(docMap, XContentType.JSON); .doc(docMap, XContentType.JSON);
// 将请求添加到批量请求中 // 将请求添加到批量请求中
@@ -208,7 +219,11 @@ public class PhpOnlineStudyRecordScheduledTasks {
} }
} }
/**
* 第一步查询最近报名的学习获取的信息有课程ID学员ID学习状态、学习进度。
* 第二步根据上述得到的课程ID学员ID查询es中的未完成的记录。
* 第三步比对两者学习进度如果不一致以数据库学习记录为准修改ES。
*/
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
log.info("开始同步PHP学习记录到ES"); log.info("开始同步PHP学习记录到ES");

View File

@@ -95,20 +95,20 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
userBasicDataBase = "userbasic"; userBasicDataBase = "userbasic";
} }
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); List<String> userIdOfPhpList = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIdOfPhpList);
if (CollUtil.isEmpty(objectList1)) { if (CollUtil.isEmpty(objectList1)) {
log.info("新系统用户数据不存在"); log.info("新系统用户数据不存在");
return null; return null;
} }
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); Map<Object, Object> kidAndUserIdMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
// 设置新系统用户ID // 设置新系统用户ID
recentLearnRecordList = recentLearnRecordList.stream() recentLearnRecordList = recentLearnRecordList.stream()
.map(phpOnlineDto -> { .map(phpOnlineDto -> {
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); Object userIdOfJavaObj = kidAndUserIdMap.get(phpOnlineDto.getUserIdOfPhp());
if (userIdOfJavaObj != null) { if (userIdOfJavaObj != null) {
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
} }
@@ -209,7 +209,6 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
} }
// 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题 // 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题
String userBasicDataBase; String userBasicDataBase;
if (activeProfile.equals("pro")) { if (activeProfile.equals("pro")) {
userBasicDataBase = "user_basic"; userBasicDataBase = "user_basic";