From 4ec4b861f78e95a8f0c21f8a723178eeb716ef54 Mon Sep 17 00:00:00 2001 From: yang <1175@qq.com> Date: Thu, 12 Dec 2024 17:43:35 +0800 Subject: [PATCH] =?UTF-8?q?es=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5-?= =?UTF-8?q?=E5=9C=A8=E7=BA=BF=E8=AF=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../PhpOnlineStudyRecordScheduledTasks.java | 245 ++++++++++++++++++ .../school/study/api/StudyCourseESApi.java | 12 + .../school/study/dao/PhpOnlineCourseDao.java | 9 + .../xboe/school/study/dto/PhpOnlineDto.java | 28 ++ 4 files changed, 294 insertions(+) create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java create mode 100644 servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java new file mode 100644 index 00000000..ad2a3550 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/PhpOnlineStudyRecordScheduledTasks.java @@ -0,0 +1,245 @@ +package com.xboe.school.study.api; + +import cn.hutool.core.collection.CollUtil; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xboe.module.course.dto.CourseStudyDto; +import com.xboe.school.study.dao.PhpOnlineCourseDao; +import com.xboe.school.study.dto.PhpOnlineDto; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@Component +@Slf4j +public class PhpOnlineStudyRecordScheduledTasks { + + @Resource + private PhpOnlineCourseDao phpOnlineCourseDao; + + @Resource + RestHighLevelClient restHighLevelClient; + + + // todo 定时、分批、数据库名 + @XxlJob("phpOnlineStudyRecordSyncEs") + public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { + String sql = + "SELECT\n" + + " elc.kid AS courseId,\n" + + " elcr.user_id AS userIdOfPhp,\n" + + " COUNT(1) AS modNum,\n" + + " SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" + + "FROM\n" + + " elearninglms.eln_ln_course elc\n" + + " INNER JOIN elearninglms.eln_ln_course_reg elcr \n" + + " ON elc.kid = elcr.course_id\n" + + " INNER JOIN (\n" + + " SELECT\n" + + " user_id,\n" + + " course_id \n" + + " FROM\n" + + " elearninglms.eln_ln_res_complete\n" + + " WHERE\n" + + " complete_type = '1'\n" + + " AND complete_status = '2'\n" + + " AND updated_at > ?1\n" + + " AND is_deleted = 0\n" + + " GROUP BY\n" + + " user_id,\n" + + " course_id\n" + + " ) recentFinishStuent \n" + + " ON recentFinishStuent.user_id = elcr.user_id \n" + + " AND recentFinishStuent.course_id = elcr.course_id\n" + + " INNER JOIN elearninglms.eln_ln_mod_res elms \n" + + " ON elms.course_id = elcr.course_id\n" + + " LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" + + " ON elrc.mod_res_id = elms.kid\n" + + " AND elrc.user_id = elcr.user_id\n" + + " AND elrc.complete_type = '1'\n" + + " AND elrc.complete_status = '2'\n" + + "WHERE\n" + + " elc.is_deleted = 0\n" + + " AND elcr.is_deleted = 0\n" + + " AND elcr.reg_state = '1'\n" + + " AND elms.publish_status = '1'\n" + + " AND elms.is_deleted = '0'\n" + + "GROUP BY\n" + + " elc.kid,\n" + + " elcr.user_id\n" + + "HAVING\n" + + " completeNum = modNum\n"; + log.info("开始同步PHP学习记录到ES"); + // 增量获取PHP中所有已完成的课程 + if (syncTimePoint == null) { + LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30); + syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond(); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + log.info("同步时间点:{}", formatter.format(halfAnHourAgo)); + } + List objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint); + + + + if (CollUtil.isEmpty(objectList1)) { + log.info("没有找到已完成的数据"); + return; + } + + String indexName = "new_study_resource"; + + List finishedCourseList = new ArrayList<>(); + for (Object[] objects : objectList1) { + String courseId = objects[0].toString(); + String userIdOfPhp = objects[1].toString(); + PhpOnlineDto phpOnlineDto = new PhpOnlineDto(); + phpOnlineDto.setCourseId(courseId); + phpOnlineDto.setUserIdOfPhp(userIdOfPhp); + finishedCourseList.add(phpOnlineDto); + } + + // 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题 + List userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList()); + List objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds); + + if (CollUtil.isEmpty(objectList1)) { + log.info("新系统用户数据不存在"); + return; + } + + Map userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1])); + + // 设置新系统用户ID + finishedCourseList = finishedCourseList.stream() + .map(phpOnlineDto -> { + Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp()); + if (userIdOfJavaObj != null) { + phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString()); + } + return phpOnlineDto; + }) + .filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null) + .collect(Collectors.toList()); + + + // 获取ES中没有完成的的课程学习记录 + List notFinishedCourseList = getEsData(finishedCourseList); + + Set finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet()); + + List toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> { + if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) { + return true; + } else { + // 有多种情况,es不存在学习记录,或者学习记录状态不是9 + log.info("跳过:{}", courseStudyDto); + return false; + } + }).map(courseStudyDto -> { + courseStudyDto.setStatus(9); + courseStudyDto.setProgress(100); + return courseStudyDto; + }).collect(Collectors.toList()); + + // 更新ES中的未同步为完成的学习记录 + toBeUpdatedEs(toBeUpdatedEs, indexName); + + } + + private List getEsData(List finishedCourseList) throws IOException { + SearchRequest searchRequest = new SearchRequest("new_study_resource"); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + + for (PhpOnlineDto phpOnlineDto : finishedCourseList) { + if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) { + continue; + } + + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId())) + .must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava())) + .mustNot(QueryBuilders.termQuery("status", 9)) + ); + } + + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS)); + searchRequest.source(sourceBuilder); + if (boolQuery.hasClauses()) { + sourceBuilder.query(boolQuery); + } + searchRequest.source(sourceBuilder); + + SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); + SearchHits hits = response.getHits(); + + List courseStudyDtoList = new ArrayList<>(); + ObjectMapper mapper = new ObjectMapper(); + for (SearchHit hit : hits) { + String sourceAsString = hit.getSourceAsString(); + try { + CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class); + courseStudyDtoList.add(cft); + } catch (Exception e) { + log.error("转化json到对应失败", sourceAsString); + } + } + return courseStudyDtoList; + } + + private void toBeUpdatedEs(List toBeUpdatedEsData, String indexName) { + if (CollUtil.isEmpty(toBeUpdatedEsData)) { + return; + } + BulkRequest bulkRequest = new BulkRequest(); + for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) { + Map docMap = new HashMap<>(); + docMap.put("status", courseStudyDto.getStatus()); + docMap.put("progress", courseStudyDto.getProgress()); + + // 创建更新请求,并传入单一的docMap + UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId()) + .doc(docMap, XContentType.JSON); + + // 将请求添加到批量请求中 + bulkRequest.add(updateRequest); + } + + try { + // 执行批量更新操作 + BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); + // 检查是否有失败的操作 + if (bulkResponse.hasFailures()) { + log.error("批量更新失败: {}", bulkResponse.buildFailureMessage()); + } else { + List collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList()); + log.info("批量更新成功,更新的ES ID列表: {}", collect); + } + } catch (IOException e) { + log.error("执行批量更新时发生错误", e); + } + } + + +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java index ddfa50f8..0ad3bab6 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/api/StudyCourseESApi.java @@ -24,6 +24,8 @@ import com.xboe.school.study.service.IStudyCourseService; import lombok.extern.slf4j.Slf4j; +import javax.annotation.Resource; + /** * ES的课程检索 * @author seastar @@ -39,6 +41,9 @@ public class StudyCourseESApi extends ApiBaseController{ @Autowired IStudyCourseService service; + + @Resource + private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks; @RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST}) public JsonResponse> search(Pagination page, CourseStudyDto dto){ @@ -161,4 +166,11 @@ public class StudyCourseESApi extends ApiBaseController{ return success(true); } + + + @PostMapping("/phpOnlineStudyRecordSyncEs") + public JsonResponse phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException { + phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint); + return success(true); + } } diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java new file mode 100644 index 00000000..94b64201 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dao/PhpOnlineCourseDao.java @@ -0,0 +1,9 @@ +package com.xboe.school.study.dao; + +import com.xboe.core.orm.BaseDao; +import com.xboe.school.study.dto.PhpOnlineDto; +import org.springframework.stereotype.Repository; + +@Repository +public class PhpOnlineCourseDao extends BaseDao { +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java new file mode 100644 index 00000000..9d77ca77 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/school/study/dto/PhpOnlineDto.java @@ -0,0 +1,28 @@ +package com.xboe.school.study.dto; + +import lombok.Data; + +/** + * 批量报名 + * + * @author seastar + */ +@Data +public class PhpOnlineDto { + + /** + * 课程id + */ + private String courseId; + + /** + * 课程名称 + */ + private String userIdOfPhp; + + + /** + * 课程名称 + */ + private String userIdOfJava; +}