es数据同步-在线课

This commit is contained in:
yang
2024-12-12 17:43:35 +08:00
parent e54c8b457b
commit 4ec4b861f7
4 changed files with 294 additions and 0 deletions

View File

@@ -0,0 +1,245 @@
package com.xboe.school.study.api;
import cn.hutool.core.collection.CollUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xboe.module.course.dto.CourseStudyDto;
import com.xboe.school.study.dao.PhpOnlineCourseDao;
import com.xboe.school.study.dto.PhpOnlineDto;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
@Slf4j
public class PhpOnlineStudyRecordScheduledTasks {
@Resource
private PhpOnlineCourseDao phpOnlineCourseDao;
@Resource
RestHighLevelClient restHighLevelClient;
// todo 定时、分批、数据库名
@XxlJob("phpOnlineStudyRecordSyncEs")
public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
String sql =
"SELECT\n" +
" elc.kid AS courseId,\n" +
" elcr.user_id AS userIdOfPhp,\n" +
" COUNT(1) AS modNum,\n" +
" SUM(CASE WHEN elrc.kid IS NOT NULL THEN 1 ELSE 0 END) AS completeNum\n" +
"FROM\n" +
" elearninglms.eln_ln_course elc\n" +
" INNER JOIN elearninglms.eln_ln_course_reg elcr \n" +
" ON elc.kid = elcr.course_id\n" +
" INNER JOIN (\n" +
" SELECT\n" +
" user_id,\n" +
" course_id \n" +
" FROM\n" +
" elearninglms.eln_ln_res_complete\n" +
" WHERE\n" +
" complete_type = '1'\n" +
" AND complete_status = '2'\n" +
" AND updated_at > ?1\n" +
" AND is_deleted = 0\n" +
" GROUP BY\n" +
" user_id,\n" +
" course_id\n" +
" ) recentFinishStuent \n" +
" ON recentFinishStuent.user_id = elcr.user_id \n" +
" AND recentFinishStuent.course_id = elcr.course_id\n" +
" INNER JOIN elearninglms.eln_ln_mod_res elms \n" +
" ON elms.course_id = elcr.course_id\n" +
" LEFT JOIN elearninglms.eln_ln_res_complete elrc \n" +
" ON elrc.mod_res_id = elms.kid\n" +
" AND elrc.user_id = elcr.user_id\n" +
" AND elrc.complete_type = '1'\n" +
" AND elrc.complete_status = '2'\n" +
"WHERE\n" +
" elc.is_deleted = 0\n" +
" AND elcr.is_deleted = 0\n" +
" AND elcr.reg_state = '1'\n" +
" AND elms.publish_status = '1'\n" +
" AND elms.is_deleted = '0'\n" +
"GROUP BY\n" +
" elc.kid,\n" +
" elcr.user_id\n" +
"HAVING\n" +
" completeNum = modNum\n";
log.info("开始同步PHP学习记录到ES");
// 增量获取PHP中所有已完成的课程
if (syncTimePoint == null) {
LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30);
syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
log.info("同步时间点:{}", formatter.format(halfAnHourAgo));
}
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
if (CollUtil.isEmpty(objectList1)) {
log.info("没有找到已完成的数据");
return;
}
String indexName = "new_study_resource";
List<PhpOnlineDto> finishedCourseList = new ArrayList<>();
for (Object[] objects : objectList1) {
String courseId = objects[0].toString();
String userIdOfPhp = objects[1].toString();
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
phpOnlineDto.setCourseId(courseId);
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
finishedCourseList.add(phpOnlineDto);
}
// 拼接获取所有新系统用户id这里不选择与上面的联表查询有效率问题
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
if (CollUtil.isEmpty(objectList1)) {
log.info("新系统用户数据不存在");
return;
}
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
// 设置新系统用户ID
finishedCourseList = finishedCourseList.stream()
.map(phpOnlineDto -> {
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
if (userIdOfJavaObj != null) {
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
}
return phpOnlineDto;
})
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
.collect(Collectors.toList());
// 获取ES中没有完成的的课程学习记录
List<CourseStudyDto> notFinishedCourseList = getEsData(finishedCourseList);
Set<String> finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet());
List<CourseStudyDto> toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> {
if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) {
return true;
} else {
// 有多种情况es不存在学习记录或者学习记录状态不是9
log.info("跳过:{}", courseStudyDto);
return false;
}
}).map(courseStudyDto -> {
courseStudyDto.setStatus(9);
courseStudyDto.setProgress(100);
return courseStudyDto;
}).collect(Collectors.toList());
// 更新ES中的未同步为完成的学习记录
toBeUpdatedEs(toBeUpdatedEs, indexName);
}
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
SearchRequest searchRequest = new SearchRequest("new_study_resource");
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
for (PhpOnlineDto phpOnlineDto : finishedCourseList) {
if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) {
continue;
}
boolQuery.should(QueryBuilders.boolQuery()
.must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId()))
.must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava()))
.mustNot(QueryBuilders.termQuery("status", 9))
);
}
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder);
if (boolQuery.hasClauses()) {
sourceBuilder.query(boolQuery);
}
searchRequest.source(sourceBuilder);
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
List<CourseStudyDto> courseStudyDtoList = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
try {
CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class);
courseStudyDtoList.add(cft);
} catch (Exception e) {
log.error("转化json到对应失败", sourceAsString);
}
}
return courseStudyDtoList;
}
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) {
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) {
Map<String, Object> docMap = new HashMap<>();
docMap.put("status", courseStudyDto.getStatus());
docMap.put("progress", courseStudyDto.getProgress());
// 创建更新请求并传入单一的docMap
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId())
.doc(docMap, XContentType.JSON);
// 将请求添加到批量请求中
bulkRequest.add(updateRequest);
}
try {
// 执行批量更新操作
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
// 检查是否有失败的操作
if (bulkResponse.hasFailures()) {
log.error("批量更新失败: {}", bulkResponse.buildFailureMessage());
} else {
List<String> collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList());
log.info("批量更新成功更新的ES ID列表: {}", collect);
}
} catch (IOException e) {
log.error("执行批量更新时发生错误", e);
}
}
}

View File

@@ -24,6 +24,8 @@ import com.xboe.school.study.service.IStudyCourseService;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
* ES的课程检索
* @author seastar
@@ -39,6 +41,9 @@ public class StudyCourseESApi extends ApiBaseController{
@Autowired
IStudyCourseService service;
@Resource
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
@@ -161,4 +166,11 @@ public class StudyCourseESApi extends ApiBaseController{
return success(true);
}
@PostMapping("/phpOnlineStudyRecordSyncEs")
public JsonResponse<Boolean> phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint);
return success(true);
}
}

View File

@@ -0,0 +1,9 @@
package com.xboe.school.study.dao;
import com.xboe.core.orm.BaseDao;
import com.xboe.school.study.dto.PhpOnlineDto;
import org.springframework.stereotype.Repository;
@Repository
public class PhpOnlineCourseDao extends BaseDao<PhpOnlineDto> {
}

View File

@@ -0,0 +1,28 @@
package com.xboe.school.study.dto;
import lombok.Data;
/**
* 批量报名
*
* @author seastar
*/
@Data
public class PhpOnlineDto {
/**
* 课程id
*/
private String courseId;
/**
* 课程名称
*/
private String userIdOfPhp;
/**
* 课程名称
*/
private String userIdOfJava;
}