From 4ec4b861f78e95a8f0c21f8a723178eeb716ef54 Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Thu, 12 Dec 2024 17:43:35 +0800 Subject: [PATCH 1/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5-?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E8=AF=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PhpOnlineStudyRecordScheduledTasks.java | 245 ++++++++++++++++++ .../school/study/api/StudyCourseESApi.java | 12 + .../school/study/dao/PhpOnlineCourseDao.java | 9 + .../xboe/school/study/dto/PhpOnlineDto.java | 28 ++ 4 files changed, 294 insertions(+) create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java new file mode 100644 index 00000000..ad2a3550 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -0,0 +1,245 @@ +package com.xboe.school.study.api; + +import cn.hutool.core.collection.CollUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xboe.module.course.dto.CourseStudyDto; +import com.xboe.school.study.dao.PhpOnlineCourseDao; +import com.xboe.school.study.dto.PhpOnlineDto; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Component +@Slf4j +public class PhpOnlineStudyRecordScheduledTasks { + + @Resource + private PhpOnlineCourseDao phpOnlineCourseDao; + + @Resource + RestHighLevelClient restHighLevelClient; + + + // todo 定时、分批、数据库名 + @XxlJob("phpOnlineStudyRecordSyncEs") + public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { + String sql = + "SELECT\n" + + " elc.kid AS courseId,\n" + + " elcr.user_id AS userIdOfPhp,\n" + + " COUNT(1) AS modNum,\n" + + " SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" + + "FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + + " ON elc.kid = elcr.course_id\n" + + " INNER JOIN (\n" + + " SELECT\n" + + " user_id,\n" + + " course_id \n" + + " FROM\n" + + " elearninglms.eln_ln_res_complete\n" + + " WHERE\n" + + " complete_type = '1'\n" + + " AND complete_status = '2'\n" + + " AND updated_at > ?1\n" + + " AND is_deleted = 0\n" + + " GROUP BY\n" + + " user_id,\n" + + " course_id\n" + + " ) recentFinishStuent \n" + + " ON recentFinishStuent.user_id = elcr.user_id \n" + + " AND recentFinishStuent.course_id = elcr.course_id\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + "WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + "GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id\n" + + "HAVING\n" + + " completeNum = modNum\n"; + log.info("开始同步PHP学习记录到ES"); + // 增量获取PHP中所有已完成的课程 + if (syncTimePoint == null) { + LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30); + syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + log.info("同步时间点:{}", formatter.format(halfAnHourAgo)); + } + List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); + + + + if (CollUtil.isEmpty(objectList1)) { + log.info("没有找到已完成的数据"); + return; + } + + String indexName = "new_study_resource"; + + List finishedCourseList = new ArrayList<>(); + for (Object[] objects : objectList1) { + String courseId = objects[0].toString(); + String userIdOfPhp = objects[1].toString(); + PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); + phpOnlineDto.setCourseId(courseId); + phpOnlineDto.setUserIdOfPhp(userIdOfPhp); + finishedCourseList.add(phpOnlineDto); + } + + // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); + List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + + if (CollUtil.isEmpty(objectList1)) { + log.info("新系统用户数据不存在"); + return; + } + + Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); + + // 设置新系统用户ID + finishedCourseList = finishedCourseList.stream() + .map(phpOnlineDto -> { + Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); + if (userIdOfJavaObj != null) { + phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); + } + return phpOnlineDto; + }) + .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .collect(Collectors.toList()); + + + // 获取ES中没有完成的的课程学习记录 + List notFinishedCourseList = getEsData(finishedCourseList); + + Set finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet()); + + List toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> { + if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) { + return true; + } else { + // 有多种情况,es不存在学习记录,或者学习记录状态不是9 + log.info("跳过:{}", courseStudyDto); + return false; + } + }).map(courseStudyDto -> { + courseStudyDto.setStatus(9); + courseStudyDto.setProgress(100); + return courseStudyDto; + }).collect(Collectors.toList()); + + // 更新ES中的未同步为完成的学习记录 + toBeUpdatedEs(toBeUpdatedEs, indexName); + + } + + private List getEsData(List finishedCourseList) throws IOException { + SearchRequest searchRequest = new SearchRequest("new_study_resource"); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + + for (PhpOnlineDto phpOnlineDto : finishedCourseList) { + if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) { + continue; + } + + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId())) + .must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava())) + .mustNot(QueryBuilders.termQuery("status", 9)) + ); + } + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS)); + searchRequest.source(sourceBuilder); + if (boolQuery.hasClauses()) { + sourceBuilder.query(boolQuery); + } + searchRequest.source(sourceBuilder); + + SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + SearchHits hits = response.getHits(); + + List courseStudyDtoList = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + for (SearchHit hit : hits) { + String sourceAsString = hit.getSourceAsString(); + try { + CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class); + courseStudyDtoList.add(cft); + } catch (Exception e) { + log.error("转化json到对应失败", sourceAsString); + } + } + return courseStudyDtoList; + } + + private void toBeUpdatedEs(List toBeUpdatedEsData, String indexName) { + if (CollUtil.isEmpty(toBeUpdatedEsData)) { + return; + } + BulkRequest bulkRequest = new BulkRequest(); + for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) { + Map docMap = new HashMap<>(); + docMap.put("status", courseStudyDto.getStatus()); + docMap.put("progress", courseStudyDto.getProgress()); + + // 创建更新请求,并传入单一的docMap + UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId()) + .doc(docMap, XContentType.JSON); + + // 将请求添加到批量请求中 + bulkRequest.add(updateRequest); + } + + try { + // 执行批量更新操作 + BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); + // 检查是否有失败的操作 + if (bulkResponse.hasFailures()) { + log.error("批量更新失败: {}", bulkResponse.buildFailureMessage()); + } else { + List collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList()); + log.info("批量更新成功,更新的ES ID列表: {}", collect); + } + } catch (IOException e) { + log.error("执行批量更新时发生错误", e); + } + } + + +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java index ddfa50f8..0ad3bab6 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java @@ -24,6 +24,8 @@ import com.xboe.school.study.service.IStudyCourseService; import lombok.extern.slf4j.Slf4j; +import javax.annotation.Resource; + /** * ES的课程检索 * @author seastar @@ -39,6 +41,9 @@ public class StudyCourseESApi extends ApiBaseController{ @Autowired IStudyCourseService service; + + @Resource + private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks; @RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST}) public JsonResponse> search(Pagination page, CourseStudyDto dto){ @@ -161,4 +166,11 @@ public class StudyCourseESApi extends ApiBaseController{ return success(true); } + + + @PostMapping("/phpOnlineStudyRecordSyncEs") + public JsonResponse phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { + phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint); + return success(true); + } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java new file mode 100644 index 00000000..94b64201 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java @@ -0,0 +1,9 @@ +package com.xboe.school.study.dao; + +import com.xboe.core.orm.BaseDao; +import com.xboe.school.study.dto.PhpOnlineDto; +import org.springframework.stereotype.Repository; + +@Repository +public class PhpOnlineCourseDao extends BaseDao { +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java new file mode 100644 index 00000000..9d77ca77 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java @@ -0,0 +1,28 @@ +package com.xboe.school.study.dto; + +import lombok.Data; + +/** + * 批量报名 + * + * @author seastar + */ +@Data +public class PhpOnlineDto { + + /** + * 课程id + */ + private String courseId; + + /** + * 课程名称 + */ + private String userIdOfPhp; + + + /** + * 课程名称 + */ + private String userIdOfJava; +} From 09f64bdb081836b69b6389a9b4c7cda85d8fc19f Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Thu, 12 Dec 2024 22:21:06 +0800 Subject: [PATCH 2/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PhpOnlineStudyRecordScheduledTasks.java | 26 +-- .../PhpOnlineStudyRecordScheduledTasks2.java | 215 ++++++++++++++++++ .../school/study/api/StudyCourseESApi.java | 31 ++- .../xboe/school/study/dto/PhpOnlineDto.java | 2 + 4 files changed, 253 insertions(+), 21 deletions(-) create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java index ad2a3550..b59b5383 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -45,7 +45,7 @@ public class PhpOnlineStudyRecordScheduledTasks { // todo 定时、分批、数据库名 @XxlJob("phpOnlineStudyRecordSyncEs") - public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { + public List phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException { String sql = "SELECT\n" + " elc.kid AS courseId,\n" + @@ -101,11 +101,9 @@ public class PhpOnlineStudyRecordScheduledTasks { } List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); - - if (CollUtil.isEmpty(objectList1)) { log.info("没有找到已完成的数据"); - return; + return null; } String indexName = "new_study_resource"; @@ -122,11 +120,11 @@ public class PhpOnlineStudyRecordScheduledTasks { // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); - List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); if (CollUtil.isEmpty(objectList1)) { log.info("新系统用户数据不存在"); - return; + return null; } Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); @@ -147,17 +145,11 @@ public class PhpOnlineStudyRecordScheduledTasks { // 获取ES中没有完成的的课程学习记录 List notFinishedCourseList = getEsData(finishedCourseList); - Set finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet()); + if (isOnlyRead != null && isOnlyRead == 1) { + return notFinishedCourseList; + } - List toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> { - if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) { - return true; - } else { - // 有多种情况,es不存在学习记录,或者学习记录状态不是9 - log.info("跳过:{}", courseStudyDto); - return false; - } - }).map(courseStudyDto -> { + List toBeUpdatedEs = notFinishedCourseList.stream().map(courseStudyDto -> { courseStudyDto.setStatus(9); courseStudyDto.setProgress(100); return courseStudyDto; @@ -165,7 +157,7 @@ public class PhpOnlineStudyRecordScheduledTasks { // 更新ES中的未同步为完成的学习记录 toBeUpdatedEs(toBeUpdatedEs, indexName); - + return null; } private List getEsData(List finishedCourseList) throws IOException { diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java new file mode 100644 index 00000000..348a6f1a --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java @@ -0,0 +1,215 @@ +package com.xboe.school.study.api; + +import cn.hutool.core.collection.CollUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xboe.module.course.dto.CourseStudyDto; +import com.xboe.school.study.dao.PhpOnlineCourseDao; +import com.xboe.school.study.dto.PhpOnlineDto; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Component +@Slf4j +public class PhpOnlineStudyRecordScheduledTasks2 { + + @Resource + private PhpOnlineCourseDao phpOnlineCourseDao; + + @Resource + RestHighLevelClient restHighLevelClient; + + + // todo 定时、分批、数据库名 + @XxlJob("phpOnlineStudyRecordSyncEs") + public List phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException { + String sql = + "SELECT\n" + + " course_id,user_id,status,progress \n" + + "FROM\n" + + " elearninglms.eln_ln_res_complete \n" + + "WHERE\n" + + " complete_type = '1' \n" + + " AND complete_status = '2' \n" + + " AND updated_at > ?1 \n" + + " AND is_deleted=0\n" + + "GROUP BY\n" + + " course_id,user_id"; + + log.info("开始同步PHP学习记录到ES"); + // 增量获取PHP中所有已完成的课程 + if (syncTimePoint == null) { + LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30); + syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + log.info("同步时间点:{}", formatter.format(halfAnHourAgo)); + } + List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); + + if (CollUtil.isEmpty(objectList1)) { + log.info("没有找到已完成的数据"); + return null; + } + + String indexName = "new_study_resource"; + + List recentLearnRecordList = new ArrayList<>(); + for (Object[] objects : objectList1) { + String courseId = objects[0].toString(); + String userIdOfPhp = objects[1].toString(); + int status = Integer.parseInt(objects[2].toString()); + int progress = Integer.parseInt(objects[3].toString()); + PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); + phpOnlineDto.setCourseId(courseId); + phpOnlineDto.setUserIdOfPhp(userIdOfPhp); + phpOnlineDto.setProgress(progress); + phpOnlineDto.setStatus(status); + recentLearnRecordList.add(phpOnlineDto); + } + + // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); + List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + + if (CollUtil.isEmpty(objectList1)) { + log.info("新系统用户数据不存在"); + return null; + } + + Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); + + // 设置新系统用户ID + recentLearnRecordList = recentLearnRecordList.stream() + .map(phpOnlineDto -> { + Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); + if (userIdOfJavaObj != null) { + phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); + } + return phpOnlineDto; + }) + .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .collect(Collectors.toList()); + + + // 获取ES中没有完成的的课程学习记录 + List esData = getEsData(recentLearnRecordList); + Map map = recentLearnRecordList.stream().collect(Collectors.toMap(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + phpOnlineDto.getCourseId(), phpOnlineDto -> phpOnlineDto)); + List toBeUpdatedEs = esData.stream().filter(courseStudyDto -> { + if (map.containsKey(courseStudyDto.getAccountId())) { + PhpOnlineDto phpOnlineDto = map.get(courseStudyDto.getAccountId()); + if (!phpOnlineDto.getStatus().equals(phpOnlineDto.getStatus()) || !phpOnlineDto.getProgress().equals(phpOnlineDto.getProgress())) { + return true; + } + } + return false; + }).collect(Collectors.toList()); + + if (isOnlyRead != null && isOnlyRead == 1) { + return esData; + } + + // 更新ES中的未同步为完成的学习记录 + toBeUpdatedEs(toBeUpdatedEs, indexName); + return null; + } + + private List getEsData(List finishedCourseList) throws IOException { + SearchRequest searchRequest = new SearchRequest("new_study_resource"); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + + for (PhpOnlineDto phpOnlineDto : finishedCourseList) { + if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) { + continue; + } + + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId())) + .must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava())) + ); + } + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS)); + searchRequest.source(sourceBuilder); + if (boolQuery.hasClauses()) { + sourceBuilder.query(boolQuery); + } + searchRequest.source(sourceBuilder); + + SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + SearchHits hits = response.getHits(); + + List courseStudyDtoList = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + for (SearchHit hit : hits) { + String sourceAsString = hit.getSourceAsString(); + try { + CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class); + courseStudyDtoList.add(cft); + } catch (Exception e) { + log.error("转化json到对应失败", sourceAsString); + } + } + return courseStudyDtoList; + } + + private void toBeUpdatedEs(List toBeUpdatedEsData, String indexName) { + if (CollUtil.isEmpty(toBeUpdatedEsData)) { + return; + } + BulkRequest bulkRequest = new BulkRequest(); + for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) { + Map docMap = new HashMap<>(); + docMap.put("status", courseStudyDto.getStatus()); + docMap.put("progress", courseStudyDto.getProgress()); + + // 创建更新请求,并传入单一的docMap + UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId()) + .doc(docMap, XContentType.JSON); + + // 将请求添加到批量请求中 + bulkRequest.add(updateRequest); + } + + try { + // 执行批量更新操作 + BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); + // 检查是否有失败的操作 + if (bulkResponse.hasFailures()) { + log.error("批量更新失败: {}", bulkResponse.buildFailureMessage()); + } else { + List collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList()); + log.info("批量更新成功,更新的ES ID列表: {}", collect); + } + } catch (IOException e) { + log.error("执行批量更新时发生错误", e); + } + } + + +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java index 0ad3bab6..812a739c 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java @@ -44,6 +44,9 @@ public class StudyCourseESApi extends ApiBaseController{ @Resource private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks; + + @Resource + private PhpOnlineStudyRecordScheduledTasks2 phpOnlineStudyRecordScheduledTasks2; @RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST}) public JsonResponse> search(Pagination page, CourseStudyDto dto){ @@ -167,10 +170,30 @@ public class StudyCourseESApi extends ApiBaseController{ return success(true); } - + /** + * + * @param syncTimePoint + * @param isOnlyRead 0 更新ES 1 查询ES + * @return + * @throws IOException + */ @PostMapping("/phpOnlineStudyRecordSyncEs") - public JsonResponse phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { - phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint); - return success(true); + public JsonResponse> phpOnlineStudyRecordSyncEs(Long syncTimePoint,Integer isOnlyRead) throws IOException { + List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead); + return success(courseStudyDtoList); + } + + + /** + * + * @param syncTimePoint + * @param isOnlyRead 0 更新ES 1 查询ES + * @return + * @throws IOException + */ + @PostMapping("/phpOnlineStudyRecordSyncEs2") + public JsonResponse> phpOnlineStudyRecordSyncEs2(Long syncTimePoint,Integer isOnlyRead) throws IOException { + List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks2.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead); + return success(courseStudyDtoList); } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java index 9d77ca77..7a92be2d 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java @@ -25,4 +25,6 @@ public class PhpOnlineDto { * 课程名称 */ private String userIdOfJava; + private Integer status; + private Integer progress; } From cb8a333b4fbcc750e6dd64c42a6e3220155b36d3 Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Fri, 13 Dec 2024 20:20:49 +0800 Subject: [PATCH 3/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PhpOnlineStudyRecordScheduledTasks.java | 164 ++++++++----- .../PhpOnlineStudyRecordScheduledTasks2.java | 215 ------------------ .../school/study/api/StudyCourseESApi.java | 26 +-- .../xboe/school/study/dto/PhpOnlineDto.java | 5 +- 4 files changed, 108 insertions(+), 302 deletions(-) delete mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java index b59b5383..f69ed0f1 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -21,10 +21,12 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; +import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -42,64 +44,68 @@ public class PhpOnlineStudyRecordScheduledTasks { @Resource RestHighLevelClient restHighLevelClient; + @Value("${spring.profiles.active}") + private String activeProfile; // todo 定时、分批、数据库名 - @XxlJob("phpOnlineStudyRecordSyncEs") - public List phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException { + @XxlJob("phpOnlineStudyRecordSyncEsTask") + public List phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { String sql = "SELECT\n" + - " elc.kid AS courseId,\n" + - " elcr.user_id AS userIdOfPhp,\n" + - " COUNT(1) AS modNum,\n" + - " SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" + - "FROM\n" + - " elearninglms.eln_ln_course elc\n" + - " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + - " ON elc.kid = elcr.course_id\n" + - " INNER JOIN (\n" + - " SELECT\n" + - " user_id,\n" + - " course_id \n" + - " FROM\n" + - " elearninglms.eln_ln_res_complete\n" + - " WHERE\n" + - " complete_type = '1'\n" + - " AND complete_status = '2'\n" + - " AND updated_at > ?1\n" + - " AND is_deleted = 0\n" + - " GROUP BY\n" + - " user_id,\n" + - " course_id\n" + - " ) recentFinishStuent \n" + - " ON recentFinishStuent.user_id = elcr.user_id \n" + - " AND recentFinishStuent.course_id = elcr.course_id\n" + - " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + - " ON elms.course_id = elcr.course_id\n" + - " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + - " ON elrc.mod_res_id = elms.kid\n" + - " AND elrc.user_id = elcr.user_id\n" + - " AND elrc.complete_type = '1'\n" + - " AND elrc.complete_status = '2'\n" + - "WHERE\n" + - " elc.is_deleted = 0\n" + - " AND elcr.is_deleted = 0\n" + - " AND elcr.reg_state = '1'\n" + - " AND elms.publish_status = '1'\n" + - " AND elms.is_deleted = '0'\n" + - "GROUP BY\n" + - " elc.kid,\n" + - " elcr.user_id\n" + - "HAVING\n" + - " completeNum = modNum\n"; + " elc.kid AS courseId,\n" + + " elcr.user_id AS userIdOfPhp,\n" + + " ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" + + "FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + + " ON elc.kid = elcr.course_id\n" + + " INNER JOIN (\n" + + " SELECT\n" + + " user_id,\n" + + " course_id \n" + + " FROM\n" + + " elearninglms.eln_ln_res_complete\n" + + " WHERE\n" + + " complete_type = '1'\n" + + " AND complete_status = '2'\n" + + " AND updated_at > ?1 AND updated_at < ?2\n" + + " AND is_deleted = 0\n" + + " GROUP BY\n" + + " user_id,\n" + + " course_id\n" + + " ) recentFinishStuent \n" + + " ON recentFinishStuent.user_id = elcr.user_id \n" + + " AND recentFinishStuent.course_id = elcr.course_id\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + "WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + "GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id"; + log.info("开始同步PHP学习记录到ES"); // 增量获取PHP中所有已完成的课程 - if (syncTimePoint == null) { - LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30); - syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) { + LocalDateTime now = LocalDateTime.now(); + LocalDateTime halfAnHourAgo = now.minusMinutes(30); + syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - log.info("同步时间点:{}", formatter.format(halfAnHourAgo)); + log.info("同步时间起点:{}", formatter.format(halfAnHourAgo)); + log.info("同步时间终点:{}", formatter.format(now)); } - List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); + List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePointOfBegin,syncTimePointOfEnd); if (CollUtil.isEmpty(objectList1)) { log.info("没有找到已完成的数据"); @@ -108,19 +114,29 @@ public class PhpOnlineStudyRecordScheduledTasks { String indexName = "new_study_resource"; - List finishedCourseList = new ArrayList<>(); + List recentLearnRecordList = new ArrayList<>(); for (Object[] objects : objectList1) { String courseId = objects[0].toString(); String userIdOfPhp = objects[1].toString(); + Integer progress = ((BigDecimal) objects[2]).intValue(); PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); phpOnlineDto.setCourseId(courseId); phpOnlineDto.setUserIdOfPhp(userIdOfPhp); - finishedCourseList.add(phpOnlineDto); + phpOnlineDto.setProgress(progress); + recentLearnRecordList.add(phpOnlineDto); } // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + + String userBasicDataBase; + if (activeProfile.equals("prod")) { + userBasicDataBase = "user_basic"; + } else { + userBasicDataBase = "userbasic"; + } + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); - List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); if (CollUtil.isEmpty(objectList1)) { log.info("新系统用户数据不存在"); @@ -130,7 +146,7 @@ public class PhpOnlineStudyRecordScheduledTasks { Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); // 设置新系统用户ID - finishedCourseList = finishedCourseList.stream() + recentLearnRecordList = recentLearnRecordList.stream() .map(phpOnlineDto -> { Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); if (userIdOfJavaObj != null) { @@ -142,21 +158,43 @@ public class PhpOnlineStudyRecordScheduledTasks { .collect(Collectors.toList()); - // 获取ES中没有完成的的课程学习记录 - List notFinishedCourseList = getEsData(finishedCourseList); + // 获取ES中的数据 + List esDataList = getEsData(recentLearnRecordList); + // 构建映射关系 + Map map = recentLearnRecordList.stream() + .collect(Collectors.toMap( + phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(), + phpOnlineDto -> phpOnlineDto + )); + + List toBeUpdatedEs = esDataList.stream() + .map(esDataItem -> { + String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId(); + PhpOnlineDto phpOnlineDto = map.get(key); + + // 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新 + if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) { + esDataItem.setProgress(phpOnlineDto.getProgress()); + if (phpOnlineDto.getProgress() == 100){ + esDataItem.setStatus(9); + } else { + esDataItem.setStatus(2); + } + return esDataItem; + } + return null; // 返回 null 表示不需要更新 + }) + .filter(Objects::nonNull) // 去掉返回为 null 的项 + .collect(Collectors.toList()); + if (isOnlyRead != null && isOnlyRead == 1) { - return notFinishedCourseList; + return toBeUpdatedEs; } - List toBeUpdatedEs = notFinishedCourseList.stream().map(courseStudyDto -> { - courseStudyDto.setStatus(9); - courseStudyDto.setProgress(100); - return courseStudyDto; - }).collect(Collectors.toList()); // 更新ES中的未同步为完成的学习记录 - toBeUpdatedEs(toBeUpdatedEs, indexName); +// toBeUpdatedEs(toBeUpdatedEs, indexName); return null; } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java deleted file mode 100644 index 348a6f1a..00000000 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks2.java +++ /dev/null @@ -1,215 +0,0 @@ -package com.xboe.school.study.api; - -import cn.hutool.core.collection.CollUtil; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.xboe.module.course.dto.CourseStudyDto; -import com.xboe.school.study.dao.PhpOnlineCourseDao; -import com.xboe.school.study.dto.PhpOnlineDto; -import com.xxl.job.core.handler.annotation.XxlJob; -import lombok.extern.slf4j.Slf4j; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.io.IOException; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -@Component -@Slf4j -public class PhpOnlineStudyRecordScheduledTasks2 { - - @Resource - private PhpOnlineCourseDao phpOnlineCourseDao; - - @Resource - RestHighLevelClient restHighLevelClient; - - - // todo 定时、分批、数据库名 - @XxlJob("phpOnlineStudyRecordSyncEs") - public List phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException { - String sql = - "SELECT\n" + - " course_id,user_id,status,progress \n" + - "FROM\n" + - " elearninglms.eln_ln_res_complete \n" + - "WHERE\n" + - " complete_type = '1' \n" + - " AND complete_status = '2' \n" + - " AND updated_at > ?1 \n" + - " AND is_deleted=0\n" + - "GROUP BY\n" + - " course_id,user_id"; - - log.info("开始同步PHP学习记录到ES"); - // 增量获取PHP中所有已完成的课程 - if (syncTimePoint == null) { - LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30); - syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); - DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - log.info("同步时间点:{}", formatter.format(halfAnHourAgo)); - } - List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); - - if (CollUtil.isEmpty(objectList1)) { - log.info("没有找到已完成的数据"); - return null; - } - - String indexName = "new_study_resource"; - - List recentLearnRecordList = new ArrayList<>(); - for (Object[] objects : objectList1) { - String courseId = objects[0].toString(); - String userIdOfPhp = objects[1].toString(); - int status = Integer.parseInt(objects[2].toString()); - int progress = Integer.parseInt(objects[3].toString()); - PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); - phpOnlineDto.setCourseId(courseId); - phpOnlineDto.setUserIdOfPhp(userIdOfPhp); - phpOnlineDto.setProgress(progress); - phpOnlineDto.setStatus(status); - recentLearnRecordList.add(phpOnlineDto); - } - - // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 - List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); - List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); - - if (CollUtil.isEmpty(objectList1)) { - log.info("新系统用户数据不存在"); - return null; - } - - Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); - - // 设置新系统用户ID - recentLearnRecordList = recentLearnRecordList.stream() - .map(phpOnlineDto -> { - Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); - if (userIdOfJavaObj != null) { - phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); - } - return phpOnlineDto; - }) - .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) - .collect(Collectors.toList()); - - - // 获取ES中没有完成的的课程学习记录 - List esData = getEsData(recentLearnRecordList); - Map map = recentLearnRecordList.stream().collect(Collectors.toMap(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + phpOnlineDto.getCourseId(), phpOnlineDto -> phpOnlineDto)); - List toBeUpdatedEs = esData.stream().filter(courseStudyDto -> { - if (map.containsKey(courseStudyDto.getAccountId())) { - PhpOnlineDto phpOnlineDto = map.get(courseStudyDto.getAccountId()); - if (!phpOnlineDto.getStatus().equals(phpOnlineDto.getStatus()) || !phpOnlineDto.getProgress().equals(phpOnlineDto.getProgress())) { - return true; - } - } - return false; - }).collect(Collectors.toList()); - - if (isOnlyRead != null && isOnlyRead == 1) { - return esData; - } - - // 更新ES中的未同步为完成的学习记录 - toBeUpdatedEs(toBeUpdatedEs, indexName); - return null; - } - - private List getEsData(List finishedCourseList) throws IOException { - SearchRequest searchRequest = new SearchRequest("new_study_resource"); - BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); - - for (PhpOnlineDto phpOnlineDto : finishedCourseList) { - if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) { - continue; - } - - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId())) - .must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava())) - ); - } - - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS)); - searchRequest.source(sourceBuilder); - if (boolQuery.hasClauses()) { - sourceBuilder.query(boolQuery); - } - searchRequest.source(sourceBuilder); - - SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); - SearchHits hits = response.getHits(); - - List courseStudyDtoList = new ArrayList<>(); - ObjectMapper mapper = new ObjectMapper(); - for (SearchHit hit : hits) { - String sourceAsString = hit.getSourceAsString(); - try { - CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class); - courseStudyDtoList.add(cft); - } catch (Exception e) { - log.error("转化json到对应失败", sourceAsString); - } - } - return courseStudyDtoList; - } - - private void toBeUpdatedEs(List toBeUpdatedEsData, String indexName) { - if (CollUtil.isEmpty(toBeUpdatedEsData)) { - return; - } - BulkRequest bulkRequest = new BulkRequest(); - for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) { - Map docMap = new HashMap<>(); - docMap.put("status", courseStudyDto.getStatus()); - docMap.put("progress", courseStudyDto.getProgress()); - - // 创建更新请求,并传入单一的docMap - UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId()) - .doc(docMap, XContentType.JSON); - - // 将请求添加到批量请求中 - bulkRequest.add(updateRequest); - } - - try { - // 执行批量更新操作 - BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); - // 检查是否有失败的操作 - if (bulkResponse.hasFailures()) { - log.error("批量更新失败: {}", bulkResponse.buildFailureMessage()); - } else { - List collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList()); - log.info("批量更新成功,更新的ES ID列表: {}", collect); - } - } catch (IOException e) { - log.error("执行批量更新时发生错误", e); - } - } - - -} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java index 812a739c..40b4f97a 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java @@ -45,8 +45,6 @@ public class StudyCourseESApi extends ApiBaseController{ @Resource private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks; - @Resource - private PhpOnlineStudyRecordScheduledTasks2 phpOnlineStudyRecordScheduledTasks2; @RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST}) public JsonResponse> search(Pagination page, CourseStudyDto dto){ @@ -170,30 +168,12 @@ public class StudyCourseESApi extends ApiBaseController{ return success(true); } - /** - * - * @param syncTimePoint - * @param isOnlyRead 0 更新ES 1 查询ES - * @return - * @throws IOException - */ + @PostMapping("/phpOnlineStudyRecordSyncEs") - public JsonResponse> phpOnlineStudyRecordSyncEs(Long syncTimePoint,Integer isOnlyRead) throws IOException { - List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead); + public JsonResponse> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { + List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin,syncTimePointOfEnd, isOnlyRead); return success(courseStudyDtoList); } - /** - * - * @param syncTimePoint - * @param isOnlyRead 0 更新ES 1 查询ES - * @return - * @throws IOException - */ - @PostMapping("/phpOnlineStudyRecordSyncEs2") - public JsonResponse> phpOnlineStudyRecordSyncEs2(Long syncTimePoint,Integer isOnlyRead) throws IOException { - List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks2.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead); - return success(courseStudyDtoList); - } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java index 7a92be2d..2cf9dda9 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java @@ -25,6 +25,9 @@ public class PhpOnlineDto { * 课程名称 */ private String userIdOfJava; - private Integer status; + + /** + * 进度 + */ private Integer progress; } From 3343474f33ce4bd4ff6d20c6818d75f08a85ef56 Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Fri, 13 Dec 2024 20:52:11 +0800 Subject: [PATCH 4/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../school/study/api/PhpOnlineStudyRecordScheduledTasks.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java index f69ed0f1..c4989c5b 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -129,7 +129,7 @@ public class PhpOnlineStudyRecordScheduledTasks { // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 String userBasicDataBase; - if (activeProfile.equals("prod")) { + if (activeProfile.equals("pro")) { userBasicDataBase = "user_basic"; } else { userBasicDataBase = "userbasic"; From 255b854a082be3f9c30b19fd8a211bce29bf403a Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Sun, 15 Dec 2024 19:37:22 +0800 Subject: [PATCH 5/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5,?= =?UTF-8?q?=E5=85=A8=E9=87=8F=E3=80=81=E5=88=86=E9=A1=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PhpOnlineStudyRecordScheduledTasks.java | 219 ++++++-------- .../school/study/api/StudyCourseESApi.java | 19 +- .../school/study/dao/PhpOnlineCourseDao.java | 271 ++++++++++++++++++ 3 files changed, 381 insertions(+), 128 deletions(-) diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java index c4989c5b..a57aa8f6 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -21,12 +21,10 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.io.IOException; -import java.math.BigDecimal; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -44,54 +42,9 @@ public class PhpOnlineStudyRecordScheduledTasks { @Resource RestHighLevelClient restHighLevelClient; - @Value("${spring.profiles.active}") - private String activeProfile; - // todo 定时、分批、数据库名 @XxlJob("phpOnlineStudyRecordSyncEsTask") - public List phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { - String sql = - "SELECT\n" + - " elc.kid AS courseId,\n" + - " elcr.user_id AS userIdOfPhp,\n" + - " ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" + - "FROM\n" + - " elearninglms.eln_ln_course elc\n" + - " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + - " ON elc.kid = elcr.course_id\n" + - " INNER JOIN (\n" + - " SELECT\n" + - " user_id,\n" + - " course_id \n" + - " FROM\n" + - " elearninglms.eln_ln_res_complete\n" + - " WHERE\n" + - " complete_type = '1'\n" + - " AND complete_status = '2'\n" + - " AND updated_at > ?1 AND updated_at < ?2\n" + - " AND is_deleted = 0\n" + - " GROUP BY\n" + - " user_id,\n" + - " course_id\n" + - " ) recentFinishStuent \n" + - " ON recentFinishStuent.user_id = elcr.user_id \n" + - " AND recentFinishStuent.course_id = elcr.course_id\n" + - " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + - " ON elms.course_id = elcr.course_id\n" + - " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + - " ON elrc.mod_res_id = elms.kid\n" + - " AND elrc.user_id = elcr.user_id\n" + - " AND elrc.complete_type = '1'\n" + - " AND elrc.complete_status = '2'\n" + - "WHERE\n" + - " elc.is_deleted = 0\n" + - " AND elcr.is_deleted = 0\n" + - " AND elcr.reg_state = '1'\n" + - " AND elms.publish_status = '1'\n" + - " AND elms.is_deleted = '0'\n" + - "GROUP BY\n" + - " elc.kid,\n" + - " elcr.user_id"; + public List phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { log.info("开始同步PHP学习记录到ES"); // 增量获取PHP中所有已完成的课程 @@ -105,97 +58,80 @@ public class PhpOnlineStudyRecordScheduledTasks { log.info("同步时间起点:{}", formatter.format(halfAnHourAgo)); log.info("同步时间终点:{}", formatter.format(now)); } - List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePointOfBegin,syncTimePointOfEnd); - if (CollUtil.isEmpty(objectList1)) { - log.info("没有找到已完成的数据"); - return null; + int pageSize = 1000; + int totalRecordsCount = phpOnlineCourseDao.selectRecentLearnRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数 + + int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize); + + ArrayList tempResultList = new ArrayList<>(); + + for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) { + int offset = (pageNumber - 1) * pageSize; + List pageData = phpOnlineCourseDao.selectRecentLearnRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize); + ArrayList singleResultList = esDataHandle(isOnlyRead, pageData); + tempResultList.addAll(singleResultList); } - String indexName = "new_study_resource"; + return tempResultList; + } - List recentLearnRecordList = new ArrayList<>(); - for (Object[] objects : objectList1) { - String courseId = objects[0].toString(); - String userIdOfPhp = objects[1].toString(); - Integer progress = ((BigDecimal) objects[2]).intValue(); - PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); - phpOnlineDto.setCourseId(courseId); - phpOnlineDto.setUserIdOfPhp(userIdOfPhp); - phpOnlineDto.setProgress(progress); - recentLearnRecordList.add(phpOnlineDto); + + private ArrayList esDataHandle(Integer isOnlyRead, List recentLearnRecordList) throws IOException { + ArrayList tempResultList = new ArrayList<>(); + // 将数据分批,每批50条 + int batchSize = 50; + int totalSize = recentLearnRecordList.size(); + List> batches = new ArrayList<>(); + + for (int i = 0; i < totalSize; i += batchSize) { + int end = Math.min(i + batchSize, totalSize); + batches.add(recentLearnRecordList.subList(i, end)); } - // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + for (List batch : batches) { + // 构建映射关系 + Map map = batch.stream() + .collect(Collectors.toMap( + phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(), + phpOnlineDto -> phpOnlineDto + )); - String userBasicDataBase; - if (activeProfile.equals("pro")) { - userBasicDataBase = "user_basic"; - } else { - userBasicDataBase = "userbasic"; - } - - List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); - List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); - - if (CollUtil.isEmpty(objectList1)) { - log.info("新系统用户数据不存在"); - return null; - } - - Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); - - // 设置新系统用户ID - recentLearnRecordList = recentLearnRecordList.stream() - .map(phpOnlineDto -> { - Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); - if (userIdOfJavaObj != null) { - phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); - } - return phpOnlineDto; - }) - .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) - .collect(Collectors.toList()); + // 获取ES中的数据 + List esDataList = getEsData(batch); - // 获取ES中的数据 - List esDataList = getEsData(recentLearnRecordList); - // 构建映射关系 - Map map = recentLearnRecordList.stream() - .collect(Collectors.toMap( - phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + "-" + phpOnlineDto.getCourseId(), - phpOnlineDto -> phpOnlineDto - )); + // 更新ES数据 + List toBeUpdatedEs = esDataList.stream() + .map(esDataItem -> { + String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId(); + PhpOnlineDto phpOnlineDto = map.get(key); - List toBeUpdatedEs = esDataList.stream() - .map(esDataItem -> { - String key = esDataItem.getAccountId() + "-" + esDataItem.getCourseId(); - PhpOnlineDto phpOnlineDto = map.get(key); - - // 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新 - if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) { - esDataItem.setProgress(phpOnlineDto.getProgress()); - if (phpOnlineDto.getProgress() == 100){ - esDataItem.setStatus(9); - } else { - esDataItem.setStatus(2); + // 如果找到相应的 PhpOnlineDto 且进度有变化,则进行更新 + if (phpOnlineDto != null && !esDataItem.getProgress().equals(phpOnlineDto.getProgress())) { + esDataItem.setProgress(phpOnlineDto.getProgress()); + if (phpOnlineDto.getProgress() == 100) { + esDataItem.setStatus(9); // 完成 + } else { + esDataItem.setStatus(2); // 进行中 + } + return esDataItem; } - return esDataItem; - } - return null; // 返回 null 表示不需要更新 - }) - .filter(Objects::nonNull) // 去掉返回为 null 的项 - .collect(Collectors.toList()); + return null; // 返回 null 表示不需要更新 + }) + .filter(Objects::nonNull) // 去掉返回为 null 的项 + .collect(Collectors.toList()); + tempResultList.addAll(toBeUpdatedEs.stream().map(esDataItem -> esDataItem.getId()).collect(Collectors.toList())); - if (isOnlyRead != null && isOnlyRead == 1) { - return toBeUpdatedEs; + if (isOnlyRead != null && isOnlyRead == 1) { + continue; + } + + // 调用批量更新方法 +// toBeUpdatedEs(toBeUpdatedEs, indexName); } - - - // 更新ES中的未同步为完成的学习记录 -// toBeUpdatedEs(toBeUpdatedEs, indexName); - return null; + return tempResultList; } private List getEsData(List finishedCourseList) throws IOException { @@ -272,4 +208,37 @@ public class PhpOnlineStudyRecordScheduledTasks { } + public List phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { + + log.info("开始同步PHP学习记录到ES"); + // 增量获取PHP中所有已完成的课程 + if (syncTimePointOfBegin == null || syncTimePointOfEnd == null) { + LocalDateTime now = LocalDateTime.now(); + LocalDateTime halfAnHourAgo = now.minusMinutes(30); + syncTimePointOfBegin = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + syncTimePointOfEnd = now.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + log.info("同步时间起点:{}", formatter.format(halfAnHourAgo)); + log.info("同步时间终点:{}", formatter.format(now)); + } + + int pageSize = 1000; + int totalRecordsCount = phpOnlineCourseDao.selectRecentRegRecordListOfCount(syncTimePointOfBegin, syncTimePointOfEnd); // 总记录数 + + int totalPages = (int) Math.ceil((double) totalRecordsCount / pageSize); + + ArrayList tempResultList = new ArrayList<>(); + + for (int pageNumber = 1; pageNumber <= totalPages; pageNumber++) { + int offset = (pageNumber - 1) * pageSize; + List pageData = phpOnlineCourseDao.selectRecentRegRecordList(syncTimePointOfBegin, syncTimePointOfEnd, offset, pageSize); + ArrayList singleResultList = esDataHandle(isOnlyRead, pageData); + tempResultList.addAll(singleResultList); + } + + return tempResultList; + } + + } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java index 40b4f97a..6dd980b1 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java @@ -168,10 +168,23 @@ public class StudyCourseESApi extends ApiBaseController{ return success(true); } - + /** + * + * @param syncTimePointOfBegin + * @param syncTimePointOfEnd + * @param isOnlyRead + * @return + * @throws IOException + */ @PostMapping("/phpOnlineStudyRecordSyncEs") - public JsonResponse> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { - List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin,syncTimePointOfEnd, isOnlyRead); + public JsonResponse> phpOnlineStudyRecordSyncEs(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { + List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead); + return success(courseStudyDtoList); + } + + @PostMapping("/phpOnlineStudyRecordSyncEsOfFull") + public JsonResponse> phpOnlineStudyRecordSyncEsOfFull(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer isOnlyRead) throws IOException { + List courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEsOfFull(syncTimePointOfBegin, syncTimePointOfEnd, isOnlyRead); return success(courseStudyDtoList); } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java index 94b64201..fb1cea8f 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java @@ -1,9 +1,280 @@ package com.xboe.school.study.dao; +import cn.hutool.core.collection.CollUtil; import com.xboe.core.orm.BaseDao; import com.xboe.school.study.dto.PhpOnlineDto; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + @Repository +@Slf4j public class PhpOnlineCourseDao extends BaseDao { + + @Value("${spring.profiles.active}") + private String activeProfile; + + public List selectRecentLearnRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) { + String sql = + "SELECT\n" + + " elc.kid AS courseId,\n" + + " elcr.user_id AS userIdOfPhp,\n" + + " ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" + + "FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + + " ON elc.kid = elcr.course_id\n" + + " INNER JOIN (\n" + + " SELECT\n" + + " user_id,\n" + + " course_id \n" + + " FROM\n" + + " elearninglms.eln_ln_res_complete\n" + + " WHERE\n" + + " complete_type = '1'\n" + + " AND complete_status = '2'\n" + + " AND updated_at > ?1 AND updated_at < ?2\n" + + " AND is_deleted = 0\n" + + " GROUP BY\n" + + " user_id,\n" + + " course_id\n" + + " ) recentFinishStuent \n" + + " ON recentFinishStuent.user_id = elcr.user_id \n" + + " AND recentFinishStuent.course_id = elcr.course_id\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + "WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + "GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id\n" + + "LIMIT " + offset + "," + limit; + + + List objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd); + if (CollUtil.isEmpty(objectList1)) { + log.info("没有找到已完成的数据"); + return null; + } + + List recentLearnRecordList = new ArrayList<>(); + for (Object[] objects : objectList1) { + String courseId = objects[0].toString(); + String userIdOfPhp = objects[1].toString(); + Integer progress = ((BigDecimal) objects[2]).intValue(); + PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); + phpOnlineDto.setCourseId(courseId); + phpOnlineDto.setUserIdOfPhp(userIdOfPhp); + phpOnlineDto.setProgress(progress); + recentLearnRecordList.add(phpOnlineDto); + } + + // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + + String userBasicDataBase; + if (activeProfile.equals("pro")) { + userBasicDataBase = "user_basic"; + } else { + userBasicDataBase = "userbasic"; + } + + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); + List objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + + if (CollUtil.isEmpty(objectList1)) { + log.info("新系统用户数据不存在"); + return null; + } + + Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); + + // 设置新系统用户ID + recentLearnRecordList = recentLearnRecordList.stream() + .map(phpOnlineDto -> { + Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); + if (userIdOfJavaObj != null) { + phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); + } + return phpOnlineDto; + }) + .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .collect(Collectors.toList()); + + return recentLearnRecordList; + } + + + public int selectRecentLearnRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) { + String sql = + "SELECT COUNT(1) \n" + + "FROM (\n" + + " SELECT elc.kid\n" + + " FROM elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr\n" + + " ON elc.kid = elcr.course_id\n" + + " INNER JOIN (\n" + + " SELECT user_id, course_id\n" + + " FROM elearninglms.eln_ln_res_complete\n" + + " WHERE complete_type = '1'\n" + + " AND complete_status = '2'\n" + + " AND updated_at > ?1\n" + + " AND updated_at < ?2\n" + + " AND is_deleted = 0\n" + + " GROUP BY user_id, course_id\n" + + " ) recentFinishStuent\n" + + " ON recentFinishStuent.user_id = elcr.user_id\n" + + " AND recentFinishStuent.course_id = elcr.course_id\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms\n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc\n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + " WHERE elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + " GROUP BY elc.kid, elcr.user_id\n" + + ") temp;\n"; // 每页 1000 条记录,OFFSET 计算分页 + + + int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd); + return count; + } + + + public List selectRecentRegRecordList(Long syncTimePointOfBegin, Long syncTimePointOfEnd, Integer offset, Integer limit) { + String sql = + "SELECT\n" + + " elc.kid AS courseId,\n" + + " elcr.user_id AS userIdOfPhp,\n" + + " ROUND((SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) / COUNT(1)) * 100, 0) AS progress\n" + + "FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr ON elc.kid = elcr.course_id AND elcr.created_at > ?1 AND elcr.created_at < ?2\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + "WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + "GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id\n" + + "LIMIT " + offset + "," + limit; + + + List objectList1 = this.sqlFindList(sql, syncTimePointOfBegin, syncTimePointOfEnd); + if (CollUtil.isEmpty(objectList1)) { + log.info("没有找到已完成的数据"); + return null; + } + + List recentLearnRecordList = new ArrayList<>(); + for (Object[] objects : objectList1) { + String courseId = objects[0].toString(); + String userIdOfPhp = objects[1].toString(); + Integer progress = ((BigDecimal) objects[2]).intValue(); + PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); + phpOnlineDto.setCourseId(courseId); + phpOnlineDto.setUserIdOfPhp(userIdOfPhp); + phpOnlineDto.setProgress(progress); + recentLearnRecordList.add(phpOnlineDto); + } + + // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + + String userBasicDataBase; + if (activeProfile.equals("pro")) { + userBasicDataBase = "user_basic"; + } else { + userBasicDataBase = "userbasic"; + } + + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); + List objectList2 = this.sqlFindList("select kid,user_id from " + userBasicDataBase + ".user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + + if (CollUtil.isEmpty(objectList1)) { + log.info("新系统用户数据不存在"); + return null; + } + + Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); + + // 设置新系统用户ID + recentLearnRecordList = recentLearnRecordList.stream() + .map(phpOnlineDto -> { + Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); + if (userIdOfJavaObj != null) { + phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); + } + return phpOnlineDto; + }) + .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .collect(Collectors.toList()); + + return recentLearnRecordList; + } + + public int selectRecentRegRecordListOfCount(Long syncTimePointOfBegin, Long syncTimePointOfEnd) { + String sql = + "SELECT COUNT(1)\n" + + "FROM (\n" + + " SELECT\n" + + " elc.kid\n" + + " FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + + " ON elc.kid = elcr.course_id\n" + + " AND elcr.created_at > ?1\n" + + " AND elcr.created_at < ?2\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + " WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + " GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id\n" + + ") temp;\n"; // 每页 1000 条记录,OFFSET 计算分页 + + + int count = this.sqlCount(sql, syncTimePointOfBegin, syncTimePointOfEnd); + return count; + } + + } From e412f5b6eb90a08074dfd9b89a62ddc262091330 Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Sun, 15 Dec 2024 20:27:47 +0800 Subject: [PATCH 6/6] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5,?= =?UTF-8?q?=E5=85=A8=E9=87=8F=E3=80=81=E5=88=86=E9=A1=B5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../school/study/api/PhpOnlineStudyRecordScheduledTasks.java | 3 ++- .../java/com/xboe/school/study/dao/PhpOnlineCourseDao.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java index a57aa8f6..a65efa9a 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -7,6 +7,7 @@ import com.xboe.school.study.dao.PhpOnlineCourseDao; import com.xboe.school.study.dto.PhpOnlineDto; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchRequest; @@ -139,7 +140,7 @@ public class PhpOnlineStudyRecordScheduledTasks { BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); for (PhpOnlineDto phpOnlineDto : finishedCourseList) { - if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) { + if (StringUtils.isBlank(phpOnlineDto.getUserIdOfJava()) || StringUtils.isBlank(phpOnlineDto.getCourseId())) { continue; } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java index fb1cea8f..67e05d59 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; import com.xboe.core.orm.BaseDao; import com.xboe.school.study.dto.PhpOnlineDto; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Repository; @@ -113,7 +114,7 @@ public class PhpOnlineCourseDao extends BaseDao { } return phpOnlineDto; }) - .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .filter(phpOnlineDto -> StringUtils.isNotBlank(phpOnlineDto.getUserIdOfJava())) .collect(Collectors.toList()); return recentLearnRecordList;