feat: 停止当前聊天输出接口功能完善

This commit is contained in:
郭诚奇
2025-12-03 17:51:35 +08:00
parent 7bee2e3c45
commit 4412563208
4 changed files with 97 additions and 6 deletions

View File

@@ -0,0 +1,34 @@
package com.xboe.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
import javax.jms.Topic;
@EnableJms
@Configuration
public class MqConfig {
/**
* 配置topic
*/
@Bean
public Topic broadcastTopic() {
return new ActiveMQTopic("broadcast.session.termination");
}
// 配置JmsListenerContainerFactory为发布/订阅模式
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(true); // 设置为发布/订阅模式
factory.setSubscriptionDurable(false); // 非持久订阅
return factory;
}
}

View File

@@ -0,0 +1,28 @@
package com.xboe.module.boecase.mq;
import com.xboe.module.boecase.service.ICaseAiChatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class BroadcastMessageConsumer {
@Autowired
private ICaseAiChatService iCaseAiChatService;
/**
* 接收会话终止广播消息
*
* @param conversationId 会话ID
*/
@JmsListener(destination = "broadcast.session.termination",
containerFactory = "jmsListenerContainerFactory")
public void receiveSessionTerminationBroadcastMessage(String conversationId) {
log.info("收到会话终止广播消息:{}", conversationId);
iCaseAiChatService.eventSourceCancel(conversationId);
}
}

View File

@@ -66,6 +66,13 @@ public interface ICaseAiChatService {
*/ */
boolean stopChatOutput(String conversationId); boolean stopChatOutput(String conversationId);
/**
* 取消eventSource
*
* @param conversationId 会话ID
*/
void eventSourceCancel(String conversationId);
/** /**
* 消息反馈保存 * 消息反馈保存
* likeStatus: 踩/赞 * likeStatus: 踩/赞

View File

@@ -10,6 +10,7 @@ import com.xboe.module.boecase.dao.CaseAiConversationsDao;
import com.xboe.module.boecase.dao.CaseDocumentLogDao; import com.xboe.module.boecase.dao.CaseDocumentLogDao;
import com.xboe.module.boecase.dao.CasesDao; import com.xboe.module.boecase.dao.CasesDao;
import com.xboe.module.boecase.dto.CaseAiChatDto; import com.xboe.module.boecase.dto.CaseAiChatDto;
import com.xboe.module.boecase.entity.AiChatConversationData;
import com.xboe.module.boecase.dto.CaseAiMsgLikeDto; import com.xboe.module.boecase.dto.CaseAiMsgLikeDto;
import com.xboe.module.boecase.entity.CaseAiConversations; import com.xboe.module.boecase.entity.CaseAiConversations;
import com.xboe.module.boecase.entity.CaseDocumentLog; import com.xboe.module.boecase.entity.CaseDocumentLog;
@@ -20,7 +21,6 @@ import com.xboe.module.boecase.service.ICaseAiChatService;
import com.xboe.module.boecase.service.IElasticSearchIndexService; import com.xboe.module.boecase.service.IElasticSearchIndexService;
import com.xboe.module.boecase.vo.CaseAiMessageVo; import com.xboe.module.boecase.vo.CaseAiMessageVo;
import com.xboe.module.boecase.vo.CaseReferVo; import com.xboe.module.boecase.vo.CaseReferVo;
import com.xboe.module.boecase.entity.AiChatConversationData;
import com.xboe.module.boecase.vo.ConversationExcelVo; import com.xboe.module.boecase.vo.ConversationExcelVo;
import com.xboe.system.organization.vo.OrgSimpleVo; import com.xboe.system.organization.vo.OrgSimpleVo;
import com.xboe.system.user.service.IUserService; import com.xboe.system.user.service.IUserService;
@@ -45,7 +45,11 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.jms.JmsException;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
@@ -55,7 +59,6 @@ import javax.servlet.http.HttpServletResponse;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@@ -100,6 +103,12 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Autowired @Autowired
private CasesDao casesDao; private CasesDao casesDao;
@Autowired
private JmsTemplate jmsTemplate;
@Value("${activemq.topic.name}")
private String topicName;
// 用于存储会话ID与EventSource的映射关系以便能够中断特定会话 // 用于存储会话ID与EventSource的映射关系以便能够中断特定会话
private final Map<String, EventSource> conversationEventSourceMap = new ConcurrentHashMap<>(); private final Map<String, EventSource> conversationEventSourceMap = new ConcurrentHashMap<>();
@@ -760,6 +769,22 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
@Override @Override
public boolean stopChatOutput(String conversationId) { public boolean stopChatOutput(String conversationId) {
log.info("收到停止会话 {} 的指令", conversationId);
// 发送广播消息,通知中断连接
try {
jmsTemplate.convertAndSend(topicName, conversationId);
} catch (JmsException e) {
log.error("发送停止会话 {} 输出时发生异常", conversationId, e);
return false;
}
return true;
}
/**
* 取消eventSource
*/
@Override
public void eventSourceCancel(String conversationId) {
EventSource eventSource = conversationEventSourceMap.get(conversationId); EventSource eventSource = conversationEventSourceMap.get(conversationId);
if (eventSource != null) { if (eventSource != null) {
try { try {
@@ -767,16 +792,13 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
eventSource.cancel(); eventSource.cancel();
// 注意cancel()会触发onFailure回调在onFailure中会清理资源 // 注意cancel()会触发onFailure回调在onFailure中会清理资源
log.info("成功发送停止会话 {} 的指令", conversationId); log.info("成功发送停止会话 {} 的指令", conversationId);
return true;
} catch (Exception e) { } catch (Exception e) {
log.error("停止会话 {} 输出时发生异常", conversationId, e); log.error("停止会话 {} 输出时发生异常", conversationId, e);
// 即使出现异常也从Map中移除避免内存泄漏 // 即使出现异常也从Map中移除避免内存泄漏
conversationEventSourceMap.remove(conversationId); conversationEventSourceMap.remove(conversationId);
return false;
} }
} else { } else {
log.info("未找到会话 {} 对应的事件源,可能已经完成或不存在", conversationId); log.info("未找到会话 {} 对应的事件源,可能已经完成或不存在", conversationId);
return true;
} }
} }