mirror of
https://codeup.aliyun.com/67762337eccfc218f6110e0e/per-boe/java-servers.git
synced 2025-12-10 03:16:48 +08:00
整理es相关代码;增加手动调试用接口
This commit is contained in:
@@ -26,12 +26,11 @@ public class ElasticSearchIndexInitializer {
|
||||
*/
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
public void initializeElasticSearchIndices() {
|
||||
String indexName = "ai_chat_messages";
|
||||
if (elasticSearchIndexService.checkIndexExists(indexName)) {
|
||||
if (elasticSearchIndexService.checkIndexExists()) {
|
||||
log.info("ElasticSearch索引 ai_chat_messages 已存在");
|
||||
} else {
|
||||
log.info("ElasticSearch索引 ai_chat_messages 不存在,开始创建...");
|
||||
elasticSearchIndexService.createIndex(indexName);
|
||||
elasticSearchIndexService.createIndex();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package com.xboe.constants;
|
||||
|
||||
public class CaseAiConstants {
|
||||
|
||||
public static final String CASE_AI_INDEX_NAME = "ai_chat_messages";
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.xboe.module.boecase.api;
|
||||
import com.xboe.core.api.ApiBaseController;
|
||||
import com.xboe.core.JsonResponse;
|
||||
import com.xboe.module.boecase.dto.CaseAiChatDto;
|
||||
import com.xboe.module.boecase.entity.AiChatConversationData;
|
||||
import com.xboe.module.boecase.service.ICaseAiChatService;
|
||||
import com.xboe.module.boecase.service.ICaseAiPermissionService;
|
||||
import com.xboe.module.boecase.service.IElasticSearchIndexService;
|
||||
@@ -94,16 +95,32 @@ public class CaseAiChatApi extends ApiBaseController {
|
||||
*/
|
||||
@PostMapping("/index/refresh")
|
||||
public JsonResponse<String> deleteAndCreateEsIndex() {
|
||||
String indexName = "ai_chat_messages";
|
||||
if (elasticSearchIndexService.checkIndexExists(indexName)) {
|
||||
boolean deleteResult = elasticSearchIndexService.deleteIndex(indexName);
|
||||
if (elasticSearchIndexService.checkIndexExists()) {
|
||||
boolean deleteResult = elasticSearchIndexService.deleteIndex();
|
||||
if (deleteResult) {
|
||||
elasticSearchIndexService.createIndex(indexName);
|
||||
elasticSearchIndexService.createIndex();
|
||||
return success("刷新成功");
|
||||
}
|
||||
} else {
|
||||
elasticSearchIndexService.createIndex(indexName);
|
||||
elasticSearchIndexService.createIndex();
|
||||
}
|
||||
return error("刷新失败");
|
||||
}
|
||||
|
||||
@PostMapping("/es/create")
|
||||
public JsonResponse<String> createNewConversation(@RequestBody CaseAiMessageVo caseAiMessageVo,
|
||||
@RequestParam String conversationId,
|
||||
@RequestParam String userId) {
|
||||
AiChatConversationData aiChatConversationData = new AiChatConversationData();
|
||||
aiChatConversationData.setConversationId(conversationId);
|
||||
aiChatConversationData.setQuery(caseAiMessageVo.getQuery());
|
||||
aiChatConversationData.appendAnswer(caseAiMessageVo.getAnswer());
|
||||
aiChatConversationData.setCaseRefers(caseAiMessageVo.getCaseRefer());
|
||||
aiChatConversationData.setSuggestions(caseAiMessageVo.getSuggestions());
|
||||
aiChatConversationData.setUserId(userId);
|
||||
if (elasticSearchIndexService.createData(aiChatConversationData)) {
|
||||
return success("创建成功");
|
||||
}
|
||||
return error("创建失败");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package com.xboe.module.boecase.service;
|
||||
|
||||
import com.xboe.module.boecase.entity.AiChatConversationData;
|
||||
import com.xboe.module.boecase.vo.CaseAiMessageVo;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* es索引
|
||||
*/
|
||||
@@ -10,18 +15,32 @@ public interface IElasticSearchIndexService {
|
||||
* @param indexName
|
||||
* @return
|
||||
*/
|
||||
boolean checkIndexExists(String indexName);
|
||||
boolean checkIndexExists();
|
||||
|
||||
/**
|
||||
* 创建索引
|
||||
* @param indexName
|
||||
*/
|
||||
boolean createIndex(String indexName);
|
||||
boolean createIndex();
|
||||
|
||||
/**
|
||||
* 删除索引
|
||||
* @param indexName
|
||||
* @return
|
||||
*/
|
||||
boolean deleteIndex(String indexName);
|
||||
boolean deleteIndex();
|
||||
|
||||
/**
|
||||
* 新增数据
|
||||
* @param data
|
||||
* @return
|
||||
*/
|
||||
boolean createData(AiChatConversationData data);
|
||||
|
||||
/**
|
||||
* 查询数据
|
||||
* @param conversationId
|
||||
* @return
|
||||
*/
|
||||
List<CaseAiMessageVo> queryData(String conversationId);
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import com.xboe.module.boecase.entity.Cases;
|
||||
import com.xboe.module.boecase.properties.CaseAiProperties;
|
||||
import com.xboe.module.boecase.service.IAiAccessTokenService;
|
||||
import com.xboe.module.boecase.service.ICaseAiChatService;
|
||||
import com.xboe.module.boecase.service.IElasticSearchIndexService;
|
||||
import com.xboe.module.boecase.vo.CaseAiMessageVo;
|
||||
import com.xboe.module.boecase.vo.CaseReferVo;
|
||||
import com.xboe.module.boecase.entity.AiChatConversationData;
|
||||
@@ -82,8 +83,8 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
|
||||
@Autowired
|
||||
private CaseAiConversationsDao caseAiConversationsDao;
|
||||
|
||||
@Autowired(required = false)
|
||||
private RestHighLevelClient elasticsearchClient;
|
||||
@Autowired
|
||||
private IElasticSearchIndexService elasticSearchIndexService;
|
||||
|
||||
@Autowired
|
||||
private CaseDocumentLogDao caseDocumentLogDao;
|
||||
@@ -142,7 +143,7 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
|
||||
public void onClosed(@NotNull EventSource eventSource) {
|
||||
log.info("调用接口 [{}] 接口关闭", request.url());
|
||||
// 对话完成,保存到ES
|
||||
saveConversationToES(conversationData);
|
||||
elasticSearchIndexService.createData(conversationData);
|
||||
sseEmitter.complete();
|
||||
}
|
||||
|
||||
@@ -309,46 +310,11 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
|
||||
|
||||
@Override
|
||||
public List<CaseAiMessageVo> getConversationMessages(String conversationId) {
|
||||
List<CaseAiMessageVo> messages = new ArrayList<>();
|
||||
|
||||
if (elasticsearchClient == null) {
|
||||
log.warn("未配置Elasticsearch客户端,无法查询消息记录");
|
||||
return messages;
|
||||
if (StringUtils.isEmpty(conversationId)) {
|
||||
log.warn("conversationId 为空, 不查询");
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
try {
|
||||
// 根据conversationId从数据库查询AI会话ID
|
||||
String aiConversationId = caseAiConversationsDao.findAiConversationIdById(conversationId);
|
||||
if (StringUtils.isEmpty(aiConversationId)) {
|
||||
log.warn("未找到conversationId: {}对应的AI会话ID", conversationId);
|
||||
return messages;
|
||||
}
|
||||
|
||||
// 从 ES 中查询消息记录
|
||||
SearchRequest searchRequest = new SearchRequest("ai_chat_messages"); // ES索引名,可以根据实际情况调整
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(QueryBuilders.termQuery("conversationId", aiConversationId));
|
||||
searchSourceBuilder.size(1000); // 设置最大返回数量
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
|
||||
SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
|
||||
SearchHits hits = searchResponse.getHits();
|
||||
|
||||
for (SearchHit hit : hits) {
|
||||
Map<String, Object> sourceMap = hit.getSourceAsMap();
|
||||
CaseAiMessageVo messageVo = parseMessageFromES(sourceMap);
|
||||
if (messageVo != null) {
|
||||
messages.add(messageVo);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("从 ES 中查询到 {} 条消息记录", messages.size());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("从 ES 查询会话消息记录异常", e);
|
||||
}
|
||||
|
||||
return messages;
|
||||
return elasticSearchIndexService.queryData(conversationId);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -556,52 +522,6 @@ public class CaseAiChatServiceImpl implements ICaseAiChatService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存对话记录到ES
|
||||
*/
|
||||
private void saveConversationToES(AiChatConversationData conversationData) {
|
||||
if (elasticsearchClient == null) {
|
||||
log.warn("未配置Elasticsearch客户端,无法保存对话记录");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 构建要保存的数据
|
||||
JSONObject esData = new JSONObject();
|
||||
esData.put("query", conversationData.getQuery());
|
||||
esData.put("answer", conversationData.getAnswerAsString());
|
||||
esData.put("conversationId", conversationData.getConversationId());
|
||||
esData.put("userId", conversationData.getUserId());
|
||||
esData.put("timestamp", LocalDateTime.now().toString());
|
||||
|
||||
// 构建 caseRefer 数据
|
||||
JSONArray caseReferArray = new JSONArray();
|
||||
for (CaseReferVo caseRefer : conversationData.getCaseRefers()) {
|
||||
JSONObject caseReferObj = new JSONObject();
|
||||
caseReferObj.put("caseId", caseRefer.getCaseId());
|
||||
caseReferObj.put("title", caseRefer.getTitle());
|
||||
caseReferObj.put("authorName", caseRefer.getAuthorName());
|
||||
caseReferObj.put("keywords", caseRefer.getKeywords());
|
||||
caseReferObj.put("content", caseRefer.getContent());
|
||||
caseReferArray.add(caseReferObj);
|
||||
}
|
||||
esData.put("caseRefer", caseReferArray);
|
||||
|
||||
// 添加建议
|
||||
esData.put("suggestions", conversationData.getSuggestions());
|
||||
|
||||
// 保存到ES
|
||||
IndexRequest indexRequest = new IndexRequest("ai_chat_messages");
|
||||
indexRequest.source(esData.toJSONString(), XContentType.JSON);
|
||||
|
||||
IndexResponse indexResponse = elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
|
||||
log.info("保存对话记录到ES成功,文档ID: {}", indexResponse.getId());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("保存对话记录到ES异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 当 SSE 失败时,作为普通 HTTP 请求处理
|
||||
*/
|
||||
|
||||
@@ -1,8 +1,18 @@
|
||||
package com.xboe.module.boecase.service.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.xboe.constants.CaseAiConstants;
|
||||
import com.xboe.module.boecase.entity.AiChatConversationData;
|
||||
import com.xboe.module.boecase.service.IElasticSearchIndexService;
|
||||
import com.xboe.module.boecase.vo.CaseAiMessageVo;
|
||||
import com.xboe.module.boecase.vo.CaseReferVo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
@@ -11,10 +21,18 @@ import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.client.indices.GetIndexRequest;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@@ -24,13 +42,13 @@ public class ElasticSearchIndexServiceImpl implements IElasticSearchIndexService
|
||||
private RestHighLevelClient elasticsearchClient;
|
||||
|
||||
@Override
|
||||
public boolean checkIndexExists(String indexName) {
|
||||
public boolean checkIndexExists() {
|
||||
if (elasticsearchClient == null) {
|
||||
log.warn("ElasticSearch客户端未配置");
|
||||
log.error("ElasticSearch客户端未配置");
|
||||
return false;
|
||||
}
|
||||
// 检查索引是否存在
|
||||
GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
|
||||
GetIndexRequest getIndexRequest = new GetIndexRequest(CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
try {
|
||||
return elasticsearchClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
|
||||
} catch (IOException e) {
|
||||
@@ -40,13 +58,13 @@ public class ElasticSearchIndexServiceImpl implements IElasticSearchIndexService
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createIndex(String indexName) {
|
||||
public boolean createIndex() {
|
||||
if (elasticsearchClient == null) {
|
||||
log.warn("ElasticSearch客户端未配置");
|
||||
log.error("ElasticSearch客户端未配置");
|
||||
return false;
|
||||
}
|
||||
// 创建索引
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
|
||||
CreateIndexRequest createIndexRequest = new CreateIndexRequest(CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
|
||||
// 设置索引配置
|
||||
createIndexRequest.settings(Settings.builder()
|
||||
@@ -71,29 +89,29 @@ public class ElasticSearchIndexServiceImpl implements IElasticSearchIndexService
|
||||
}
|
||||
|
||||
if (createIndexResponse.isAcknowledged()) {
|
||||
log.info("ElasticSearch索引 [{}] 创建成功", indexName);
|
||||
log.info("ElasticSearch索引 [{}] 创建成功", CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
return true;
|
||||
} else {
|
||||
log.warn("ElasticSearch索引 [{}] 创建可能失败,响应未确认", indexName);
|
||||
log.error("ElasticSearch索引 [{}] 创建可能失败,响应未确认", CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean deleteIndex(String indexName) {
|
||||
public boolean deleteIndex() {
|
||||
if (elasticsearchClient == null) {
|
||||
log.warn("ElasticSearch客户端未配置");
|
||||
log.error("ElasticSearch客户端未配置");
|
||||
return false;
|
||||
}
|
||||
// 执行删除索引请求
|
||||
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(indexName);
|
||||
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
try {
|
||||
AcknowledgedResponse deleteResponse = elasticsearchClient.indices().delete(deleteRequest, RequestOptions.DEFAULT);
|
||||
if (deleteResponse.isAcknowledged()) {
|
||||
log.info("成功删除Elasticsearch索引: {}", indexName);
|
||||
log.info("成功删除Elasticsearch索引: {}", CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
return true;
|
||||
} else {
|
||||
log.warn("删除索引 [{}] 未被确认(可能部分节点未响应)", indexName);
|
||||
log.error("删除索引 [{}] 未被确认(可能部分节点未响应)", CaseAiConstants.CASE_AI_INDEX_NAME);
|
||||
return false;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
@@ -102,6 +120,130 @@ public class ElasticSearchIndexServiceImpl implements IElasticSearchIndexService
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean createData(AiChatConversationData conversationData) {
|
||||
if (elasticsearchClient == null) {
|
||||
log.error("未配置Elasticsearch客户端,无法保存对话记录");
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
// 构建要保存的数据
|
||||
JSONObject esData = new JSONObject();
|
||||
esData.put("query", conversationData.getQuery());
|
||||
esData.put("answer", conversationData.getAnswerAsString());
|
||||
esData.put("conversationId", conversationData.getConversationId());
|
||||
esData.put("userId", conversationData.getUserId());
|
||||
esData.put("timestamp", LocalDateTime.now().toString());
|
||||
|
||||
// 构建 caseRefer 数据
|
||||
JSONArray caseReferArray = new JSONArray();
|
||||
for (CaseReferVo caseRefer : conversationData.getCaseRefers()) {
|
||||
JSONObject caseReferObj = new JSONObject();
|
||||
caseReferObj.put("caseId", caseRefer.getCaseId());
|
||||
caseReferObj.put("title", caseRefer.getTitle());
|
||||
caseReferObj.put("authorName", caseRefer.getAuthorName());
|
||||
caseReferObj.put("keywords", caseRefer.getKeywords());
|
||||
caseReferObj.put("content", caseRefer.getContent());
|
||||
caseReferArray.add(caseReferObj);
|
||||
}
|
||||
esData.put("caseRefer", caseReferArray);
|
||||
|
||||
// 添加建议
|
||||
esData.put("suggestions", conversationData.getSuggestions());
|
||||
|
||||
// 保存到ES
|
||||
IndexRequest indexRequest = new IndexRequest("ai_chat_messages");
|
||||
String dataStr = esData.toJSONString();
|
||||
log.info("保存对话记录到ES:{}", dataStr);
|
||||
indexRequest.source(dataStr, XContentType.JSON);
|
||||
|
||||
IndexResponse indexResponse = elasticsearchClient.index(indexRequest, RequestOptions.DEFAULT);
|
||||
log.info("保存对话记录到ES成功,文档ID: {}", indexResponse.getId());
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("保存对话记录到ES异常", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<CaseAiMessageVo> queryData(String conversationId) {
|
||||
List<CaseAiMessageVo> list = new ArrayList<>();
|
||||
if (elasticsearchClient == null) {
|
||||
log.error("未配置Elasticsearch客户端,无法查询对话记录");
|
||||
return list;
|
||||
}
|
||||
try {
|
||||
// 从 ES 中查询消息记录
|
||||
SearchRequest searchRequest = new SearchRequest(CaseAiConstants.CASE_AI_INDEX_NAME); // ES索引名,可以根据实际情况调整
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(QueryBuilders.termQuery("conversationId", conversationId));
|
||||
searchSourceBuilder.size(1000); // 设置最大返回数量
|
||||
searchRequest.source(searchSourceBuilder);
|
||||
|
||||
SearchResponse searchResponse = elasticsearchClient.search(searchRequest, RequestOptions.DEFAULT);
|
||||
SearchHits hits = searchResponse.getHits();
|
||||
|
||||
for (SearchHit hit : hits) {
|
||||
Map<String, Object> sourceMap = hit.getSourceAsMap();
|
||||
CaseAiMessageVo data = parseMessageFromES(sourceMap);
|
||||
if (data != null) {
|
||||
list.add(data);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("从 ES 中查询到 {} 条消息记录", list.size());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("从 ES 查询会话消息记录异常", e);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
private CaseAiMessageVo parseMessageFromES(Map<String, Object> sourceMap) {
|
||||
try {
|
||||
CaseAiMessageVo messageVo = new CaseAiMessageVo();
|
||||
messageVo.setQuery((String) sourceMap.get("query"));
|
||||
messageVo.setAnswer((String) sourceMap.get("answer"));
|
||||
|
||||
// 解析 suggestions
|
||||
Object suggestionsObj = sourceMap.get("suggestions");
|
||||
if (suggestionsObj instanceof List) {
|
||||
messageVo.setSuggestions((List<String>) suggestionsObj);
|
||||
}
|
||||
|
||||
// 解析 caseRefer
|
||||
Object caseReferObj = sourceMap.get("caseRefer");
|
||||
if (caseReferObj instanceof List) {
|
||||
List<CaseReferVo> caseReferList = new ArrayList<>();
|
||||
List<Map<String, Object>> caseReferMaps = (List<Map<String, Object>>) caseReferObj;
|
||||
|
||||
for (Map<String, Object> caseReferMap : caseReferMaps) {
|
||||
CaseReferVo caseRefer = new CaseReferVo();
|
||||
caseRefer.setCaseId((String) caseReferMap.get("caseId"));
|
||||
caseRefer.setTitle((String) caseReferMap.get("title"));
|
||||
caseRefer.setAuthorName((String) caseReferMap.get("authorName"));
|
||||
caseRefer.setContent((String) caseReferMap.get("content"));
|
||||
|
||||
// 解析 keywords
|
||||
Object keywordsObj = caseReferMap.get("keywords");
|
||||
if (keywordsObj instanceof List) {
|
||||
caseRefer.setKeywords((List<String>) keywordsObj);
|
||||
}
|
||||
|
||||
caseReferList.add(caseRefer);
|
||||
}
|
||||
messageVo.setCaseRefer(caseReferList);
|
||||
}
|
||||
|
||||
return messageVo;
|
||||
} catch (Exception e) {
|
||||
log.error("解析ES消息数据异常", e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取ai_chat_messages索引的字段映射配置
|
||||
* 根据项目中的会话消息数据结构规范定义映射
|
||||
|
||||
Reference in New Issue
Block a user