mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-10 11:26:50 +08:00
es数据同步
This commit is contained in:
@@ -45,7 +45,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
|
|
||||||
// todo 定时、分批、数据库名
|
// todo 定时、分批、数据库名
|
||||||
@XxlJob("phpOnlineStudyRecordSyncEs")
|
@XxlJob("phpOnlineStudyRecordSyncEs")
|
||||||
public void phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
|
public List<CourseStudyDto> phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException {
|
||||||
String sql =
|
String sql =
|
||||||
"SELECT\n" +
|
"SELECT\n" +
|
||||||
" elc.kid AS courseId,\n" +
|
" elc.kid AS courseId,\n" +
|
||||||
@@ -101,11 +101,9 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
}
|
}
|
||||||
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
|
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (CollUtil.isEmpty(objectList1)) {
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
log.info("没有找到已完成的数据");
|
log.info("没有找到已完成的数据");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
String indexName = "new_study_resource";
|
String indexName = "new_study_resource";
|
||||||
@@ -122,11 +120,11 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
|
|
||||||
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||||
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||||
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from userbasic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||||
|
|
||||||
if (CollUtil.isEmpty(objectList1)) {
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
log.info("新系统用户数据不存在");
|
log.info("新系统用户数据不存在");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||||
@@ -147,17 +145,11 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
// 获取ES中没有完成的的课程学习记录
|
// 获取ES中没有完成的的课程学习记录
|
||||||
List<CourseStudyDto> notFinishedCourseList = getEsData(finishedCourseList);
|
List<CourseStudyDto> notFinishedCourseList = getEsData(finishedCourseList);
|
||||||
|
|
||||||
Set<String> finishedCourseSet = finishedCourseList.stream().map(e -> e.getUserIdOfJava() + "_" + e.getCourseId()).collect(Collectors.toSet());
|
if (isOnlyRead != null && isOnlyRead == 1) {
|
||||||
|
return notFinishedCourseList;
|
||||||
|
}
|
||||||
|
|
||||||
List<CourseStudyDto> toBeUpdatedEs = notFinishedCourseList.stream().filter(courseStudyDto -> {
|
List<CourseStudyDto> toBeUpdatedEs = notFinishedCourseList.stream().map(courseStudyDto -> {
|
||||||
if (finishedCourseSet.contains(courseStudyDto.getAccountId() + "_" + courseStudyDto.getCourseId())) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
// 有多种情况,es不存在学习记录,或者学习记录状态不是9
|
|
||||||
log.info("跳过:{}", courseStudyDto);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}).map(courseStudyDto -> {
|
|
||||||
courseStudyDto.setStatus(9);
|
courseStudyDto.setStatus(9);
|
||||||
courseStudyDto.setProgress(100);
|
courseStudyDto.setProgress(100);
|
||||||
return courseStudyDto;
|
return courseStudyDto;
|
||||||
@@ -165,7 +157,7 @@ public class PhpOnlineStudyRecordScheduledTasks {
|
|||||||
|
|
||||||
// 更新ES中的未同步为完成的学习记录
|
// 更新ES中的未同步为完成的学习记录
|
||||||
toBeUpdatedEs(toBeUpdatedEs, indexName);
|
toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||||
|
|||||||
@@ -0,0 +1,215 @@
|
|||||||
|
package com.xboe.school.study.api;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.xboe.module.course.dto.CourseStudyDto;
|
||||||
|
import com.xboe.school.study.dao.PhpOnlineCourseDao;
|
||||||
|
import com.xboe.school.study.dto.PhpOnlineDto;
|
||||||
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
|
import org.elasticsearch.client.RequestOptions;
|
||||||
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||||
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
import org.elasticsearch.search.SearchHits;
|
||||||
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneId;
|
||||||
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
@Slf4j
|
||||||
|
public class PhpOnlineStudyRecordScheduledTasks2 {
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private PhpOnlineCourseDao phpOnlineCourseDao;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
RestHighLevelClient restHighLevelClient;
|
||||||
|
|
||||||
|
|
||||||
|
// todo 定时、分批、数据库名
|
||||||
|
@XxlJob("phpOnlineStudyRecordSyncEs")
|
||||||
|
public List<CourseStudyDto> phpOnlineStudyRecordSyncEs(Long syncTimePoint, Integer isOnlyRead) throws IOException {
|
||||||
|
String sql =
|
||||||
|
"SELECT\n" +
|
||||||
|
" course_id,user_id,status,progress \n" +
|
||||||
|
"FROM\n" +
|
||||||
|
" elearninglms.eln_ln_res_complete \n" +
|
||||||
|
"WHERE\n" +
|
||||||
|
" complete_type = '1' \n" +
|
||||||
|
" AND complete_status = '2' \n" +
|
||||||
|
" AND updated_at > ?1 \n" +
|
||||||
|
" AND is_deleted=0\n" +
|
||||||
|
"GROUP BY\n" +
|
||||||
|
" course_id,user_id";
|
||||||
|
|
||||||
|
log.info("开始同步PHP学习记录到ES");
|
||||||
|
// 增量获取PHP中所有已完成的课程
|
||||||
|
if (syncTimePoint == null) {
|
||||||
|
LocalDateTime halfAnHourAgo = LocalDateTime.now().minusMinutes(30);
|
||||||
|
syncTimePoint = halfAnHourAgo.atZone(ZoneId.systemDefault()).toInstant().getEpochSecond();
|
||||||
|
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
|
||||||
|
log.info("同步时间点:{}", formatter.format(halfAnHourAgo));
|
||||||
|
}
|
||||||
|
List<Object[]> objectList1 = phpOnlineCourseDao.sqlFindList(sql, syncTimePoint);
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
|
log.info("没有找到已完成的数据");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
String indexName = "new_study_resource";
|
||||||
|
|
||||||
|
List<PhpOnlineDto> recentLearnRecordList = new ArrayList<>();
|
||||||
|
for (Object[] objects : objectList1) {
|
||||||
|
String courseId = objects[0].toString();
|
||||||
|
String userIdOfPhp = objects[1].toString();
|
||||||
|
int status = Integer.parseInt(objects[2].toString());
|
||||||
|
int progress = Integer.parseInt(objects[3].toString());
|
||||||
|
PhpOnlineDto phpOnlineDto = new PhpOnlineDto();
|
||||||
|
phpOnlineDto.setCourseId(courseId);
|
||||||
|
phpOnlineDto.setUserIdOfPhp(userIdOfPhp);
|
||||||
|
phpOnlineDto.setProgress(progress);
|
||||||
|
phpOnlineDto.setStatus(status);
|
||||||
|
recentLearnRecordList.add(phpOnlineDto);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 拼接获取所有新系统用户id,这里不选择与上面的联表查询,有效率问题
|
||||||
|
List<String> userIds = objectList1.stream().map(objects -> String.valueOf(objects[1])).distinct().collect(Collectors.toList());
|
||||||
|
List<Object[]> objectList2 = phpOnlineCourseDao.sqlFindList("select kid,user_id from user_basic.user_account where kid in (?1) and deleted=0 and account_status = 0", userIds);
|
||||||
|
|
||||||
|
if (CollUtil.isEmpty(objectList1)) {
|
||||||
|
log.info("新系统用户数据不存在");
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<Object, Object> userIdToKidMap = objectList2.stream().collect(Collectors.toMap(object -> object[0], object -> object[1]));
|
||||||
|
|
||||||
|
// 设置新系统用户ID
|
||||||
|
recentLearnRecordList = recentLearnRecordList.stream()
|
||||||
|
.map(phpOnlineDto -> {
|
||||||
|
Object userIdOfJavaObj = userIdToKidMap.get(phpOnlineDto.getUserIdOfPhp());
|
||||||
|
if (userIdOfJavaObj != null) {
|
||||||
|
phpOnlineDto.setUserIdOfJava(userIdOfJavaObj.toString());
|
||||||
|
}
|
||||||
|
return phpOnlineDto;
|
||||||
|
})
|
||||||
|
.filter(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() != null)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
|
||||||
|
// 获取ES中没有完成的的课程学习记录
|
||||||
|
List<CourseStudyDto> esData = getEsData(recentLearnRecordList);
|
||||||
|
Map<String, PhpOnlineDto> map = recentLearnRecordList.stream().collect(Collectors.toMap(phpOnlineDto -> phpOnlineDto.getUserIdOfJava() + phpOnlineDto.getCourseId(), phpOnlineDto -> phpOnlineDto));
|
||||||
|
List<CourseStudyDto> toBeUpdatedEs = esData.stream().filter(courseStudyDto -> {
|
||||||
|
if (map.containsKey(courseStudyDto.getAccountId())) {
|
||||||
|
PhpOnlineDto phpOnlineDto = map.get(courseStudyDto.getAccountId());
|
||||||
|
if (!phpOnlineDto.getStatus().equals(phpOnlineDto.getStatus()) || !phpOnlineDto.getProgress().equals(phpOnlineDto.getProgress())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
|
if (isOnlyRead != null && isOnlyRead == 1) {
|
||||||
|
return esData;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 更新ES中的未同步为完成的学习记录
|
||||||
|
toBeUpdatedEs(toBeUpdatedEs, indexName);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<CourseStudyDto> getEsData(List<PhpOnlineDto> finishedCourseList) throws IOException {
|
||||||
|
SearchRequest searchRequest = new SearchRequest("new_study_resource");
|
||||||
|
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
|
||||||
|
|
||||||
|
for (PhpOnlineDto phpOnlineDto : finishedCourseList) {
|
||||||
|
if (phpOnlineDto.getUserIdOfJava() == null || phpOnlineDto.getCourseId() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
boolQuery.should(QueryBuilders.boolQuery()
|
||||||
|
.must(QueryBuilders.termQuery("courseId.keyword", phpOnlineDto.getCourseId()))
|
||||||
|
.must(QueryBuilders.termQuery("accountId.keyword", phpOnlineDto.getUserIdOfJava()))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(boolQuery).timeout(new TimeValue(60, TimeUnit.SECONDS));
|
||||||
|
searchRequest.source(sourceBuilder);
|
||||||
|
if (boolQuery.hasClauses()) {
|
||||||
|
sourceBuilder.query(boolQuery);
|
||||||
|
}
|
||||||
|
searchRequest.source(sourceBuilder);
|
||||||
|
|
||||||
|
SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
|
||||||
|
SearchHits hits = response.getHits();
|
||||||
|
|
||||||
|
List<CourseStudyDto> courseStudyDtoList = new ArrayList<>();
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
for (SearchHit hit : hits) {
|
||||||
|
String sourceAsString = hit.getSourceAsString();
|
||||||
|
try {
|
||||||
|
CourseStudyDto cft = mapper.readValue(sourceAsString, CourseStudyDto.class);
|
||||||
|
courseStudyDtoList.add(cft);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("转化json到对应失败", sourceAsString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return courseStudyDtoList;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void toBeUpdatedEs(List<CourseStudyDto> toBeUpdatedEsData, String indexName) {
|
||||||
|
if (CollUtil.isEmpty(toBeUpdatedEsData)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (CourseStudyDto courseStudyDto : toBeUpdatedEsData) {
|
||||||
|
Map<String, Object> docMap = new HashMap<>();
|
||||||
|
docMap.put("status", courseStudyDto.getStatus());
|
||||||
|
docMap.put("progress", courseStudyDto.getProgress());
|
||||||
|
|
||||||
|
// 创建更新请求,并传入单一的docMap
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest(indexName, courseStudyDto.getId())
|
||||||
|
.doc(docMap, XContentType.JSON);
|
||||||
|
|
||||||
|
// 将请求添加到批量请求中
|
||||||
|
bulkRequest.add(updateRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 执行批量更新操作
|
||||||
|
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||||
|
// 检查是否有失败的操作
|
||||||
|
if (bulkResponse.hasFailures()) {
|
||||||
|
log.error("批量更新失败: {}", bulkResponse.buildFailureMessage());
|
||||||
|
} else {
|
||||||
|
List<String> collect = toBeUpdatedEsData.stream().map(CourseStudyDto::getId).collect(Collectors.toList());
|
||||||
|
log.info("批量更新成功,更新的ES ID列表: {}", collect);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
log.error("执行批量更新时发生错误", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -44,6 +44,9 @@ public class StudyCourseESApi extends ApiBaseController{
|
|||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
|
private PhpOnlineStudyRecordScheduledTasks phpOnlineStudyRecordScheduledTasks;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private PhpOnlineStudyRecordScheduledTasks2 phpOnlineStudyRecordScheduledTasks2;
|
||||||
|
|
||||||
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
|
@RequestMapping(value="/search",method = {RequestMethod.GET,RequestMethod.POST})
|
||||||
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
|
public JsonResponse<PageList<CourseStudyDto>> search(Pagination page, CourseStudyDto dto){
|
||||||
@@ -167,10 +170,30 @@ public class StudyCourseESApi extends ApiBaseController{
|
|||||||
return success(true);
|
return success(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param syncTimePoint
|
||||||
|
* @param isOnlyRead 0 更新ES 1 查询ES
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
@PostMapping("/phpOnlineStudyRecordSyncEs")
|
@PostMapping("/phpOnlineStudyRecordSyncEs")
|
||||||
public JsonResponse<Boolean> phpOnlineStudyRecordSyncEs(Long syncTimePoint) throws IOException {
|
public JsonResponse<List<CourseStudyDto>> phpOnlineStudyRecordSyncEs(Long syncTimePoint,Integer isOnlyRead) throws IOException {
|
||||||
phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint);
|
List<CourseStudyDto> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead);
|
||||||
return success(true);
|
return success(courseStudyDtoList);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param syncTimePoint
|
||||||
|
* @param isOnlyRead 0 更新ES 1 查询ES
|
||||||
|
* @return
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@PostMapping("/phpOnlineStudyRecordSyncEs2")
|
||||||
|
public JsonResponse<List<CourseStudyDto>> phpOnlineStudyRecordSyncEs2(Long syncTimePoint,Integer isOnlyRead) throws IOException {
|
||||||
|
List<CourseStudyDto> courseStudyDtoList = phpOnlineStudyRecordScheduledTasks2.phpOnlineStudyRecordSyncEs(syncTimePoint, isOnlyRead);
|
||||||
|
return success(courseStudyDtoList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,4 +25,6 @@ public class PhpOnlineDto {
|
|||||||
* 课程名称
|
* 课程名称
|
||||||
*/
|
*/
|
||||||
private String userIdOfJava;
|
private String userIdOfJava;
|
||||||
|
private Integer status;
|
||||||
|
private Integer progress;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user