mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-09 19:06:49 +08:00
es数据同步,全量、分页
This commit is contained in:
@@ -21,12 +21,10 @@ import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.io.IOException;
|
||||
import java.math.BigDecimal;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
@@ -44,54 +42,9 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
@Resource
|
||||
RestHighLevelClient restHighLevelClient;
|
||||
|
||||
@Value("${spring.profiles.active}")
|
||||
private String activeProfile;
|
||||
|
||||
// todo 定时、分批、数据库名
|
||||
@XxlJob("phpOnlineStudyRecordSyncEsTask")
|
||||
public List<CourseStudyDto> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
String sql =
|
||||
"SELECT\n" +
|
||||
" elc.kid AS courseId,\n" +
|
||||
" elcr.user_id AS userIdOfPhp,\n" +
|
||||
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
|
||||
"FROM\n" +
|
||||
" elearninglms.eln_ln_course elc\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
|
||||
" ON elc.kid = elcr.course_id\n" +
|
||||
" INNER JOIN (\n" +
|
||||
" SELECT\n" +
|
||||
" user_id,\n" +
|
||||
" course_id \n" +
|
||||
" FROM\n" +
|
||||
" elearninglms.eln_ln_res_complete\n" +
|
||||
" WHERE\n" +
|
||||
" complete_type = '1'\n" +
|
||||
" AND complete_status = '2'\n" +
|
||||
" AND updated_at > ?1 AND updated_at < ?2\n" +
|
||||
" AND is_deleted = 0\n" +
|
||||
" GROUP BY\n" +
|
||||
" user_id,\n" +
|
||||
" course_id\n" +
|
||||
" ) recentFinishStuent \n" +
|
||||
" ON recentFinishStuent.user_id = elcr.user_id \n" +
|
||||
" AND recentFinishStuent.course_id = elcr.course_id\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
||||
" ON elms.course_id = elcr.course_id\n" +
|
||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
||||
" ON elrc.mod_res_id = elms.kid\n" +
|
||||
" AND elrc.user_id = elcr.user_id\n" +
|
||||
" AND elrc.complete_type = '1'\n" +
|
||||
" AND elrc.complete_status = '2'\n" +
|
||||
"WHERE\n" +
|
||||
" elc.is_deleted = 0\n" +
|
||||
" AND elcr.is_deleted = 0\n" +
|
||||
" AND elcr.reg_state = '1'\n" +
|
||||
" AND elms.publish_status = '1'\n" +
|
||||
" AND elms.is_deleted = '0'\n" +
|
||||
"GROUP BY\n" +
|
||||
" elc.kid,\n" +
|
||||
" elcr.user_id";
|
||||
public List<String> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
|
||||
log.info("开始同步PHP学习记录到ES");
|
||||
// 增量获取PHP中所有已完成的课程
|
||||
@@ -105,97 +58,80 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
log.info("同步时间起点:{}", formatter.format(halfAnHourAgo));
|
||||
log.info("同步时间终点:{}", formatter.format(now));
|
||||
}
|
||||
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePointOfBegin,syncTimePointOfEnd);
|
||||
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("没有找到已完成的数据");
|
||||
return null;
|
||||
int pageSize = 1000;
|
||||
int totalRecordsCount = phpOnlineCourseDao.selectRecentLearnRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数
|
||||
|
||||
int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize);
|
||||
|
||||
ArrayList<String> tempResultList = new ArrayList<>();
|
||||
|
||||
for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
|
||||
int offset = (pageNumber - 1) * pageSize;
|
||||
List<PhpOnlineDto> pageData = phpOnlineCourseDao.selectRecentLearnRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize);
|
||||
ArrayList<String> singleResultList = esDataHandle(isOnlyRead, pageData);
|
||||
tempResultList.addAll(singleResultList);
|
||||
}
|
||||
|
||||
String indexName = "new_study_resource";
|
||||
return tempResultList;
|
||||
}
|
||||
|
||||
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
|
||||
for (Object[] objects : objectList1) {
|
||||
String courseId = objects[0].toString();
|
||||
String userIdOfPhp = objects[1].toString();
|
||||
Integer progress = ((BigDecimal) objects[2]).intValue();
|
||||
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
||||
phpOnlineDto.setCourseId(courseId);
|
||||
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
||||
phpOnlineDto.setProgress(progress);
|
||||
recentLearnRecordList.add(phpOnlineDto);
|
||||
|
||||
private ArrayList<String> esDataHandle(Integer isOnlyRead, List<PhpOnlineDto> recentLearnRecordList) throws IOException {
|
||||
ArrayList<String> tempResultList = new ArrayList<>();
|
||||
// 将数据分批,每批50条
|
||||
int batchSize = 50;
|
||||
int totalSize = recentLearnRecordList.size();
|
||||
List<List<PhpOnlineDto>> batches = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < totalSize; i += batchSize) {
|
||||
int end = Math.min(i + batchSize, totalSize);
|
||||
batches.add(recentLearnRecordList.subList(i, end));
|
||||
}
|
||||
|
||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||
for (List<PhpOnlineDto> batch : batches) {
|
||||
// 构建映射关系
|
||||
Map<String, PhpOnlineDto> map = batch.stream()
|
||||
.collect(Collectors.toMap(
|
||||
phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(),
|
||||
phpOnlineDto -> phpOnlineDto
|
||||
));
|
||||
|
||||
String userBasicDataBase;
|
||||
if (activeProfile.equals("pro")) {
|
||||
userBasicDataBase = "user_basic";
|
||||
} else {
|
||||
userBasicDataBase = "userbasic";
|
||||
}
|
||||
|
||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("新系统用户数据不存在");
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||
|
||||
// 设置新系统用户ID
|
||||
recentLearnRecordList = recentLearnRecordList.stream()
|
||||
.map(phpOnlineDto -> {
|
||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||
if (userIdOfJavaObj != null) {
|
||||
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||
}
|
||||
return phpOnlineDto;
|
||||
})
|
||||
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
|
||||
.collect(Collectors.toList());
|
||||
// 获取ES中的数据
|
||||
List<CourseStudyDto> esDataList = getEsData(batch);
|
||||
|
||||
|
||||
// 获取ES中的数据
|
||||
List<CourseStudyDto> esDataList = getEsData(recentLearnRecordList);
|
||||
// 构建映射关系
|
||||
Map<String, PhpOnlineDto> map = recentLearnRecordList.stream()
|
||||
.collect(Collectors.toMap(
|
||||
phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(),
|
||||
phpOnlineDto -> phpOnlineDto
|
||||
));
|
||||
// 更新ES数据
|
||||
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
||||
.map(esDataItem -> {
|
||||
String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId();
|
||||
PhpOnlineDto phpOnlineDto = map.get(key);
|
||||
|
||||
List<CourseStudyDto> toBeUpdatedEs = esDataList.stream()
|
||||
.map(esDataItem -> {
|
||||
String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId();
|
||||
PhpOnlineDto phpOnlineDto = map.get(key);
|
||||
|
||||
// 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新
|
||||
if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) {
|
||||
esDataItem.setProgress(phpOnlineDto.getProgress());
|
||||
if (phpOnlineDto.getProgress() == 100){
|
||||
esDataItem.setStatus(9);
|
||||
} else {
|
||||
esDataItem.setStatus(2);
|
||||
// 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新
|
||||
if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) {
|
||||
esDataItem.setProgress(phpOnlineDto.getProgress());
|
||||
if (phpOnlineDto.getProgress() == 100) {
|
||||
esDataItem.setStatus(9); // 完成
|
||||
} else {
|
||||
esDataItem.setStatus(2); // 进行中
|
||||
}
|
||||
return esDataItem;
|
||||
}
|
||||
return esDataItem;
|
||||
}
|
||||
return null; // 返回 null 表示不需要更新
|
||||
})
|
||||
.filter(Objects::nonNull) // 去掉返回为 null 的项
|
||||
.collect(Collectors.toList());
|
||||
return null; // 返回 null 表示不需要更新
|
||||
})
|
||||
.filter(Objects::nonNull) // 去掉返回为 null 的项
|
||||
.collect(Collectors.toList());
|
||||
|
||||
tempResultList.addAll(toBeUpdatedEs.stream().map(esDataItem -> esDataItem.getId()).collect(Collectors.toList()));
|
||||
|
||||
if (isOnlyRead != null && isOnlyRead == 1) {
|
||||
return toBeUpdatedEs;
|
||||
if (isOnlyRead != null && isOnlyRead == 1) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 调用批量更新方法
|
||||
// toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||
}
|
||||
|
||||
|
||||
// 更新ES中的未同步为完成的学习记录
|
||||
// toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||
return null;
|
||||
return tempResultList;
|
||||
}
|
||||
|
||||
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||
@@ -272,4 +208,37 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
||||
}
|
||||
|
||||
|
||||
public List<String> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
|
||||
log.info("开始同步PHP学习记录到ES");
|
||||
// 增量获取PHP中所有已完成的课程
|
||||
if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
LocalDateTime halfAnHourAgo = now.minusMinutes(30);
|
||||
syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
||||
syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
||||
|
||||
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||
log.info("同步时间起点:{}", formatter.format(halfAnHourAgo));
|
||||
log.info("同步时间终点:{}", formatter.format(now));
|
||||
}
|
||||
|
||||
int pageSize = 1000;
|
||||
int totalRecordsCount = phpOnlineCourseDao.selectRecentRegRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数
|
||||
|
||||
int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize);
|
||||
|
||||
ArrayList<String> tempResultList = new ArrayList<>();
|
||||
|
||||
for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) {
|
||||
int offset = (pageNumber - 1) * pageSize;
|
||||
List<PhpOnlineDto> pageData = phpOnlineCourseDao.selectRecentRegRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize);
|
||||
ArrayList<String> singleResultList = esDataHandle(isOnlyRead, pageData);
|
||||
tempResultList.addAll(singleResultList);
|
||||
}
|
||||
|
||||
return tempResultList;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -168,10 +168,23 @@ public class StudyCourseESApi extends ApiBaseController{
|
||||
return success(true);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param syncTimePointOfBegin
|
||||
* @param syncTimePointOfEnd
|
||||
* @param isOnlyRead
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@PostMapping("/phpOnlineStudyRecordSyncEs")
|
||||
public JsonResponse<List<CourseStudyDto>> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
List<CourseStudyDto> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin,syncTimePointOfEnd, isOnlyRead);
|
||||
public JsonResponse<List<String>> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
List<String> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead);
|
||||
return success(courseStudyDtoList);
|
||||
}
|
||||
|
||||
@PostMapping("/phpOnlineStudyRecordSyncEsOfFull")
|
||||
public JsonResponse<List<String>> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException {
|
||||
List<String> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEsOfFull(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead);
|
||||
return success(courseStudyDtoList);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,9 +1,280 @@
|
||||
package com.xboe.school.study.dao;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.xboe.core.orm.BaseDao;
|
||||
import com.xboe.school.study.dto.PhpOnlineDto;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
@Repository
|
||||
@Slf4j
|
||||
public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
|
||||
|
||||
@Value("${spring.profiles.active}")
|
||||
private String activeProfile;
|
||||
|
||||
public List<PhpOnlineDto> selectRecentLearnRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) {
|
||||
String sql =
|
||||
"SELECT\n" +
|
||||
" elc.kid AS courseId,\n" +
|
||||
" elcr.user_id AS userIdOfPhp,\n" +
|
||||
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
|
||||
"FROM\n" +
|
||||
" elearninglms.eln_ln_course elc\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
|
||||
" ON elc.kid = elcr.course_id\n" +
|
||||
" INNER JOIN (\n" +
|
||||
" SELECT\n" +
|
||||
" user_id,\n" +
|
||||
" course_id \n" +
|
||||
" FROM\n" +
|
||||
" elearninglms.eln_ln_res_complete\n" +
|
||||
" WHERE\n" +
|
||||
" complete_type = '1'\n" +
|
||||
" AND complete_status = '2'\n" +
|
||||
" AND updated_at > ?1 AND updated_at < ?2\n" +
|
||||
" AND is_deleted = 0\n" +
|
||||
" GROUP BY\n" +
|
||||
" user_id,\n" +
|
||||
" course_id\n" +
|
||||
" ) recentFinishStuent \n" +
|
||||
" ON recentFinishStuent.user_id = elcr.user_id \n" +
|
||||
" AND recentFinishStuent.course_id = elcr.course_id\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
||||
" ON elms.course_id = elcr.course_id\n" +
|
||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
||||
" ON elrc.mod_res_id = elms.kid\n" +
|
||||
" AND elrc.user_id = elcr.user_id\n" +
|
||||
" AND elrc.complete_type = '1'\n" +
|
||||
" AND elrc.complete_status = '2'\n" +
|
||||
"WHERE\n" +
|
||||
" elc.is_deleted = 0\n" +
|
||||
" AND elcr.is_deleted = 0\n" +
|
||||
" AND elcr.reg_state = '1'\n" +
|
||||
" AND elms.publish_status = '1'\n" +
|
||||
" AND elms.is_deleted = '0'\n" +
|
||||
"GROUP BY\n" +
|
||||
" elc.kid,\n" +
|
||||
" elcr.user_id\n" +
|
||||
"LIMIT " + offset + "," + limit;
|
||||
|
||||
|
||||
List<Object[]> objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd);
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("没有找到已完成的数据");
|
||||
return null;
|
||||
}
|
||||
|
||||
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
|
||||
for (Object[] objects : objectList1) {
|
||||
String courseId = objects[0].toString();
|
||||
String userIdOfPhp = objects[1].toString();
|
||||
Integer progress = ((BigDecimal) objects[2]).intValue();
|
||||
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
||||
phpOnlineDto.setCourseId(courseId);
|
||||
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
||||
phpOnlineDto.setProgress(progress);
|
||||
recentLearnRecordList.add(phpOnlineDto);
|
||||
}
|
||||
|
||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||
|
||||
String userBasicDataBase;
|
||||
if (activeProfile.equals("pro")) {
|
||||
userBasicDataBase = "user_basic";
|
||||
} else {
|
||||
userBasicDataBase = "userbasic";
|
||||
}
|
||||
|
||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("新系统用户数据不存在");
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||
|
||||
// 设置新系统用户ID
|
||||
recentLearnRecordList = recentLearnRecordList.stream()
|
||||
.map(phpOnlineDto -> {
|
||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||
if (userIdOfJavaObj != null) {
|
||||
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||
}
|
||||
return phpOnlineDto;
|
||||
})
|
||||
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return recentLearnRecordList;
|
||||
}
|
||||
|
||||
|
||||
public int selectRecentLearnRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) {
|
||||
String sql =
|
||||
"SELECT COUNT(1) \n" +
|
||||
"FROM (\n" +
|
||||
" SELECT elc.kid\n" +
|
||||
" FROM elearninglms.eln_ln_course elc\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr\n" +
|
||||
" ON elc.kid = elcr.course_id\n" +
|
||||
" INNER JOIN (\n" +
|
||||
" SELECT user_id, course_id\n" +
|
||||
" FROM elearninglms.eln_ln_res_complete\n" +
|
||||
" WHERE complete_type = '1'\n" +
|
||||
" AND complete_status = '2'\n" +
|
||||
" AND updated_at > ?1\n" +
|
||||
" AND updated_at < ?2\n" +
|
||||
" AND is_deleted = 0\n" +
|
||||
" GROUP BY user_id, course_id\n" +
|
||||
" ) recentFinishStuent\n" +
|
||||
" ON recentFinishStuent.user_id = elcr.user_id\n" +
|
||||
" AND recentFinishStuent.course_id = elcr.course_id\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_mod_res elms\n" +
|
||||
" ON elms.course_id = elcr.course_id\n" +
|
||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc\n" +
|
||||
" ON elrc.mod_res_id = elms.kid\n" +
|
||||
" AND elrc.user_id = elcr.user_id\n" +
|
||||
" AND elrc.complete_type = '1'\n" +
|
||||
" AND elrc.complete_status = '2'\n" +
|
||||
" WHERE elc.is_deleted = 0\n" +
|
||||
" AND elcr.is_deleted = 0\n" +
|
||||
" AND elcr.reg_state = '1'\n" +
|
||||
" AND elms.publish_status = '1'\n" +
|
||||
" AND elms.is_deleted = '0'\n" +
|
||||
" GROUP BY elc.kid, elcr.user_id\n" +
|
||||
") temp;\n"; // 每页 1000 条记录,OFFSET 计算分页
|
||||
|
||||
|
||||
int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd);
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
public List<PhpOnlineDto> selectRecentRegRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) {
|
||||
String sql =
|
||||
"SELECT\n" +
|
||||
" elc.kid AS courseId,\n" +
|
||||
" elcr.user_id AS userIdOfPhp,\n" +
|
||||
" ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" +
|
||||
"FROM\n" +
|
||||
" elearninglms.eln_ln_course elc\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr ON elc.kid = elcr.course_id AND elcr.created_at > ?1 AND elcr.created_at < ?2\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
||||
" ON elms.course_id = elcr.course_id\n" +
|
||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
||||
" ON elrc.mod_res_id = elms.kid\n" +
|
||||
" AND elrc.user_id = elcr.user_id\n" +
|
||||
" AND elrc.complete_type = '1'\n" +
|
||||
" AND elrc.complete_status = '2'\n" +
|
||||
"WHERE\n" +
|
||||
" elc.is_deleted = 0\n" +
|
||||
" AND elcr.is_deleted = 0\n" +
|
||||
" AND elcr.reg_state = '1'\n" +
|
||||
" AND elms.publish_status = '1'\n" +
|
||||
" AND elms.is_deleted = '0'\n" +
|
||||
"GROUP BY\n" +
|
||||
" elc.kid,\n" +
|
||||
" elcr.user_id\n" +
|
||||
"LIMIT " + offset + "," + limit;
|
||||
|
||||
|
||||
List<Object[]> objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd);
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("没有找到已完成的数据");
|
||||
return null;
|
||||
}
|
||||
|
||||
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
|
||||
for (Object[] objects : objectList1) {
|
||||
String courseId = objects[0].toString();
|
||||
String userIdOfPhp = objects[1].toString();
|
||||
Integer progress = ((BigDecimal) objects[2]).intValue();
|
||||
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
||||
phpOnlineDto.setCourseId(courseId);
|
||||
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
||||
phpOnlineDto.setProgress(progress);
|
||||
recentLearnRecordList.add(phpOnlineDto);
|
||||
}
|
||||
|
||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||
|
||||
String userBasicDataBase;
|
||||
if (activeProfile.equals("pro")) {
|
||||
userBasicDataBase = "user_basic";
|
||||
} else {
|
||||
userBasicDataBase = "userbasic";
|
||||
}
|
||||
|
||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||
List<Object[]> objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||
|
||||
if (CollUtil.isEmpty(objectList1)) {
|
||||
log.info("新系统用户数据不存在");
|
||||
return null;
|
||||
}
|
||||
|
||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||
|
||||
// 设置新系统用户ID
|
||||
recentLearnRecordList = recentLearnRecordList.stream()
|
||||
.map(phpOnlineDto -> {
|
||||
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||
if (userIdOfJavaObj != null) {
|
||||
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||
}
|
||||
return phpOnlineDto;
|
||||
})
|
||||
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
return recentLearnRecordList;
|
||||
}
|
||||
|
||||
public int selectRecentRegRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) {
|
||||
String sql =
|
||||
"SELECT COUNT(1)\n" +
|
||||
"FROM (\n" +
|
||||
" SELECT\n" +
|
||||
" elc.kid\n" +
|
||||
" FROM\n" +
|
||||
" elearninglms.eln_ln_course elc\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
|
||||
" ON elc.kid = elcr.course_id\n" +
|
||||
" AND elcr.created_at > ?1\n" +
|
||||
" AND elcr.created_at < ?2\n" +
|
||||
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
|
||||
" ON elms.course_id = elcr.course_id\n" +
|
||||
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
|
||||
" ON elrc.mod_res_id = elms.kid\n" +
|
||||
" AND elrc.user_id = elcr.user_id\n" +
|
||||
" AND elrc.complete_type = '1'\n" +
|
||||
" AND elrc.complete_status = '2'\n" +
|
||||
" WHERE\n" +
|
||||
" elc.is_deleted = 0\n" +
|
||||
" AND elcr.is_deleted = 0\n" +
|
||||
" AND elcr.reg_state = '1'\n" +
|
||||
" AND elms.publish_status = '1'\n" +
|
||||
" AND elms.is_deleted = '0'\n" +
|
||||
" GROUP BY\n" +
|
||||
" elc.kid,\n" +
|
||||
" elcr.user_id\n" +
|
||||
") temp;\n"; // 每页 1000 条记录,OFFSET 计算分页
|
||||
|
||||
|
||||
int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd);
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user