From c9fe597f55c1bffb5337715dd677ce199668b762 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=83=AD=E8=AF=9A=E5=A5=87?= Date: Wed, 3 Dec 2025 17:51:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=81=9C=E6=AD=A2=E5=BD=93=E5=89=8D?= =?UTF-8?q?=E8=81=8A=E5=A4=A9=E8=BE=93=E5=87=BA=E6=8E=A5=E5=8F=A3=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/xboe/config/MqConfig.java | 34 +++++++++++++++++++ .../boecase/mq/BroadcastMessageConsumer.java | 28 +++++++++++++++ .../boecase/service/ICaseAiChatService.java | 7 ++++ .../service/impl/CaseAiChatServiceImpl.java | 34 +++++++++++++++---- 4 files changed, 97 insertions(+), 6 deletions(-) create mode 100644 servers/boe-server-all/src/main/java/com/xboe/config/MqConfig.java create mode 100644 servers/boe-server-all/src/main/java/com/xboe/module/boecase/mq/BroadcastMessageConsumer.java diff --git a/servers/boe-server-all/src/main/java/com/xboe/config/MqConfig.java b/servers/boe-server-all/src/main/java/com/xboe/config/MqConfig.java new file mode 100644 index 00000000..a47016ca --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/config/MqConfig.java @@ -0,0 +1,34 @@ +package com.xboe.config; + +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.annotation.EnableJms; +import org.springframework.jms.config.DefaultJmsListenerContainerFactory; +import org.springframework.jms.config.JmsListenerContainerFactory; + +import javax.jms.ConnectionFactory; +import javax.jms.Topic; + +@EnableJms +@Configuration +public class MqConfig { + + /** + * 配置topic + */ + @Bean + public Topic broadcastTopic() { + return new ActiveMQTopic("broadcast.session.termination"); + } + + // 配置JmsListenerContainerFactory为发布/订阅模式 + @Bean + public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) { + DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setPubSubDomain(true); // 设置为发布/订阅模式 + factory.setSubscriptionDurable(false); // 非持久订阅 + return factory; + } +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/mq/BroadcastMessageConsumer.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/mq/BroadcastMessageConsumer.java new file mode 100644 index 00000000..98112392 --- /dev/null +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/mq/BroadcastMessageConsumer.java @@ -0,0 +1,28 @@ +package com.xboe.module.boecase.mq; + + +import com.xboe.module.boecase.service.ICaseAiChatService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class BroadcastMessageConsumer { + + @Autowired + private ICaseAiChatService iCaseAiChatService; + + /** + * 接收会话终止广播消息 + * + * @param conversationId 会话ID + */ + @JmsListener(destination = "broadcast.session.termination", + containerFactory = "jmsListenerContainerFactory") + public void receiveSessionTerminationBroadcastMessage(String conversationId) { + log.info("收到会话终止广播消息:{}", conversationId); + iCaseAiChatService.eventSourceCancel(conversationId); + } +} diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java index d424400b..c14d983a 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/ICaseAiChatService.java @@ -66,6 +66,13 @@ public interface ICaseAiChatService { */ boolean stopChatOutput(String conversationId); + /** + * 取消eventSource + * + * @param conversationId 会话ID + */ + void eventSourceCancel(String conversationId); + /** * 消息反馈保存 * likeStatus: 踩/赞 diff --git a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java index db29dc7a..80ad8ee3 100644 --- a/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java +++ b/servers/boe-server-all/src/main/java/com/xboe/module/boecase/service/impl/CaseAiChatServiceImpl.java @@ -10,6 +10,7 @@ import com.xboe.module.boecase.dao.CaseAiConversationsDao; import com.xboe.module.boecase.dao.CaseDocumentLogDao; import com.xboe.module.boecase.dao.CasesDao; import com.xboe.module.boecase.dto.CaseAiChatDto; +import com.xboe.module.boecase.entity.AiChatConversationData; import com.xboe.module.boecase.dto.CaseAiMsgLikeDto; import com.xboe.module.boecase.entity.CaseAiConversations; import com.xboe.module.boecase.entity.CaseDocumentLog; @@ -20,7 +21,6 @@ import com.xboe.module.boecase.service.ICaseAiChatService; import com.xboe.module.boecase.service.IElasticSearchIndexService; import com.xboe.module.boecase.vo.CaseAiMessageVo; import com.xboe.module.boecase.vo.CaseReferVo; -import com.xboe.module.boecase.entity.AiChatConversationData; import com.xboe.module.boecase.vo.ConversationExcelVo; import com.xboe.system.organization.vo.OrgSimpleVo; import com.xboe.system.user.service.IUserService; @@ -45,7 +45,11 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.jms.JmsException; +import org.springframework.jms.annotation.JmsListener; +import org.springframework.jms.core.JmsTemplate; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -55,7 +59,6 @@ import javax.servlet.http.HttpServletResponse; import java.io.File; import java.io.IOException; import java.io.OutputStream; -import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; @@ -99,7 +102,13 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Autowired private CasesDao casesDao; - + + @Autowired + private JmsTemplate jmsTemplate; + + @Value("${activemq.topic.name}") + private String topicName; + // 用于存储会话ID与EventSource的映射关系,以便能够中断特定会话 private final Map conversationEventSourceMap = new ConcurrentHashMap<>(); @@ -760,6 +769,22 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { @Override public boolean stopChatOutput(String conversationId) { + log.info("收到停止会话 {} 的指令", conversationId); + // 发送广播消息,通知中断连接 + try { + jmsTemplate.convertAndSend(topicName, conversationId); + } catch (JmsException e) { + log.error("发送停止会话 {} 输出时发生异常", conversationId, e); + return false; + } + return true; + } + + /** + * 取消eventSource + */ + @Override + public void eventSourceCancel(String conversationId) { EventSource eventSource = conversationEventSourceMap.get(conversationId); if (eventSource != null) { try { @@ -767,16 +792,13 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService { eventSource.cancel(); // 注意:cancel()会触发onFailure回调,在onFailure中会清理资源 log.info("成功发送停止会话 {} 的指令", conversationId); - return true; } catch (Exception e) { log.error("停止会话 {} 输出时发生异常", conversationId, e); // 即使出现异常,也从Map中移除,避免内存泄漏 conversationEventSourceMap.remove(conversationId); - return false; } } else { log.info("未找到会话 {} 对应的事件源,可能已经完成或不存在", conversationId); - return true; } }