mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-19 15:56:53 +08:00
Merge branch 'refs/heads/zxwy-esSync' into dev1107
This commit is contained in:
@@ -21,10 +21,12 @@ import org.elasticsearch.index.query.QueryBuilders;
|
|||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.search.SearchHits;
|
import org.elasticsearch.search.SearchHits;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.math.BigDecimal;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
@@ -42,97 +44,109 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
@Resource
|
@Resource
|
||||||
RestHighLevelClient restHighLevelClient;
|
RestHighLevelClient restHighLevelClient;
|
||||||
|
|
||||||
|
@Value("${spring.profiles.active}")
|
||||||
|
private String activeProfile;
|
||||||
|
|
||||||
// todo 定时、分批、数据库名
|
// todo 定时、分批、数据库名
|
||||||
@XxlJob("phpOnlineStudyRecordSyncEs")
|
@XxlJob("phpOnlineStudyRecordSyncEsTask")
|
||||||
public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
|
public List<CourseStudyDto> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||||
String sql =
|
String sql =
|
||||||
"SELECT\n" +
|
"SELECT\n" +
|
||||||
" elc.kid AS courseId,\n" +
|
" elc.kid AS courseId,\n" +
|
||||||
" elcr.user_id AS userIdOfPhp,\n" +
|
" elcr.user_id AS userIdOfPhp,\n" +
|
||||||
" COUNT(1) AS modNum,\n" +
|
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
|
||||||
" SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" +
|
"FROM\n" +
|
||||||
"FROM\n" +
|
" elearninglms.eln_ln_course elc\n" +
|
||||||
" elearninglms.eln_ln_course elc\n" +
|
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
|
||||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
|
" ON elc.kid = elcr.course_id\n" +
|
||||||
" ON elc.kid = elcr.course_id\n" +
|
" INNER JOIN (\n" +
|
||||||
" INNER JOIN (\n" +
|
" SELECT\n" +
|
||||||
" SELECT\n" +
|
" user_id,\n" +
|
||||||
" user_id,\n" +
|
" course_id \n" +
|
||||||
" course_id \n" +
|
" FROM\n" +
|
||||||
" FROM\n" +
|
" elearninglms.eln_ln_res_complete\n" +
|
||||||
" elearninglms.eln_ln_res_complete\n" +
|
" WHERE\n" +
|
||||||
" WHERE\n" +
|
" complete_type = '1'\n" +
|
||||||
" complete_type = '1'\n" +
|
" AND complete_status = '2'\n" +
|
||||||
" AND complete_status = '2'\n" +
|
" AND updated_at > ?1 AND updated_at < ?2\n" +
|
||||||
" AND updated_at > ?1\n" +
|
" AND is_deleted = 0\n" +
|
||||||
" AND is_deleted = 0\n" +
|
" GROUP BY\n" +
|
||||||
" GROUP BY\n" +
|
" user_id,\n" +
|
||||||
" user_id,\n" +
|
" course_id\n" +
|
||||||
" course_id\n" +
|
" ) recentFinishStuent \n" +
|
||||||
" ) recentFinishStuent \n" +
|
" ON recentFinishStuent.user_id = elcr.user_id \n" +
|
||||||
" ON recentFinishStuent.user_id = elcr.user_id \n" +
|
" AND recentFinishStuent.course_id = elcr.course_id\n" +
|
||||||
" AND recentFinishStuent.course_id = elcr.course_id\n" +
|
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
||||||
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
" ON elms.course_id = elcr.course_id\n" +
|
||||||
" ON elms.course_id = elcr.course_id\n" +
|
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
||||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
" ON elrc.mod_res_id = elms.kid\n" +
|
||||||
" ON elrc.mod_res_id = elms.kid\n" +
|
" AND elrc.user_id = elcr.user_id\n" +
|
||||||
" AND elrc.user_id = elcr.user_id\n" +
|
" AND elrc.complete_type = '1'\n" +
|
||||||
" AND elrc.complete_type = '1'\n" +
|
" AND elrc.complete_status = '2'\n" +
|
||||||
" AND elrc.complete_status = '2'\n" +
|
"WHERE\n" +
|
||||||
"WHERE\n" +
|
" elc.is_deleted = 0\n" +
|
||||||
" elc.is_deleted = 0\n" +
|
" AND elcr.is_deleted = 0\n" +
|
||||||
" AND elcr.is_deleted = 0\n" +
|
" AND elcr.reg_state = '1'\n" +
|
||||||
" AND elcr.reg_state = '1'\n" +
|
" AND elms.publish_status = '1'\n" +
|
||||||
" AND elms.publish_status = '1'\n" +
|
" AND elms.is_deleted = '0'\n" +
|
||||||
" AND elms.is_deleted = '0'\n" +
|
"GROUP BY\n" +
|
||||||
"GROUP BY\n" +
|
" elc.kid,\n" +
|
||||||
" elc.kid,\n" +
|
" elcr.user_id";
|
||||||
" elcr.user_id\n" +
|
|
||||||
"HAVING\n" +
|
|
||||||
" completeNum = modNum\n";
|
|
||||||
log.info("开始同步PHP学习记录到ES");
|
log.info("开始同步PHP学习记录到ES");
|
||||||
// 增量获取PHP中所有已完成的课程
|
// 增量获取PHP中所有已完成的课程
|
||||||
if (syncTimePoint == null) {
|
if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) {
|
||||||
LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30);
|
LocalDateTime now = LocalDateTime.now();
|
||||||
syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
LocalDateTime halfAnHourAgo = now.minusMinutes(30);
|
||||||
|
syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
||||||
|
syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
||||||
|
|
||||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
log.info("同步时间点:{}", formatter.format(halfAnHourAgo));
|
log.info("同步时间起点:{}", formatter.format(halfAnHourAgo));
|
||||||
|
log.info("同步时间终点:{}", formatter.format(now));
|
||||||
}
|
}
|
||||||
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
|
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePointOfBegin,syncTimePointOfEnd);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (CollUtil.isEmpty(objectList1)) {
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
log.info("没有找到已完成的数据");
|
log.info("没有找到已完成的数据");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
String indexName = "new_study_resource";
|
String indexName = "new_study_resource";
|
||||||
|
|
||||||
List<PhpOnlineDto> finishedCourseList = new ArrayList<>();
|
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
|
||||||
for (Object[] objects : objectList1) {
|
for (Object[] objects : objectList1) {
|
||||||
String courseId = objects[0].toString();
|
String courseId = objects[0].toString();
|
||||||
String userIdOfPhp = objects[1].toString();
|
String userIdOfPhp = objects[1].toString();
|
||||||
|
Integer progress = ((BigDecimal) objects[2]).intValue();
|
||||||
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
||||||
phpOnlineDto.setCourseId(courseId);
|
phpOnlineDto.setCourseId(courseId);
|
||||||
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
||||||
finishedCourseList.add(phpOnlineDto);
|
phpOnlineDto.setProgress(progress);
|
||||||
|
recentLearnRecordList.add(phpOnlineDto);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||||
|
|
||||||
|
String userBasicDataBase;
|
||||||
|
if (activeProfile.equals("prod")) {
|
||||||
|
userBasicDataBase = "user_basic";
|
||||||
|
} else {
|
||||||
|
userBasicDataBase = "userbasic";
|
||||||
|
}
|
||||||
|
|
||||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||||
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||||
|
|
||||||
if (CollUtil.isEmpty(objectList1)) {
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
log.info("新系统用户数据不存在");
|
log.info("新系统用户数据不存在");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||||
|
|
||||||
// 设置新系统用户ID
|
// 设置新系统用户ID
|
||||||
finishedCourseList = finishedCourseList.stream()
|
recentLearnRecordList = recentLearnRecordList.stream()
|
||||||
.map(phpOnlineDto -> {
|
.map(phpOnlineDto -> {
|
||||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||||
if (userIdOfJavaObj != null) {
|
if (userIdOfJavaObj != null) {
|
||||||
@@ -144,28 +158,44 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
|
||||||
// 获取ES中没有完成的的课程学习记录
|
// 获取ES中的数据
|
||||||
List<CourseStudyDto> notFinishedCourseList = getEsData(finishedCourseList);
|
List<CourseStudyDto> esDataList = getEsData(recentLearnRecordList);
|
||||||
|
// 构建映射关系
|
||||||
|
Map<String, PhpOnlineDto> map = recentLearnRecordList.stream()
|
||||||
|
.collect(Collectors.toMap(
|
||||||
|
phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(),
|
||||||
|
phpOnlineDto -> phpOnlineDto
|
||||||
|
));
|
||||||
|
|
||||||
Set<String> finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet());
|
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
||||||
|
.map(esDataItem -> {
|
||||||
|
String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId();
|
||||||
|
PhpOnlineDto phpOnlineDto = map.get(key);
|
||||||
|
|
||||||
|
// 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新
|
||||||
|
if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) {
|
||||||
|
esDataItem.setProgress(phpOnlineDto.getProgress());
|
||||||
|
if (phpOnlineDto.getProgress() == 100){
|
||||||
|
esDataItem.setStatus(9);
|
||||||
|
} else {
|
||||||
|
esDataItem.setStatus(2);
|
||||||
|
}
|
||||||
|
return esDataItem;
|
||||||
|
}
|
||||||
|
return null; // 返回 null 表示不需要更新
|
||||||
|
})
|
||||||
|
.filter(Objects::nonNull) // 去掉返回为 null 的项
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
|
||||||
|
if (isOnlyRead != null && isOnlyRead == 1) {
|
||||||
|
return toBeUpdatedEs;
|
||||||
|
}
|
||||||
|
|
||||||
List<CourseStudyDto> toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> {
|
|
||||||
if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
// 有多种情况,es不存在学习记录,或者学习记录状态不是9
|
|
||||||
log.info("跳过:{}", courseStudyDto);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}).map(courseStudyDto -> {
|
|
||||||
courseStudyDto.setStatus(9);
|
|
||||||
courseStudyDto.setProgress(100);
|
|
||||||
return courseStudyDto;
|
|
||||||
}).collect(Collectors.toList());
|
|
||||||
|
|
||||||
// 更新ES中的未同步为完成的学习记录
|
// 更新ES中的未同步为完成的学习记录
|
||||||
toBeUpdatedEs(toBeUpdatedEs, indexName);
|
// toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||||
|
|||||||
@@ -45,6 +45,7 @@ public class StudyCourseESApi extends ApiBaseController{
|
|||||||
@Resource
|
@Resource
|
||||||
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
|
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
|
||||||
|
|
||||||
|
|
||||||
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
|
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
|
||||||
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
|
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
|
||||||
if(search==null) {
|
if(search==null) {
|
||||||
@@ -169,8 +170,10 @@ public class StudyCourseESApi extends ApiBaseController{
|
|||||||
|
|
||||||
|
|
||||||
@PostMapping("/phpOnlineStudyRecordSyncEs")
|
@PostMapping("/phpOnlineStudyRecordSyncEs")
|
||||||
public JsonResponse<Boolean> phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
|
public JsonResponse<List<CourseStudyDto>> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||||
phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint);
|
List<CourseStudyDto> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin,syncTimePointOfEnd, isOnlyRead);
|
||||||
return success(true);
|
return success(courseStudyDtoList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,4 +25,9 @@ public class PhpOnlineDto {
|
|||||||
* 课程名称
|
* 课程名称
|
||||||
*/
|
*/
|
||||||
private String userIdOfJava;
|
private String userIdOfJava;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 进度
|
||||||
|
*/
|
||||||
|
private Integer progress;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user