Compare commits

..

9 Commits

Author SHA1 Message Date
yang
6c750d2304 es数据同步 2024-12-19 16:39:36 +08:00
yang
aab50a0ff0 es数据同步 2024-12-18 15:26:17 +08:00
yang
9866618110 es数据同步 2024-12-18 13:39:21 +08:00
yang
be18f7477b es数据同步 2024-12-17 10:53:57 +08:00
yang
e412f5b6eb es数据同步,全量、分页 2024-12-15 20:27:47 +08:00
yang
255b854a08 es数据同步,全量、分页 2024-12-15 19:37:22 +08:00
yang
3343474f33 es数据同步 2024-12-13 20:52:11 +08:00
yang
cb8a333b4f es数据同步 2024-12-13 20:20:49 +08:00
yang
09f64bdb08 es数据同步 2024-12-12 22:21:06 +08:00
4 changed files with 425 additions and 118 deletions

View File

@@ -7,6 +7,7 @@ import com.xboe.school.study.dao.PhpOnlineCourseDao;
import com.xboe.school.study.dto.PhpOnlineDto;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
@@ -42,138 +43,113 @@ public class PhpOnlineStudyRecordScheduledTasks {
@Resource
RestHighLevelClient restHighLevelClient;
/**
* 定时同步PHP数据库数据到es中计划定时20分钟执行一次。具体实现是
* 第一步查询最近半小时内完成任一课件的课程学习获取的信息有课程ID学员ID学习状态、学习进度。
* 第二步根据上述得到的课程ID学员ID查询es中的未完成的记录。
* 第三步比对两者学习进度如果不一致以数据库学习记录为准修改ES
*/
@XxlJob("phpOnlineStudyRecordSyncEsTask")
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
// todo 定时、分批、数据库名
@XxlJob("phpOnlineStudyRecordSyncEs")
public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
String sql =
"SELECT\n" +
" elc.kid AS courseId,\n" +
" elcr.user_id AS userIdOfPhp,\n" +
" COUNT(1) AS modNum,\n" +
" SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" +
"FROM\n" +
" elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
" ON elc.kid = elcr.course_id\n" +
" INNER JOIN (\n" +
" SELECT\n" +
" user_id,\n" +
" course_id \n" +
" FROM\n" +
" elearninglms.eln_ln_res_complete\n" +
" WHERE\n" +
" complete_type = '1'\n" +
" AND complete_status = '2'\n" +
" AND updated_at > ?1\n" +
" AND is_deleted = 0\n" +
" GROUP BY\n" +
" user_id,\n" +
" course_id\n" +
" ) recentFinishStuent \n" +
" ON recentFinishStuent.user_id = elcr.user_id \n" +
" AND recentFinishStuent.course_id = elcr.course_id\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
"WHERE\n" +
" elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
"GROUP BY\n" +
" elc.kid,\n" +
" elcr.user_id\n" +
"HAVING\n" +
" completeNum = modNum\n";
log.info("开始同步PHP学习记录到ES");
// 增量获取PHP中所有已完成的课程
if (syncTimePoint == null) {
LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30);
syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime halfAnHourAgo = now.minusMinutes(30);
syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("同步时间点:{}", formatter.format(halfAnHourAgo));
}
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
if (CollUtil.isEmpty(objectList1)) {
log.info("没有找到已完成的数据");
return;
log.info("同步时间点:{}", formatter.format(halfAnHourAgo));
log.info("同步时间终点:{}", formatter.format(now));
}
String indexName = "new_study_resource";
int pageSize = 1000;
int totalRecordsCount = phpOnlineCourseDao.selectRecentLearnRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数
List<PhpOnlineDto> finishedCourseList = new ArrayList<>();
for (Object[] objects : objectList1) {
String courseId = objects[0].toString();
String userIdOfPhp = objects[1].toString();
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
phpOnlineDto.setCourseId(courseId);
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
finishedCourseList.add(phpOnlineDto);
int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize);
ArrayList<String> tempResultList = new ArrayList<>();
for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
int offset = (pageNumber - 1) * pageSize;
List<PhpOnlineDto> pageData = phpOnlineCourseDao.selectRecentLearnRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize);
ArrayList<String> singleResultList = esDataHandle(isOnlyRead, pageData);
tempResultList.addAll(singleResultList);
}
// 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
return tempResultList;
}
if (CollUtil.isEmpty(objectList1)) {
log.info("新系统用户数据不存在");
return;
private ArrayList<String> esDataHandle(Integer isOnlyRead, List<PhpOnlineDto> recentLearnRecordList) throws IOException {
ArrayList<String> tempResultList = new ArrayList<>();
// 将数据分批每批50条
int batchSize = 50;
int totalSize = recentLearnRecordList.size();
List<List<PhpOnlineDto>> batches = new ArrayList<>();
for (int i = 0; i < totalSize; i += batchSize) {
int end = Math.min(i + batchSize, totalSize);
batches.add(recentLearnRecordList.subList(i, end));
}
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
for (List<PhpOnlineDto> batch : batches) {
// 构建映射关系
Map<String, PhpOnlineDto> map = batch.stream()
.collect(Collectors.toMap(
phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(),
phpOnlineDto -> phpOnlineDto
));
// 设置新系统用户ID
finishedCourseList = finishedCourseList.stream()
.map(phpOnlineDto -> {
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
if (userIdOfJavaObj != null) {
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
}
return phpOnlineDto;
})
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
.collect(Collectors.toList());
// 获取ES中的数据
List<CourseStudyDto> esDataList = getEsData(batch);
// 获取ES中没有完成的的课程学习记录
List<CourseStudyDto> notFinishedCourseList = getEsData(finishedCourseList);
Set<String> finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet());
List<CourseStudyDto> toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> {
if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) {
return true;
} else {
// 有多种情况es不存在学习记录或者学习记录状态不是9
log.info("跳过:{}", courseStudyDto);
return false;
if (CollUtil.isEmpty(esDataList)){
continue;
}
}).map(courseStudyDto -> {
courseStudyDto.setStatus(9);
courseStudyDto.setProgress(100);
return courseStudyDto;
}).collect(Collectors.toList());
// 更新ES中的未同步为完成的学习记录
toBeUpdatedEs(toBeUpdatedEs, indexName);
// 更新ES数据
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
.map(esDataItem -> {
String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId();
PhpOnlineDto phpOnlineDto = map.get(key);
// 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新
if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) {
esDataItem.setProgress(phpOnlineDto.getProgress());
if (phpOnlineDto.getProgress() == 100) {
esDataItem.setStatus(9); // 完成
} else {
esDataItem.setStatus(2); // 进行中
}
return esDataItem;
}
return null; // 返回 null 表示不需要更新
})
.filter(Objects::nonNull) // 去掉返回为 null 的项
.collect(Collectors.toList());
tempResultList.addAll(toBeUpdatedEs.stream().map(esDataItem -> esDataItem.getId()).collect(Collectors.toList()));
if (isOnlyRead != null && isOnlyRead == 1) {
continue;
}
// 调用批量更新方法
toBeUpdatedEs(toBeUpdatedEs);
}
return tempResultList;
}
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
log.info("待处理的数据:{}", finishedCourseList);
SearchRequest searchRequest = new SearchRequest("new_study_resource");
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
for (PhpOnlineDto phpOnlineDto : finishedCourseList) {
if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) {
if (StringUtils.isBlank(phpOnlineDto.getUserIdOfJava()) || StringUtils.isBlank(phpOnlineDto.getCourseId())) {
continue;
}
@@ -184,11 +160,13 @@ public class PhpOnlineStudyRecordScheduledTasks {
);
}
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
if (boolQuery.hasClauses()) {
sourceBuilder.query(boolQuery);
if (!boolQuery.hasClauses()) {
return null;
}
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
.query(boolQuery)
.timeout(new TimeValue(60, TimeUnit.SECONDS))
.size(finishedCourseList.size() + 10);
searchRequest.source(sourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
@@ -208,7 +186,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
return courseStudyDtoList;
}
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) {
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData) {
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
return;
}
@@ -219,7 +197,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
docMap.put("progress", courseStudyDto.getProgress());
// 创建更新请求并传入单一的docMap
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId())
UpdateRequest updateRequest = new UpdateRequest("new_study_resource", courseStudyDto.getId())
.doc(docMap, XContentType.JSON);
// 将请求添加到批量请求中
@@ -241,5 +219,42 @@ public class PhpOnlineStudyRecordScheduledTasks {
}
}
/**
* 第一步查询最近报名的学习获取的信息有课程ID学员ID学习状态、学习进度。
* 第二步根据上述得到的课程ID学员ID查询es中的未完成的记录。
* 第三步比对两者学习进度如果不一致以数据库学习记录为准修改ES。
*/
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
log.info("开始同步PHP学习记录到ES");
// 增量获取PHP中所有已完成的课程
if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) {
LocalDateTime now = LocalDateTime.now();
LocalDateTime halfAnHourAgo = now.minusMinutes(30);
syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("同步时间起点:{}", formatter.format(halfAnHourAgo));
log.info("同步时间终点:{}", formatter.format(now));
}
int pageSize = 1000;
int totalRecordsCount = phpOnlineCourseDao.selectRecentRegRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数
int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize);
ArrayList<String> tempResultList = new ArrayList<>();
for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
int offset = (pageNumber - 1) * pageSize;
List<PhpOnlineDto> pageData = phpOnlineCourseDao.selectRecentRegRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize);
ArrayList<String> singleResultList = esDataHandle(isOnlyRead, pageData);
tempResultList.addAll(singleResultList);
}
return tempResultList;
}
}

