提交异步推送

This commit is contained in:
Guava
2023-07-04 18:57:53 +08:00
parent 4eb577324f
commit 2d35973aed
2 changed files with 179 additions and 136 deletions

View File

@@ -0,0 +1,170 @@
package com.xboe.module.boecase.service.impl;
import com.xboe.api.ThirdApi;
import com.xboe.core.orm.FieldFilters;
import com.xboe.core.orm.UpdateBuilder;
import com.xboe.enums.CasesPushStatusEnum;
import com.xboe.module.boecase.dao.CasesRecommendDao;
import com.xboe.module.boecase.dao.CasesRecommendPushRecordDao;
import com.xboe.module.boecase.entity.CasesRecommend;
import com.xboe.module.boecase.entity.CasesRecommendPushRecord;
import com.xboe.system.user.dao.MessageDao;
import com.xboe.system.user.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* @author : civism
* @version 1.0
* @date 2023/7/4 18:54
*/
@Service
@Slf4j
public class AsyncSendCasesRecommendService {
@Resource
private CasesRecommendDao casesRecommendDao;
@Resource
private CasesRecommendPushRecordDao casesRecommendPushRecordDao;
@Resource
private ThirdApi thirdApi;
@Value("${xboe.old.base.url}")
private String domain;
@Resource
private MessageDao messageDao;
@Async
public void sendCasesRecommend(CasesRecommend casesRecommend) {
List<Integer> pushStatusList = new ArrayList<>();
pushStatusList.add(CasesPushStatusEnum.WAIT_PUSH.getStatus());
pushStatusList.add(CasesPushStatusEnum.PUSH_REVOKE.getStatus());
List<CasesRecommendPushRecord> casesRecommendPushRecords = casesRecommendPushRecordDao.getGenericDao()
.findList(CasesRecommendPushRecord.class, FieldFilters.in("pushStatus", pushStatusList), FieldFilters.eqField("recommendId", casesRecommend.getId()));
if (CollectionUtils.isEmpty(casesRecommendPushRecords)) {
//修改为推送完成 --- 无数据 无需推送
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
return;
}
//修改为推送中
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_ING.getStatus());
List<String> caseIds = casesRecommendPushRecords.stream().map(CasesRecommendPushRecord::getCaseId).collect(Collectors.toList());
if (caseIds.size() > 1) {
sendMixCaseRecommend(casesRecommendPushRecords);
} else {
sendSingleCaseRecommend(casesRecommendPushRecords);
}
//修改为推送完成 --- 无数据 无需推送
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
}
/**
* 修改轧辊天
*
* @param id
* @param pushProgress
* @return
*/
private boolean updateProcessStatus(String id, Integer pushProgress) {
return casesRecommendDao.updateMultiFieldById(id, UpdateBuilder.create("pushProgress", pushProgress)) > 0;
}
private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> seen.add(keyExtractor.apply(t));
}
private void sendMixCaseRecommend(List<CasesRecommendPushRecord> casesRecommendPushRecords) {
Map<String, List<CasesRecommendPushRecord>> caseMap = casesRecommendPushRecords.stream().collect(Collectors.groupingBy(CasesRecommendPushRecord::getPushUserId));
Map<String, String> userMap = casesRecommendPushRecords.stream().filter(distinctByKey(CasesRecommendPushRecord::getPushUserId)).collect(Collectors.toMap(CasesRecommendPushRecord::getPushUserId, CasesRecommendPushRecord::getPushUserName));
for (String userId : caseMap.keySet()) {
List<CasesRecommendPushRecord> pushRecords = caseMap.get(userId);
try {
Message message = new Message();
message.setMsgType(1);
message.setAcceptId(userId);
message.setAcceptName(userMap.get(userId));
message.setContent("案例推荐-《" + pushRecords.get(0).getCaseTitle() + "》等" + pushRecords.size() + "个案例");
message.setIsRead(false);
message.setMsgTime(LocalDateTime.now());
message.setRefId(pushRecords.get(0).getRecommendId());
message.setBatchId(pushRecords.get(0).getRecommendId());
message.setRefType("99");
message.setSendName(pushRecords.get(0).getSysCreateBy());
message.setSendType(1);
message.setTitle("案例推荐");
message.setSendAid(pushRecords.get(0).getSysCreateAid());
message.setSource(1);
// message.setPageUrl(domain + "/pc/case/detail?id=" + casesRecommendPushRecord.getCaseId());
// message.setPageParams(casesRecommendPushRecord.getCaseId());
message.setPageType(3);
messageDao.save(message);
updatePushStatus(casesRecommendPushRecords, CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
} catch (Exception e) {
log.error("推荐案例失败", e);
updatePushStatus(casesRecommendPushRecords, CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
}
}
}
private void updatePushStatus(List<CasesRecommendPushRecord> pushRecords, Integer pushStatus) {
for (CasesRecommendPushRecord pushRecord : pushRecords) {
casesRecommendPushRecordDao.updateMultiFieldById(pushRecord.getId(), UpdateBuilder.create("pushStatus", pushStatus), UpdateBuilder.create("pushTime", new Date()));
}
}
private void sendSingleCaseRecommend(List<CasesRecommendPushRecord> casesRecommendPushRecords) {
for (CasesRecommendPushRecord casesRecommendPushRecord : casesRecommendPushRecords) {
Integer pushStatus;
try {
Message message = new Message();
message.setMsgType(1);
message.setAcceptId(casesRecommendPushRecord.getPushUserId());
message.setAcceptName(casesRecommendPushRecord.getPushUserName());
message.setContent("案例推荐-《" + casesRecommendPushRecord.getCaseTitle() + "");
message.setIsRead(false);
message.setMsgTime(LocalDateTime.now());
message.setRefId(casesRecommendPushRecord.getId());
message.setBatchId(casesRecommendPushRecord.getRecommendId());
message.setRefType("99");
message.setSendName(casesRecommendPushRecord.getSysCreateBy());
message.setSendType(1);
message.setTitle("案例推荐");
message.setSendAid(casesRecommendPushRecord.getSysCreateAid());
message.setSource(1);
message.setPageUrl(domain + "/pc/case/detail?id=" + casesRecommendPushRecord.getCaseId());
message.setPageParams(casesRecommendPushRecord.getCaseId());
message.setPageType(3);
messageDao.save(message);
pushStatus = CasesPushStatusEnum.PUSH_SUCCESS.getStatus();
} catch (Exception e) {
log.error("推荐案例失败", e);
pushStatus = CasesPushStatusEnum.PUSH_FAIL.getStatus();
}
casesRecommendPushRecordDao.updateMultiFieldById(casesRecommendPushRecord.getId(), UpdateBuilder.create("pushStatus", pushStatus), UpdateBuilder.create("pushTime", new Date()));
}
}
}

View File

@@ -6,7 +6,6 @@ import cn.hutool.json.JSONUtil;
import com.xboe.api.ThirdApi;
import com.xboe.api.vo.*;
import com.xboe.core.CurrentUser;
import com.xboe.core.event.IEventDataSender;
import com.xboe.core.orm.FieldFilters;
import com.xboe.core.orm.UpdateBuilder;
import com.xboe.data.outside.IOutSideDataService;
@@ -21,23 +20,18 @@ import com.xboe.module.boecase.entity.CasesRecommendLaunchImportData;
import com.xboe.module.boecase.entity.CasesRecommendPushRecord;
import com.xboe.module.boecase.service.ICasesRecommendPushRecordService;
import com.xboe.module.boecase.vo.CasesRecommendLaunchVo;
import com.xboe.system.user.dao.MessageDao;
import com.xboe.system.user.dao.UserDao;
import com.xboe.system.user.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Service
@@ -64,19 +58,15 @@ public class CasesRecommendPushRecordServiceImpl implements ICasesRecommendPushR
@Resource
private CasesRecommendPushRecordDao casesRecommendPushRecordDao;
@Resource
private MessageDao messageDao;
@Autowired(required = false)
private IEventDataSender eventDataSender;
@Resource
private ThirdApi thirdApi;
@Value("${xboe.old.base.url}")
private String domain;
@Resource
private AsyncSendCasesRecommendService asyncSendCasesRecommendService;
@Override
public boolean launchPush(CasesRecommendLaunchVo casesRecommendLaunch, CurrentUser currentUser, String token) {
@@ -159,130 +149,13 @@ public class CasesRecommendPushRecordServiceImpl implements ICasesRecommendPushR
}
try {
//发送推送案例消息
sendCasesRecommend(casesRecommend);
asyncSendCasesRecommendService.sendCasesRecommend(casesRecommend);
} catch (Exception e) {
log.error("推送失败", e);
}
return true;
}
/**
* 修改轧辊天
*
* @param id
* @param pushProgress
* @return
*/
private boolean updateProcessStatus(String id, Integer pushProgress) {
return casesRecommendDao.updateMultiFieldById(id, UpdateBuilder.create("pushProgress", pushProgress)) > 0;
}
@Async
public void sendCasesRecommend(CasesRecommend casesRecommend) {
List<Integer> pushStatusList = new ArrayList<>();
pushStatusList.add(CasesPushStatusEnum.WAIT_PUSH.getStatus());
pushStatusList.add(CasesPushStatusEnum.PUSH_REVOKE.getStatus());
List<CasesRecommendPushRecord> casesRecommendPushRecords = casesRecommendPushRecordDao.getGenericDao()
.findList(CasesRecommendPushRecord.class, FieldFilters.in("pushStatus", pushStatusList), FieldFilters.eqField("recommendId", casesRecommend.getId()));
if (CollectionUtils.isEmpty(casesRecommendPushRecords)) {
//修改为推送完成 --- 无数据 无需推送
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
return;
}
//修改为推送中
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_ING.getStatus());
List<String> caseIds = casesRecommendPushRecords.stream().map(CasesRecommendPushRecord::getCaseId).collect(Collectors.toList());
if (caseIds.size() > 1) {
sendMixCaseRecommend(casesRecommendPushRecords);
} else {
sendSingleCaseRecommend(casesRecommendPushRecords);
}
//修改为推送完成 --- 无数据 无需推送
updateProcessStatus(casesRecommend.getId(), CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
}
private static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
Set<Object> seen = ConcurrentHashMap.newKeySet();
return t -> seen.add(keyExtractor.apply(t));
}
private void sendMixCaseRecommend(List<CasesRecommendPushRecord> casesRecommendPushRecords) {
Map<String, List<CasesRecommendPushRecord>> caseMap = casesRecommendPushRecords.stream().collect(Collectors.groupingBy(CasesRecommendPushRecord::getPushUserId));
Map<String, String> userMap = casesRecommendPushRecords.stream().filter(distinctByKey(CasesRecommendPushRecord::getPushUserId)).collect(Collectors.toMap(CasesRecommendPushRecord::getPushUserId, CasesRecommendPushRecord::getPushUserName));
for (String userId : caseMap.keySet()) {
List<CasesRecommendPushRecord> pushRecords = caseMap.get(userId);
try {
Message message = new Message();
message.setMsgType(1);
message.setAcceptId(userId);
message.setAcceptName(userMap.get(userId));
message.setContent("案例推荐-《" + pushRecords.get(0).getCaseTitle() + "》等" + pushRecords.size() + "个案例");
message.setIsRead(false);
message.setMsgTime(LocalDateTime.now());
message.setRefId(pushRecords.get(0).getRecommendId());
message.setBatchId(pushRecords.get(0).getRecommendId());
message.setRefType("99");
message.setSendName(pushRecords.get(0).getSysCreateBy());
message.setSendType(1);
message.setTitle("案例推荐");
message.setSendAid(pushRecords.get(0).getSysCreateAid());
message.setSource(1);
// message.setPageUrl(domain + "/pc/case/detail?id=" + casesRecommendPushRecord.getCaseId());
// message.setPageParams(casesRecommendPushRecord.getCaseId());
message.setPageType(3);
messageDao.save(message);
updatePushStatus(casesRecommendPushRecords, CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
} catch (Exception e) {
log.error("推荐案例失败", e);
updatePushStatus(casesRecommendPushRecords, CasesPushStatusEnum.PUSH_SUCCESS.getStatus());
}
}
}
private void updatePushStatus(List<CasesRecommendPushRecord> pushRecords, Integer pushStatus) {
for (CasesRecommendPushRecord pushRecord : pushRecords) {
casesRecommendPushRecordDao.updateMultiFieldById(pushRecord.getId(), UpdateBuilder.create("pushStatus", pushStatus), UpdateBuilder.create("pushTime", new Date()));
}
}
private void sendSingleCaseRecommend(List<CasesRecommendPushRecord> casesRecommendPushRecords) {
for (CasesRecommendPushRecord casesRecommendPushRecord : casesRecommendPushRecords) {
Integer pushStatus;
try {
Message message = new Message();
message.setMsgType(1);
message.setAcceptId(casesRecommendPushRecord.getPushUserId());
message.setAcceptName(casesRecommendPushRecord.getPushUserName());
message.setContent("案例推荐-《" + casesRecommendPushRecord.getCaseTitle() + "");
message.setIsRead(false);
message.setMsgTime(LocalDateTime.now());
message.setRefId(casesRecommendPushRecord.getId());
message.setBatchId(casesRecommendPushRecord.getRecommendId());
message.setRefType("99");
message.setSendName(casesRecommendPushRecord.getSysCreateBy());
message.setSendType(1);
message.setTitle("案例推荐");
message.setSendAid(casesRecommendPushRecord.getSysCreateAid());
message.setSource(1);
message.setPageUrl(domain + "/pc/case/detail?id=" + casesRecommendPushRecord.getCaseId());
message.setPageParams(casesRecommendPushRecord.getCaseId());
message.setPageType(3);
messageDao.save(message);
pushStatus = CasesPushStatusEnum.PUSH_SUCCESS.getStatus();
} catch (Exception e) {
log.error("推荐案例失败", e);
pushStatus = CasesPushStatusEnum.PUSH_FAIL.getStatus();
}
casesRecommendPushRecordDao.updateMultiFieldById(casesRecommendPushRecord.getId(), UpdateBuilder.create("pushStatus", pushStatus), UpdateBuilder.create("pushTime", new Date()));
}
}
@Override
public List<CasesRecommendPushRecord> findAllByRecommendId(String recommendId) {
@@ -307,7 +180,7 @@ public class CasesRecommendPushRecordServiceImpl implements ICasesRecommendPushR
} else {
try {
//发送推送案例消息
sendCasesRecommend(casesRecommend);
asyncSendCasesRecommendService.sendCasesRecommend(casesRecommend);
} catch (Exception e) {
log.error("推送失败", e);
}