mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-09 02:46:50 +08:00
Compare commits
4 Commits
9f26b991a0
...
zxwy-esSyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c750d2304 | ||
|
|
aab50a0ff0 | ||
|
|
9866618110 | ||
|
|
be18f7477b |
@@ -43,7 +43,12 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
@Resource
|
@Resource
|
||||||
RestHighLevelClient restHighLevelClient;
|
RestHighLevelClient restHighLevelClient;
|
||||||
|
|
||||||
// todo 定时、分批、数据库名
|
/**
|
||||||
|
* 定时同步PHP数据库数据到es中,计划定时20分钟执行一次。具体实现是,
|
||||||
|
* 第一步,查询最近半小时内完成任一课件的课程学习,获取的信息有课程ID,学员ID,学习状态、学习进度。
|
||||||
|
* 第二步,根据上述得到的课程ID,学员ID,查询es中的未完成的记录。
|
||||||
|
* 第三步,比对两者学习进度,如果不一致,以数据库学习记录为准,修改ES
|
||||||
|
*/
|
||||||
@XxlJob("phpOnlineStudyRecordSyncEsTask")
|
@XxlJob("phpOnlineStudyRecordSyncEsTask")
|
||||||
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||||
|
|
||||||
@@ -101,6 +106,9 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
// 获取ES中的数据
|
// 获取ES中的数据
|
||||||
List<CourseStudyDto> esDataList = getEsData(batch);
|
List<CourseStudyDto> esDataList = getEsData(batch);
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(esDataList)){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// 更新ES数据
|
// 更新ES数据
|
||||||
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
||||||
@@ -130,12 +138,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 调用批量更新方法
|
// 调用批量更新方法
|
||||||
// toBeUpdatedEs(toBeUpdatedEs, indexName);
|
toBeUpdatedEs(toBeUpdatedEs);
|
||||||
}
|
}
|
||||||
return tempResultList;
|
return tempResultList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||||
|
log.info("待处理的数据:{}", finishedCourseList);
|
||||||
SearchRequest searchRequest = new SearchRequest("new_study_resource");
|
SearchRequest searchRequest = new SearchRequest("new_study_resource");
|
||||||
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
|
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
|
||||||
|
|
||||||
@@ -151,11 +160,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS));
|
if (!boolQuery.hasClauses()) {
|
||||||
searchRequest.source(sourceBuilder);
|
return null;
|
||||||
if (boolQuery.hasClauses()) {
|
|
||||||
sourceBuilder.query(boolQuery);
|
|
||||||
}
|
}
|
||||||
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
|
||||||
|
.query(boolQuery)
|
||||||
|
.timeout(new TimeValue(60, TimeUnit.SECONDS))
|
||||||
|
.size(finishedCourseList.size() + 10);
|
||||||
searchRequest.source(sourceBuilder);
|
searchRequest.source(sourceBuilder);
|
||||||
|
|
||||||
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
||||||
@@ -175,7 +186,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
return courseStudyDtoList;
|
return courseStudyDtoList;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) {
|
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData) {
|
||||||
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
|
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -186,7 +197,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
docMap.put("progress", courseStudyDto.getProgress());
|
docMap.put("progress", courseStudyDto.getProgress());
|
||||||
|
|
||||||
// 创建更新请求,并传入单一的docMap
|
// 创建更新请求,并传入单一的docMap
|
||||||
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId())
|
UpdateRequest updateRequest = new UpdateRequest("new_study_resource", courseStudyDto.getId())
|
||||||
.doc(docMap, XContentType.JSON);
|
.doc(docMap, XContentType.JSON);
|
||||||
|
|
||||||
// 将请求添加到批量请求中
|
// 将请求添加到批量请求中
|
||||||
@@ -208,7 +219,11 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 第一步,查询最近报名的学习,获取的信息有课程ID,学员ID,学习状态、学习进度。
|
||||||
|
* 第二步,根据上述得到的课程ID,学员ID,查询es中的未完成的记录。
|
||||||
|
* 第三步,比对两者学习进度,如果不一致,以数据库学习记录为准,修改ES。
|
||||||
|
*/
|
||||||
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||||
|
|
||||||
log.info("开始同步PHP学习记录到ES");
|
log.info("开始同步PHP学习记录到ES");
|
||||||
|
|||||||
@@ -95,20 +95,20 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
|
|||||||
userBasicDataBase = "userbasic";
|
userBasicDataBase = "userbasic";
|
||||||
}
|
}
|
||||||
|
|
||||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
List<String> userIdOfPhpList = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||||
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIdOfPhpList);
|
||||||
|
|
||||||
if (CollUtil.isEmpty(objectList1)) {
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
log.info("新系统用户数据不存在");
|
log.info("新系统用户数据不存在");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
Map<Object, Object> kidAndUserIdMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||||
|
|
||||||
// 设置新系统用户ID
|
// 设置新系统用户ID
|
||||||
recentLearnRecordList = recentLearnRecordList.stream()
|
recentLearnRecordList = recentLearnRecordList.stream()
|
||||||
.map(phpOnlineDto -> {
|
.map(phpOnlineDto -> {
|
||||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
Object userIdOfJavaObj = kidAndUserIdMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||||
if (userIdOfJavaObj != null) {
|
if (userIdOfJavaObj != null) {
|
||||||
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||||
}
|
}
|
||||||
@@ -209,7 +209,6 @@ public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||||
|
|
||||||
String userBasicDataBase;
|
String userBasicDataBase;
|
||||||
if (activeProfile.equals("pro")) {
|
if (activeProfile.equals("pro")) {
|
||||||
userBasicDataBase = "user_basic";
|
userBasicDataBase = "user_basic";
|
||||||
|
|||||||
Reference in New Issue
Block a user