mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-07 01:46:47 +08:00
Compare commits
4 Commits
release_20
...
zxwy-esSyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c750d2304 | ||
|
|
aab50a0ff0 | ||
|
|
9866618110 | ||
|
|
be18f7477b |
@@ -43,7 +43,12 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
@Resource
|
||||
RestHighLevelClient restHighLevelClient;
|
||||
|
||||
// todo 定时、分批、数据库名
|
||||
/**
|
||||
* 定时同步PHP数据库数据到es中,计划定时20分钟执行一次。具体实现是,
|
||||
* 第一步,查询最近半小时内完成任一课件的课程学习,获取的信息有课程ID,学员ID,学习状态、学习进度。
|
||||
* 第二步,根据上述得到的课程ID,学员ID,查询es中的未完成的记录。
|
||||
* 第三步,比对两者学习进度,如果不一致,以数据库学习记录为准,修改ES
|
||||
*/
|
||||
@XxlJob("phpOnlineStudyRecordSyncEsTask")
|
||||
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
|
||||
@@ -101,6 +106,9 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
// 获取ES中的数据
|
||||
List<CourseStudyDto> esDataList = getEsData(batch);
|
||||
|
||||
if (CollUtil.isEmpty(esDataList)){
|
||||
continue;
|
||||
}
|
||||
|
||||
// 更新ES数据
|
||||
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
||||
@@ -130,12 +138,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
}
|
||||
|
||||
// 调用批量更新方法
|
||||
// toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||
toBeUpdatedEs(toBeUpdatedEs);
|
||||
}
|
||||
return tempResultList;
|
||||
}
|
||||
|
||||
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||
log.info("待处理的数据:{}", finishedCourseList);
|
||||
SearchRequest searchRequest = new SearchRequest("new_study_resource");
|
||||
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
|
||||
|
||||
@@ -151,11 +160,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
);
|
||||
}
|
||||
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS));
|
||||
searchRequest.source(sourceBuilder);
|
||||
if (boolQuery.hasClauses()) {
|
||||
sourceBuilder.query(boolQuery);
|
||||
if (!boolQuery.hasClauses()) {
|
||||
return null;
|
||||
}
|
||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
|
||||
.query(boolQuery)
|
||||
.timeout(new TimeValue(60, TimeUnit.SECONDS))
|
||||
.size(finishedCourseList.size() + 10);
|
||||
searchRequest.source(sourceBuilder);
|
||||
|
||||
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
||||
@@ -175,7 +186,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
return courseStudyDtoList;
|
||||
}
|
||||
|
||||
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) {
|
||||
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData) {
|
||||
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
|
||||
return;
|
||||
}
|
||||
@@ -186,7 +197,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
docMap.put("progress", courseStudyDto.getProgress());
|
||||
|
||||
// 创建更新请求,并传入单一的docMap
|
||||
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId())
|
||||
UpdateRequest updateRequest = new UpdateRequest("new_study_resource", courseStudyDto.getId())
|
||||
.doc(docMap, XContentType.JSON);
|
||||
|
||||
// 将请求添加到批量请求中
|
||||
@@ -208,7 +219,11 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 第一步,查询最近报名的学习,获取的信息有课程ID,学员ID,学习状态、学习进度。
|
||||
* 第二步,根据上述得到的课程ID,学员ID,查询es中的未完成的记录。
|
||||
* 第三步,比对两者学习进度,如果不一致,以数据库学习记录为准,修改ES。
|
||||
*/
|
||||
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
|
||||
log.info("开始同步PHP学习记录到ES");
|
||||
|
||||
@@ -95,20 +95,20 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
|
||||
userBasicDataBase = "userbasic";
|
||||
}
|
||||
|
||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||
List<String> userIdOfPhpList = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIdOfPhpList);
|
||||
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("新系统用户数据不存在");
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||
Map<Object, Object> kidAndUserIdMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||
|
||||
// 设置新系统用户ID
|
||||
recentLearnRecordList = recentLearnRecordList.stream()
|
||||
.map(phpOnlineDto -> {
|
||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||
Object userIdOfJavaObj = kidAndUserIdMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||
if (userIdOfJavaObj != null) {
|
||||
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||
}
|
||||
@@ -209,7 +209,6 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
|
||||
}
|
||||
|
||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||
|
||||
String userBasicDataBase;
|
||||
if (activeProfile.equals("pro")) {
|
||||
userBasicDataBase = "user_basic";
|
||||
|
||||
Reference in New Issue
Block a user