View File

@@ -44,6 +44,7 @@ public class StudyCourseESApi extends ApiBaseController{
@Resource
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
@@ -167,10 +168,25 @@ public class StudyCourseESApi extends ApiBaseController{
return success(true);
}
/**
*
* @param syncTimePointOfBegin
* @param syncTimePointOfEnd
* @param isOnlyRead
* @return
* @throws IOException
*/
@PostMapping("/phpOnlineStudyRecordSyncEs")
public JsonResponse<Boolean> phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint);
return success(true);
public JsonResponse<List<String>> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
List<String> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead);
return success(courseStudyDtoList);
}
@PostMapping("/phpOnlineStudyRecordSyncEsOfFull")
public JsonResponse<List<String>> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
List<String> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEsOfFull(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead);
return success(courseStudyDtoList);
}
}

View File

@@ -1,9 +1,280 @@
package com.xboe.school.study.dao;
import cn.hutool.core.collection.CollUtil;
import com.xboe.core.orm.BaseDao;
import com.xboe.school.study.dto.PhpOnlineDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Repository
@Slf4j
public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
@Value("${spring.profiles.active}")
private String activeProfile;
public List<PhpOnlineDto> selectRecentLearnRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) {
String sql =
"SELECT\n" +
" elc.kid AS courseId,\n" +
" elcr.user_id AS userIdOfPhp,\n" +
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
"FROM\n" +
" elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
" ON elc.kid = elcr.course_id\n" +
" INNER JOIN (\n" +
" SELECT\n" +
" user_id,\n" +
" course_id \n" +
" FROM\n" +
" elearninglms.eln_ln_res_complete\n" +
" WHERE\n" +
" complete_type = '1'\n" +
" AND complete_status = '2'\n" +
" AND updated_at > ?1 AND updated_at < ?2\n" +
" AND is_deleted = 0\n" +
" GROUP BY\n" +
" user_id,\n" +
" course_id\n" +
" ) recentFinishStuent \n" +
" ON recentFinishStuent.user_id = elcr.user_id \n" +
" AND recentFinishStuent.course_id = elcr.course_id\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
"WHERE\n" +
" elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
"GROUP BY\n" +
" elc.kid,\n" +
" elcr.user_id\n" +
"LIMIT " + offset + "," + limit;
List<Object[]> objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd);
if (CollUtil.isEmpty(objectList1)) {
log.info("没有找到已完成的数据");
return null;
}
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
for (Object[] objects : objectList1) {
String courseId = objects[0].toString();
String userIdOfPhp = objects[1].toString();
Integer progress = ((BigDecimal) objects[2]).intValue();
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
phpOnlineDto.setCourseId(courseId);
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
phpOnlineDto.setProgress(progress);
recentLearnRecordList.add(phpOnlineDto);
}
// 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题
String userBasicDataBase;
if (activeProfile.equals("pro")) {
userBasicDataBase = "user_basic";
} else {
userBasicDataBase = "userbasic";
}
List<String> userIdOfPhpList = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIdOfPhpList);
if (CollUtil.isEmpty(objectList1)) {
log.info("新系统用户数据不存在");
return null;
}
Map<Object, Object> kidAndUserIdMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
// 设置新系统用户ID
recentLearnRecordList = recentLearnRecordList.stream()
.map(phpOnlineDto -> {
Object userIdOfJavaObj = kidAndUserIdMap.get(phpOnlineDto.getUserIdOfPhp());
if (userIdOfJavaObj != null) {
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
}
return phpOnlineDto;
})
.filter(phpOnlineDto -> StringUtils.isNotBlank(phpOnlineDto.getUserIdOfJava()))
.collect(Collectors.toList());
return recentLearnRecordList;
}
public int selectRecentLearnRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) {
String sql =
"SELECT COUNT(1) \n" +
"FROM (\n" +
" SELECT elc.kid\n" +
" FROM elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr\n" +
" ON elc.kid = elcr.course_id\n" +
" INNER JOIN (\n" +
" SELECT user_id, course_id\n" +
" FROM elearninglms.eln_ln_res_complete\n" +
" WHERE complete_type = '1'\n" +
" AND complete_status = '2'\n" +
" AND updated_at > ?1\n" +
" AND updated_at < ?2\n" +
" AND is_deleted = 0\n" +
" GROUP BY user_id, course_id\n" +
" ) recentFinishStuent\n" +
" ON recentFinishStuent.user_id = elcr.user_id\n" +
" AND recentFinishStuent.course_id = elcr.course_id\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms\n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc\n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
" WHERE elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
" GROUP BY elc.kid, elcr.user_id\n" +
") temp;\n"; // 每页 1000 条记录OFFSET 计算分页
int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd);
return count;
}
public List<PhpOnlineDto> selectRecentRegRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) {
String sql =
"SELECT\n" +
" elc.kid AS courseId,\n" +
" elcr.user_id AS userIdOfPhp,\n" +
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
"FROM\n" +
" elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr ON elc.kid = elcr.course_id AND elcr.created_at > ?1 AND elcr.created_at < ?2\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
"WHERE\n" +
" elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
"GROUP BY\n" +
" elc.kid,\n" +
" elcr.user_id\n" +
"LIMIT " + offset + "," + limit;
List<Object[]> objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd);
if (CollUtil.isEmpty(objectList1)) {
log.info("没有找到已完成的数据");
return null;
}
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
for (Object[] objects : objectList1) {
String courseId = objects[0].toString();
String userIdOfPhp = objects[1].toString();
Integer progress = ((BigDecimal) objects[2]).intValue();
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
phpOnlineDto.setCourseId(courseId);
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
phpOnlineDto.setProgress(progress);
recentLearnRecordList.add(phpOnlineDto);
}
// 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题
String userBasicDataBase;
if (activeProfile.equals("pro")) {
userBasicDataBase = "user_basic";
} else {
userBasicDataBase = "userbasic";
}
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
if (CollUtil.isEmpty(objectList1)) {
log.info("新系统用户数据不存在");
return null;
}
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
// 设置新系统用户ID
recentLearnRecordList = recentLearnRecordList.stream()
.map(phpOnlineDto -> {
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
if (userIdOfJavaObj != null) {
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
}
return phpOnlineDto;
})
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
.collect(Collectors.toList());
return recentLearnRecordList;
}
public int selectRecentRegRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) {
String sql =
"SELECT COUNT(1)\n" +
"FROM (\n" +
" SELECT\n" +
" elc.kid\n" +
" FROM\n" +
" elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
" ON elc.kid = elcr.course_id\n" +
" AND elcr.created_at > ?1\n" +
" AND elcr.created_at < ?2\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
" WHERE\n" +
" elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
" GROUP BY\n" +
" elc.kid,\n" +
" elcr.user_id\n" +
") temp;\n"; // 每页 1000 条记录OFFSET 计算分页
int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd);
return count;
}
}

View File

@@ -25,4 +25,9 @@ public class PhpOnlineDto {
* 课程名称
*/
private String userIdOfJava;
/**
* 进度
*/
private Integer progress;
}