Knowledge optimization (#3755)

Co-authored-by: crazywoola <427733928@qq.com>
Co-authored-by: JzoNg <jzongcode@gmail.com>
This commit is contained in:
Jyong
2024-04-24 15:02:29 +08:00
committed by GitHub
parent 3cd8e6f5c6
commit f257f2c396
75 changed files with 2756 additions and 266 deletions

View File

@@ -16,6 +16,7 @@ from models.dataset import (
)
# Add import statement for ValueError
@shared_task(queue='dataset')
def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str,
index_struct: str, collection_binding_id: str, doc_form: str):
@@ -48,6 +49,9 @@ def clean_dataset_task(dataset_id: str, tenant_id: str, indexing_technique: str,
logging.info(click.style('No documents found for dataset: {}'.format(dataset_id), fg='green'))
else:
logging.info(click.style('Cleaning documents for dataset: {}'.format(dataset_id), fg='green'))
# Specify the index type before initializing the index processor
if doc_form is None:
raise ValueError("Index type must be specified.")
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
index_processor.clean(dataset, None)

View File

@@ -64,6 +64,39 @@ def deal_dataset_vector_index_task(dataset_id: str, action: str):
# save vector index
index_processor.load(dataset, documents, with_keywords=False)
elif action == 'update':
# clean index
index_processor.clean(dataset, None, with_keywords=False)
dataset_documents = db.session.query(DatasetDocument).filter(
DatasetDocument.dataset_id == dataset_id,
DatasetDocument.indexing_status == 'completed',
DatasetDocument.enabled == True,
DatasetDocument.archived == False,
).all()
# add new index
if dataset_documents:
documents = []
for dataset_document in dataset_documents:
# delete from vector index
segments = db.session.query(DocumentSegment).filter(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.enabled == True
).order_by(DocumentSegment.position.asc()).all()
for segment in segments:
document = Document(
page_content=segment.content,
metadata={
"doc_id": segment.index_node_id,
"doc_hash": segment.index_node_hash,
"document_id": segment.document_id,
"dataset_id": segment.dataset_id,
}
)
documents.append(document)
# save vector index
index_processor.load(dataset, documents, with_keywords=False)
end_at = time.perf_counter()
logging.info(

View File

@@ -0,0 +1,94 @@
import datetime
import logging
import time
import click
from celery import shared_task
from flask import current_app
from core.indexing_runner import DocumentIsPausedException, IndexingRunner
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from models.dataset import Dataset, Document, DocumentSegment
from services.feature_service import FeatureService
@shared_task(queue='dataset')
def duplicate_document_indexing_task(dataset_id: str, document_ids: list):
"""
Async process document
:param dataset_id:
:param document_ids:
Usage: duplicate_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
# check document limit
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
vector_space = features.vector_space
count = len(document_ids)
batch_upload_limit = int(current_app.config['BATCH_UPLOAD_LIMIT'])
if count > batch_upload_limit:
raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
if 0 < vector_space.limit <= vector_space.size:
raise ValueError("Your total number of documents plus the number of uploads have over the limit of "
"your subscription.")
except Exception as e:
for document_id in document_ids:
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
document.indexing_status = 'error'
document.error = str(e)
document.stopped_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
return
for document_id in document_ids:
logging.info(click.style('Start process document: {}'.format(document_id), fg='green'))
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
# clean old data
index_type = document.doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()
segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
# delete from vector index
index_processor.clean(dataset, index_node_ids)
for segment in segments:
db.session.delete(segment)
db.session.commit()
document.indexing_status = 'parsing'
document.processing_started_at = datetime.datetime.utcnow()
documents.append(document)
db.session.add(document)
db.session.commit()
try:
indexing_runner = IndexingRunner()
indexing_runner.run(documents)
end_at = time.perf_counter()
logging.info(click.style('Processed dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))
except DocumentIsPausedException as ex:
logging.info(click.style(str(ex), fg='yellow'))
except Exception:
pass

View File

@@ -0,0 +1,91 @@
import datetime
import logging
import time
import click
from celery import shared_task
from core.indexing_runner import IndexingRunner
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.dataset import Dataset, Document, DocumentSegment
from services.feature_service import FeatureService
@shared_task(queue='dataset')
def retry_document_indexing_task(dataset_id: str, document_ids: list[str]):
"""
Async process document
:param dataset_id:
:param document_ids:
Usage: retry_document_indexing_task.delay(dataset_id, document_id)
"""
documents = []
start_at = time.perf_counter()
dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first()
for document_id in document_ids:
retry_indexing_cache_key = 'document_{}_is_retried'.format(document_id)
# check document limit
features = FeatureService.get_features(dataset.tenant_id)
try:
if features.billing.enabled:
vector_space = features.vector_space
if 0 < vector_space.limit <= vector_space.size:
raise ValueError("Your total number of documents plus the number of uploads have over the limit of "
"your subscription.")
except Exception as e:
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
if document:
document.indexing_status = 'error'
document.error = str(e)
document.stopped_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
redis_client.delete(retry_indexing_cache_key)
return
logging.info(click.style('Start retry document: {}'.format(document_id), fg='green'))
document = db.session.query(Document).filter(
Document.id == document_id,
Document.dataset_id == dataset_id
).first()
try:
if document:
# clean old data
index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
if segments:
index_node_ids = [segment.index_node_id for segment in segments]
# delete from vector index
index_processor.clean(dataset, index_node_ids)
for segment in segments:
db.session.delete(segment)
db.session.commit()
document.indexing_status = 'parsing'
document.processing_started_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
indexing_runner = IndexingRunner()
indexing_runner.run([document])
redis_client.delete(retry_indexing_cache_key)
except Exception as ex:
document.indexing_status = 'error'
document.error = str(ex)
document.stopped_at = datetime.datetime.utcnow()
db.session.add(document)
db.session.commit()
logging.info(click.style(str(ex), fg='yellow'))
redis_client.delete(retry_indexing_cache_key)
pass
end_at = time.perf_counter()
logging.info(click.style('Retry dataset: {} latency: {}'.format(dataset_id, end_at - start_at), fg='green'))