From 44fd77b81b68b2caf944a55a21345d528fe338a8 Mon Sep 17 00:00:00 2001 From: jeffrey Date: Sat, 19 Oct 2024 23:14:01 +0900 Subject: [PATCH 01/11] fix parse_ocr.yaml typo --- sample_config/parse/parse_ocr.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample_config/parse/parse_ocr.yaml b/sample_config/parse/parse_ocr.yaml index 1b84b209c..21562c47d 100644 --- a/sample_config/parse/parse_ocr.yaml +++ b/sample_config/parse/parse_ocr.yaml @@ -1,6 +1,6 @@ modules: - module_type: langchain_parse - parse_method: upstagelayoutanalysis + parse_method: upstagedocumentparse - module_type: llama_parse result_type: markdown - module_type: clova From d20806e5ae766181450c54988bb8e70354de7f58 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Mon, 21 Oct 2024 01:26:24 +0900 Subject: [PATCH 02/11] Add BaseVectorDB schema only for AutoRAG and replace original chroma collection to Chroma vector store instance. (#871) * add Chroma wrapper for vectorstore compatibility * Make my own Chroma class * refactor get_id_scores * replace chroma to VectorStore instance with async * use vector store embedding at vectordb module * optimize imports --------- Co-authored-by: jeffrey --- autorag/data/legacy/qacreation/base.py | 1 + autorag/evaluation/metric/util.py | 8 + autorag/nodes/retrieval/vectordb.py | 181 +++++++++--------- autorag/utils/util.py | 18 ++ autorag/vectordb/__init__.py | 0 autorag/vectordb/base.py | 59 ++++++ autorag/vectordb/chroma.py | 116 +++++++++++ .../autorag/nodes/retrieval/test_vectordb.py | 171 ++++++++--------- tests/autorag/vectordb/test_chroma.py | 56 ++++++ tests/mock.py | 6 + 10 files changed, 441 insertions(+), 175 deletions(-) create mode 100644 autorag/vectordb/__init__.py create mode 100644 autorag/vectordb/base.py create mode 100644 autorag/vectordb/chroma.py create mode 100644 tests/autorag/vectordb/test_chroma.py diff --git a/autorag/data/legacy/qacreation/base.py b/autorag/data/legacy/qacreation/base.py index 3abec876c..2d4d2e6af 100644 --- a/autorag/data/legacy/qacreation/base.py +++ b/autorag/data/legacy/qacreation/base.py @@ -144,6 +144,7 @@ def make_qa_with_existing_qa( :param kwargs: The keyword arguments for qa_creation_func. :return: QA dataset dataframe. """ + raise DeprecationWarning("This function is deprecated.") assert ( "query" in existing_query_df.columns ), "existing_query_df must have 'query' column." 
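This patch replaces direct chromadb collection calls with an async vector store interface (add, query, fetch, is_exist, delete). A minimal usage sketch, assuming the Chroma wrapper added later in this patch and a "mock" key registered in autorag.embedding_models; the collection name and sample passages are placeholders, not part of the change itself:

import asyncio

from autorag.vectordb.chroma import Chroma


async def main():
    store = Chroma(
        embedding_model="mock",  # any key registered in autorag.embedding_models
        collection_name="demo",
        client_type="ephemeral",
    )
    # Embed and store two passages, then retrieve the closest ones per query.
    await store.add(ids=["doc1", "doc2"], texts=["first passage", "second passage"])
    ids, scores = await store.query(queries=["first"], top_k=2)
    print(ids[0], scores[0])  # scores are similarities (1 - distance for cosine)


asyncio.run(main())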
diff --git a/autorag/evaluation/metric/util.py b/autorag/evaluation/metric/util.py index 08f7063a1..0c53b6598 100644 --- a/autorag/evaluation/metric/util.py +++ b/autorag/evaluation/metric/util.py @@ -15,6 +15,14 @@ def calculate_cosine_similarity(a, b): return similarity +def calculate_l2_distance(a, b): + return np.linalg.norm(a - b) + + +def calculate_inner_product(a, b): + return np.dot(a, b) + + def autorag_metric(fields_to_check: List[str]): def decorator_autorag_metric(func): @functools.wraps(func) diff --git a/autorag/nodes/retrieval/vectordb.py b/autorag/nodes/retrieval/vectordb.py index 9d8a87248..06332d6e1 100644 --- a/autorag/nodes/retrieval/vectordb.py +++ b/autorag/nodes/retrieval/vectordb.py @@ -3,13 +3,17 @@ from typing import List, Tuple, Optional import chromadb +import numpy as np import pandas as pd -from chromadb import GetResult, QueryResult -from chromadb.utils.batch_utils import create_batches from llama_index.core.embeddings import BaseEmbedding from llama_index.embeddings.openai import OpenAIEmbedding from autorag import embedding_models +from autorag.evaluation.metric.util import ( + calculate_l2_distance, + calculate_inner_product, + calculate_cosine_similarity, +) from autorag.nodes.retrieval.base import evenly_distribute_passages, BaseRetrieval from autorag.utils import validate_corpus_dataset, cast_corpus_dataset from autorag.utils.util import ( @@ -20,9 +24,12 @@ result_to_dataframe, pop_params, fetch_contents, - apply_recursive, empty_cuda_cache, + convert_inputs_to_list, + make_batch, ) +from autorag.vectordb.base import BaseVectorStore +from autorag.vectordb.chroma import Chroma logger = logging.getLogger("AutoRAG") @@ -51,19 +58,23 @@ def __init__(self, project_dir: str, embedding_model: str, **kwargs): chroma_path ), f"chroma_path {chroma_path} does not exist. Please ingest first." - self.chroma_collection = load_chroma_collection( - db_path=chroma_path, collection_name=embedding_model + # TODO: load any vector store from YAML file at here + self.vector_store = Chroma( + embedding_model=embedding_model, + collection_name=embedding_model, + client_type="persistent", + path=chroma_path, ) # init embedding model if embedding_model in embedding_models: - self.embedding_model = embedding_models[embedding_model]() + self.embedding_model = self.vector_store.embedding else: logger.error(f"embedding_model_str {embedding_model} does not exist.") raise KeyError(f"embedding_model_str {embedding_model} does not exist.") def __del__(self): - del self.chroma_collection + del self.vector_store del self.embedding_model empty_cuda_cache() super().__del__() @@ -102,8 +113,9 @@ def _pure( It will be a length of queries. And each element has a length of top_k. """ # check if bm25_corpus is valid + # TODO: available at other Vector DB? assert ( - self.chroma_collection.count() > 0 + self.vector_store.collection.count() > 0 ), "collection must contain at least one document. Please check you ingested collection correctly." # truncate queries and embedding execution here. 
openai_embedding_limit = 8000 @@ -119,31 +131,46 @@ def _pure( ) ) - query_embeddings = flatten_apply( - run_query_embedding_batch, - queries, - embedding_model=self.embedding_model, - batch_size=embedding_batch, - ) - # if ids are specified, fetch the ids score from Chroma if ids is not None: - client = chromadb.Client() + query_embeddings = flatten_apply( + run_query_embedding_batch, + queries, + embedding_model=self.embedding_model, + batch_size=embedding_batch, + ) + + loop = get_event_loop() + + async def run_fetch(ids): + final_result = [] + for id_list in ids: + if len(id_list) == 0: + final_result.append([]) + else: + result = await self.vector_store.fetch(id_list) + final_result.append(result) + return final_result + + content_embeddings = loop.run_until_complete(run_fetch(ids)) + score_result = list( map( - lambda query_embedding_list, id_list: get_id_scores( - id_list, query_embedding_list, self.chroma_collection, client + lambda query_embedding_list, content_embedding_list: get_id_scores( + query_embedding_list, + content_embedding_list, + similarity_metric=self.vector_store.similarity_metric, ), query_embeddings, - ids, + content_embeddings, ) ) return ids, score_result # run async vector_db_pure function tasks = [ - vectordb_pure(query_embedding, top_k, self.chroma_collection) - for query_embedding in query_embeddings + vectordb_pure(query_list, top_k, self.vector_store) + for query_list in queries ] loop = get_event_loop() results = loop.run_until_complete( @@ -155,7 +182,7 @@ def _pure( async def vectordb_pure( - query_embeddings: List[List[float]], top_k: int, collection: chromadb.Collection + queries: List[str], top_k: int, vectordb: BaseVectorStore ) -> Tuple[List[str], List[float]]: """ Async VectorDB retrieval function. @@ -163,16 +190,10 @@ async def vectordb_pure( :param query_embeddings: A list of query embeddings. :param top_k: The number of passages to be retrieved. - :param collection: A chroma collection instance that will be used to retrieve passages. + :param vectordb: The vector store instance. :return: The tuple contains a list of passage ids that are retrieved from vectordb and a list of its scores. """ - id_result, score_result = [], [] - for embedded_query in query_embeddings: - result = collection.query(query_embeddings=embedded_query, n_results=top_k) - id_result.extend(result["ids"]) - score_result.extend( - list(map(lambda lst: list(map(lambda x: 1 - x, lst)), result["distances"])) - ) + id_result, score_result = await vectordb.query(queries=queries, top_k=top_k) # Distribute passages evenly id_result, score_result = evenly_distribute_passages(id_result, score_result, top_k) @@ -187,10 +208,9 @@ async def vectordb_pure( return list(id_result), list(score_result) -def vectordb_ingest( - collection: chromadb.Collection, +async def vectordb_ingest( + vectordb: BaseVectorStore, corpus_data: pd.DataFrame, - embedding_model: BaseEmbedding, embedding_batch: int = 128, ): """ @@ -199,43 +219,26 @@ def vectordb_ingest( Plus, when the corpus content is empty (whitespace), it will be ignored. And if there is a document id that already exists in the collection, it will be ignored. - :param collection: Chromadb collection instance to ingest. + :param vectordb: The vector store instance that you want to ingest. :param corpus_data: The corpus data that contains doc_id and contents columns. - :param embedding_model: An embedding model instance that will be used to embed queries. :param embedding_batch: The number of chunks that will be processed in parallel. 
""" - embedding_model.embed_batch_size = embedding_batch corpus_data = cast_corpus_dataset(corpus_data) validate_corpus_dataset(corpus_data) ids = corpus_data["doc_id"].tolist() # Query the collection to check if IDs already exist - existing_ids = set( - collection.get(ids=ids)["ids"] - ) # Assuming 'ids' is the key in the response - new_passage = corpus_data[~corpus_data["doc_id"].isin(existing_ids)] + existed_bool_list = await vectordb.is_exist(ids=ids) + # Assuming 'ids' is the key in the response + new_passage = corpus_data[~pd.Series(existed_bool_list)] if not new_passage.empty: new_contents = new_passage["contents"].tolist() - - # truncate by token if embedding_model is OpenAIEmbedding - if isinstance(embedding_model, OpenAIEmbedding): - openai_embedding_limit = 8000 - new_contents = openai_truncate_by_token( - new_contents, openai_embedding_limit, embedding_model.model_name - ) - new_ids = new_passage["doc_id"].tolist() - embedded_contents = embedding_model.get_text_embedding_batch( - new_contents, show_progress=True - ) - input_batches = create_batches( - api=collection._client, ids=new_ids, embeddings=embedded_contents - ) - for batch in input_batches: - ids = batch[0] - embed_content = batch[1] - collection.add(ids=ids, embeddings=embed_content) + content_batches = make_batch(new_contents, embedding_batch) + id_batches = make_batch(new_ids, embedding_batch) + for content_batch, id_batch in zip(content_batches, id_batches): + await vectordb.add(ids=id_batch, texts=content_batch) def run_query_embedding_batch( @@ -249,34 +252,40 @@ def run_query_embedding_batch( return result -def get_id_scores( - ids: List[str], - query_embeddings: List[List[float]], - collection: chromadb.Collection, - temp_client: chromadb.Client, -) -> List[float]: - if len(ids) == 0 or ids is None or not bool(ids): - return [] - - id_results: GetResult = collection.get(ids, include=["embeddings"]) - temp_collection = temp_client.create_collection( - name="temp", metadata={"hnsw:space": "cosine"} - ) - temp_collection.add(ids=id_results["ids"], embeddings=id_results["embeddings"]) - - query_result: QueryResult = temp_collection.query( - query_embeddings=query_embeddings, n_results=len(ids) - ) - assert len(query_result["ids"]) == len(query_result["distances"]) - id_scores_dict = {id_: [] for id_ in ids} - score_result = apply_recursive(lambda x: 1 - x, query_result["distances"]) - for id_list, score_list in zip(query_result["ids"], score_result): - for id_ in list(id_scores_dict.keys()): - id_idx = id_list.index(id_) - id_scores_dict[id_].append(score_list[id_idx]) - id_scores_pd = pd.DataFrame(id_scores_dict) - temp_client.delete_collection("temp") - return id_scores_pd.max(axis=0).tolist() +@convert_inputs_to_list +def get_id_scores( # To find the uncalculated score when fuse the scores for the hybrid retrieval + query_embeddings: List[ + List[float] + ], # `queries` is input. This is one user input query. + content_embeddings: List[List[float]], + similarity_metric: str, +) -> List[ + float +]: # The most high scores among each query. The length of a result is the same as the contents length. + """ + Calculate the highest similarity scores between query embeddings and content embeddings. + + :param query_embeddings: A list of lists containing query embeddings. + :param content_embeddings: A list of lists containing content embeddings. + :param similarity_metric: The similarity metric to use ('l2', 'ip', or 'cosine'). + :return: A list of the highest similarity scores for each content embedding. 
+ """ + metric_func_dict = { + "l2": lambda x, y: 1 - calculate_l2_distance(x, y), + "ip": calculate_inner_product, + "cosine": calculate_cosine_similarity, + } + metric_func = metric_func_dict[similarity_metric] + + result = [] + for content_embedding in content_embeddings: + scores = [] + for query_embedding in query_embeddings: + scores.append( + metric_func(np.array(query_embedding), np.array(content_embedding)) + ) + result.append(max(scores)) + return result def load_chroma_collection(db_path: str, collection_name: str) -> chromadb.Collection: diff --git a/autorag/utils/util.py b/autorag/utils/util.py index 38d49c50e..bd6b22a2a 100644 --- a/autorag/utils/util.py +++ b/autorag/utils/util.py @@ -370,6 +370,24 @@ def flatten_apply( return df.groupby(level=0, sort=False)["result"].apply(list).tolist() +async def aflatten_apply( + func: Callable, nested_list: List[List[Any]], **kwargs +) -> List[List[Any]]: + """ + This function flattens the input list and applies the function to the elements. + After that, it reconstructs the list to the original shape. + Its speciality is that the first dimension length of the list can be different from each other. + + :param func: The function that applies to the flattened list. + :param nested_list: The nested list to be flattened. + :return: The list that is reconstructed after applying the function. + """ + df = pd.DataFrame({"col1": nested_list}) + df = df.explode("col1") + df["result"] = await func(df["col1"].tolist(), **kwargs) + return df.groupby(level=0, sort=False)["result"].apply(list).tolist() + + def sort_by_scores(row, reverse=True): """ Sorts each row by 'scores' column. diff --git a/autorag/vectordb/__init__.py b/autorag/vectordb/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/autorag/vectordb/base.py b/autorag/vectordb/base.py new file mode 100644 index 000000000..04d05fd88 --- /dev/null +++ b/autorag/vectordb/base.py @@ -0,0 +1,59 @@ +from abc import abstractmethod +from typing import List, Tuple + +from llama_index.embeddings.openai import OpenAIEmbedding + +from autorag import embedding_models +from autorag.utils.util import openai_truncate_by_token + + +class BaseVectorStore: + support_similarity_metrics = ["l2", "ip", "cosine"] + + def __init__(self, embedding_model: str, similarity_metric: str = "cosine"): + self.embedding = embedding_models[embedding_model]() + assert ( + similarity_metric in self.support_similarity_metrics + ), f"search method {similarity_metric} is not supported" + self.similarity_metric = similarity_metric + + @abstractmethod + async def add( + self, + ids: List[str], + texts: List[str], + ): + pass + + @abstractmethod + async def query( + self, queries: List[str], top_k: int, **kwargs + ) -> Tuple[List[List[str]], List[List[float]]]: + pass + + @abstractmethod + async def fetch(self, ids: List[str]) -> List[List[float]]: + """ + Fetch the embeddings of the ids. + """ + pass + + @abstractmethod + async def is_exist(self, ids: List[str]) -> List[bool]: + """ + Check if the ids exist in the Vector DB. 
+ """ + pass + + @abstractmethod + async def delete(self, ids: List[str]): + pass + + def truncated_inputs(self, inputs: List[str]) -> List[str]: + if isinstance(self.embedding, OpenAIEmbedding): + openai_embedding_limit = 8000 + results = openai_truncate_by_token( + inputs, openai_embedding_limit, self.embedding.model_name + ) + return results + return inputs diff --git a/autorag/vectordb/chroma.py b/autorag/vectordb/chroma.py new file mode 100644 index 000000000..e694ac153 --- /dev/null +++ b/autorag/vectordb/chroma.py @@ -0,0 +1,116 @@ +from typing import List, Optional, Dict, Tuple + +from chromadb import ( + EphemeralClient, + PersistentClient, + DEFAULT_TENANT, + DEFAULT_DATABASE, + CloudClient, + AsyncHttpClient, +) +from chromadb.api.models.AsyncCollection import AsyncCollection +from chromadb.api.types import IncludeEnum, QueryResult + +from autorag.utils.util import apply_recursive +from autorag.vectordb.base import BaseVectorStore + + +class Chroma(BaseVectorStore): + def __init__( + self, + embedding_model: str, + collection_name: str, + client_type: str = "persistent", + similarity_metric: str = "cosine", + path: str = None, + host: str = "localhost", + port: int = 8000, + ssl: bool = False, + headers: Optional[Dict[str, str]] = None, + api_key: Optional[str] = None, + tenant: str = DEFAULT_TENANT, + database: str = DEFAULT_DATABASE, + ): + super().__init__(embedding_model) + if client_type == "ephemeral": + self.client = EphemeralClient(tenant=tenant, database=database) + elif client_type == "persistent": + assert path is not None, "path must be provided for persistent client" + self.client = PersistentClient(path=path, tenant=tenant, database=database) + elif client_type == "http": + self.client = AsyncHttpClient( + host=host, + port=port, + ssl=ssl, + headers=headers, + tenant=tenant, + database=database, + ) + elif client_type == "cloud": + self.client = CloudClient( + tenant=tenant, + database=database, + api_key=api_key, + ) + else: + raise ValueError( + f"client_type {client_type} is not supported\n" + "supported client types are: ephemeral, persistent, http, cloud" + ) + + self.collection = self.client.get_or_create_collection( + name=collection_name, + metadata={"hnsw:space": similarity_metric}, + ) + + async def add(self, ids: List[str], texts: List[str]): + texts = self.truncated_inputs(texts) + text_embeddings = await self.embedding.aget_text_embedding_batch(texts) + if isinstance(self.collection, AsyncCollection): + await self.collection.add(ids=ids, embeddings=text_embeddings) + else: + self.collection.add(ids=ids, embeddings=text_embeddings) + + async def fetch(self, ids: List[str]) -> List[List[float]]: + if isinstance(self.collection, AsyncCollection): + fetch_result = await self.collection.get( + ids, include=[IncludeEnum.embeddings] + ) + else: + fetch_result = self.collection.get(ids, include=[IncludeEnum.embeddings]) + fetch_embeddings = fetch_result["embeddings"] + return fetch_embeddings + + async def is_exist(self, ids: List[str]) -> List[bool]: + if isinstance(self.collection, AsyncCollection): + fetched_result = await self.collection.get(ids, include=[]) + else: + fetched_result = self.collection.get(ids, include=[]) + existed_ids = fetched_result["ids"] + return list(map(lambda x: x in existed_ids, ids)) + + async def query( + self, queries: List[str], top_k: int, **kwargs + ) -> Tuple[List[List[str]], List[List[float]]]: + queries = self.truncated_inputs(queries) + query_embeddings: List[ + List[float] + ] = await 
self.embedding.aget_text_embedding_batch(queries) + if isinstance(self.collection, AsyncCollection): + query_result: QueryResult = await self.collection.query( + query_embeddings=query_embeddings, n_results=top_k + ) + else: + query_result: QueryResult = self.collection.query( + query_embeddings=query_embeddings, n_results=top_k + ) + ids = query_result["ids"] + scores = query_result["distances"] + scores = apply_recursive(lambda x: 1 - x, scores) + return ids, scores + + async def delete(self, ids: List[str]): + if isinstance(self.collection, AsyncCollection): + await self.collection.delete(ids) + else: + self.collection.delete(ids) diff --git a/tests/autorag/nodes/retrieval/test_vectordb.py b/tests/autorag/nodes/retrieval/test_vectordb.py index 2a3dc4700..a9b2ba0fd 100644 --- a/tests/autorag/nodes/retrieval/test_vectordb.py +++ b/tests/autorag/nodes/retrieval/test_vectordb.py @@ -1,3 +1,4 @@ +import asyncio import os import pathlib import shutil @@ -6,15 +7,13 @@ from datetime import datetime from unittest.mock import patch -import chromadb import pandas as pd import pytest -from llama_index.core import MockEmbedding from llama_index.embeddings.openai import OpenAIEmbedding -from autorag import embedding_models from autorag.nodes.retrieval import VectorDB from autorag.nodes.retrieval.vectordb import vectordb_ingest, get_id_scores +from autorag.vectordb.chroma import Chroma from tests.autorag.nodes.retrieval.test_retrieval_base import ( queries, corpus_df, @@ -23,39 +22,38 @@ base_retrieval_node_test, searchable_input_ids, ) -from tests.mock import mock_get_text_embedding_batch +from tests.mock import mock_aget_text_embedding_batch root_dir = pathlib.PurePath( os.path.dirname(os.path.realpath(__file__)) ).parent.parent.parent resource_path = os.path.join(root_dir, "resources") -embedding_model = MockEmbedding(1536) - @pytest.fixture -def ingested_vectordb(): +def mock_chroma(): with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as chroma_path: - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="test_vectordb_retrieval", metadata={"hnsw:space": "cosine"} + chroma = Chroma( + client_type="persistent", + path=chroma_path, + embedding_model="mock", + collection_name="test_vectordb_retrieval", + similarity_metric="cosine", ) - - vectordb_ingest(collection, corpus_df, embedding_model) - - assert collection.count() == 5 - yield collection + yield chroma @pytest.fixture -def empty_chromadb(): +def openai_chroma(): with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as chroma_path: - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="test_vectordb_retrieval", metadata={"hnsw:space": "cosine"} + chroma = Chroma( + client_type="persistent", + path=chroma_path, + embedding_model="openai", + collection_name="test_vectordb_retrieval", + similarity_metric="cosine", ) - - yield collection + yield chroma @pytest.fixture @@ -64,15 +62,17 @@ def project_dir_for_vectordb_node(): os.makedirs(os.path.join(test_project_dir, "resources")) chroma_path = os.path.join(test_project_dir, "resources", "chroma") os.makedirs(chroma_path) - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="openai", metadata={"hnsw:space": "cosine"} + chroma = Chroma( + client_type="persistent", + path=chroma_path, + embedding_model="mock", + collection_name="mock", + similarity_metric="cosine", ) os.makedirs(os.path.join(test_project_dir, "data")) corpus_path = 
os.path.join(test_project_dir, "data", "corpus.parquet") corpus_df.to_parquet(corpus_path, index=False) - vectordb_ingest(collection, corpus_df, embedding_model) - + asyncio.run(vectordb_ingest(chroma, corpus_df)) yield test_project_dir @@ -85,13 +85,16 @@ def project_dir_for_vectordb_node_from_sample_project(): chroma_path = os.path.join(test_project_dir, "resources", "chroma") os.makedirs(chroma_path) - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="openai", metadata={"hnsw:space": "cosine"} + chroma = Chroma( + client_type="persistent", + path=chroma_path, + embedding_model="mock", + collection_name="mock", + similarity_metric="cosine", ) corpus_path = os.path.join(test_project_dir, "data", "corpus.parquet") local_corpus_df = pd.read_parquet(corpus_path, engine="pyarrow") - vectordb_ingest(collection, local_corpus_df, embedding_model) + asyncio.run(vectordb_ingest(chroma, local_corpus_df)) yield test_project_dir @@ -99,16 +102,11 @@ def project_dir_for_vectordb_node_from_sample_project(): @pytest.fixture def vectordb_instance(project_dir_for_vectordb_node): vectordb = VectorDB( - project_dir=project_dir_for_vectordb_node, embedding_model="openai" + project_dir=project_dir_for_vectordb_node, embedding_model="mock" ) yield vectordb -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_vectordb_retrieval(vectordb_instance): top_k = 4 id_result, score_result = vectordb_instance._pure( @@ -118,11 +116,6 @@ def test_vectordb_retrieval(vectordb_instance): base_retrieval_test(id_result, score_result, top_k) -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_vectordb_retrieval_ids(vectordb_instance): ids = [["doc2", "doc3"], ["doc1", "doc2"], ["doc4", "doc5"]] id_result, score_result = vectordb_instance._pure( @@ -135,11 +128,6 @@ def test_vectordb_retrieval_ids(vectordb_instance): assert all([len(score_list) == 2 for score_list in score_result]) -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_vectordb_retrieval_ids_empty(vectordb_instance): ids = [["doc2", "doc3"], [], ["doc4"]] id_result, score_result = vectordb_instance._pure( @@ -154,32 +142,22 @@ def test_vectordb_retrieval_ids_empty(vectordb_instance): assert len(score_result[2]) == 1 -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_vectordb_node(project_dir_for_vectordb_node_from_sample_project): result_df = VectorDB.run_evaluator( project_dir=project_dir_for_vectordb_node_from_sample_project, previous_result=previous_result, top_k=4, - embedding_model="openai", + embedding_model="mock", ) base_retrieval_node_test(result_df) -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_vectordb_node_ids(project_dir_for_vectordb_node_from_sample_project): result_df = VectorDB.run_evaluator( project_dir=project_dir_for_vectordb_node_from_sample_project, previous_result=previous_result, top_k=4, - embedding_model="openai", + embedding_model="mock", ids=searchable_input_ids, ) contents = result_df["retrieved_contents"].tolist() @@ -192,12 +170,13 @@ def test_vectordb_node_ids(project_dir_for_vectordb_node_from_sample_project): @patch.object( OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, + "aget_text_embedding_batch", + mock_aget_text_embedding_batch, ) -def 
test_duplicate_id_vectordb_ingest(ingested_vectordb): - vectordb_ingest(ingested_vectordb, corpus_df, embedding_model) - assert ingested_vectordb.count() == 5 +@pytest.mark.asyncio +async def test_duplicate_id_vectordb_ingest(openai_chroma): + await vectordb_ingest(openai_chroma, corpus_df) + assert openai_chroma.collection.count() == 5 new_doc_id = ["doc4", "doc5", "doc6", "doc7", "doc8"] new_contents = [ @@ -211,27 +190,28 @@ def test_duplicate_id_vectordb_ingest(ingested_vectordb): new_corpus_df = pd.DataFrame( {"doc_id": new_doc_id, "contents": new_contents, "metadata": new_metadata} ) - vectordb_ingest(ingested_vectordb, new_corpus_df, embedding_model) + await vectordb_ingest(openai_chroma, new_corpus_df) - assert ingested_vectordb.count() == 8 + assert openai_chroma.collection.count() == 8 @patch.object( OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, + "aget_text_embedding_batch", + mock_aget_text_embedding_batch, ) -def test_long_text_vectordb_ingest(ingested_vectordb): +@pytest.mark.asyncio +async def test_long_text_vectordb_ingest(openai_chroma): + await vectordb_ingest(openai_chroma, corpus_df) new_doc_id = ["doc6", "doc7"] new_contents = ["This is a test" * 20000, "This is a test" * 40000] new_metadata = [{"datetime": datetime.now()} for _ in range(2)] new_corpus_df = pd.DataFrame( {"doc_id": new_doc_id, "contents": new_contents, "metadata": new_metadata} ) - assert isinstance(embedding_model, MockEmbedding) - vectordb_ingest(ingested_vectordb, new_corpus_df, embedding_model) + await vectordb_ingest(openai_chroma, new_corpus_df) - assert ingested_vectordb.count() == 7 + assert openai_chroma.collection.count() == 7 def mock_get_text_embedding_batch(self, texts, **kwargs): @@ -239,10 +219,10 @@ def mock_get_text_embedding_batch(self, texts, **kwargs): @patch.object( - OpenAIEmbedding, "get_text_embedding_batch", mock_get_text_embedding_batch + OpenAIEmbedding, "aget_text_embedding_batch", mock_aget_text_embedding_batch ) -def test_long_ids_ingest(empty_chromadb): - embedding_model = OpenAIEmbedding() +@pytest.mark.asyncio +async def test_long_ids_ingest(openai_chroma): content_df = pd.DataFrame( { "doc_id": [str(uuid.uuid4()) for _ in range(10000)], @@ -252,18 +232,31 @@ def test_long_ids_ingest(empty_chromadb): ], } ) - vectordb_ingest(empty_chromadb, content_df, embedding_model) - - -def test_get_id_scores(ingested_vectordb): - ids = ["doc2", "doc3", "doc4"] - embedding_model = MockEmbedding(1536) - queries = [ - "다이노스 오! 
권희동~ 엔씨 오 권희동 오 권희동 권희동 안타~", - "두산의 헨리 라모스 오오오 라모스 시원하게 화끈하게 날려버려라", - ] - query_embeddings = embedding_model.get_text_embedding_batch(queries) - client = chromadb.Client() - scores = get_id_scores(ids, query_embeddings, ingested_vectordb, client) - assert len(scores) == 3 - assert isinstance(scores[0], float) + await vectordb_ingest(openai_chroma, content_df) + + +def test_get_id_scores(): + query_embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]] + content_embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]] + similarity_metric = "cosine" + + scores = get_id_scores(query_embeddings, content_embeddings, similarity_metric) + + assert len(scores) == len(content_embeddings) + assert all(isinstance(score, float) for score in scores) + assert scores == pytest.approx([1.0, 1.0, 1.0]) + + similarity_metric = "l2" + scores = get_id_scores(query_embeddings, content_embeddings, similarity_metric) + assert len(scores) == len(content_embeddings) + assert all(isinstance(score, float) for score in scores) + assert scores == pytest.approx( + [1.0, 1.0, 1.0] + ) # Assuming zero distance for identical vectors + + # Test for inner product + similarity_metric = "ip" + scores = get_id_scores(query_embeddings, content_embeddings, similarity_metric) + assert len(scores) == len(content_embeddings) + assert all(isinstance(score, float) for score in scores) + assert scores == pytest.approx([0.5, 1.22, 1.94]) diff --git a/tests/autorag/vectordb/test_chroma.py b/tests/autorag/vectordb/test_chroma.py new file mode 100644 index 000000000..36edf3c41 --- /dev/null +++ b/tests/autorag/vectordb/test_chroma.py @@ -0,0 +1,56 @@ +import pytest +from autorag.vectordb.chroma import Chroma + + +@pytest.fixture +def chroma_ephemeral(): + return Chroma( + embedding_model="mock", + collection_name="test_collection", + client_type="ephemeral", + ) + + +@pytest.mark.asyncio +async def test_add_and_query_documents(chroma_ephemeral): + # Add documents + ids = ["doc1", "doc2"] + texts = ["This is a test document.", "This is another test document."] + await chroma_ephemeral.add(ids, texts) + + # Query documents + queries = ["test document"] + contents, scores = await chroma_ephemeral.query(queries, top_k=2) + + assert len(contents) == 1 + assert len(scores) == 1 + assert len(contents[0]) == 2 + assert len(scores[0]) == 2 + assert scores[0][0] > scores[0][1] + + embeddings = await chroma_ephemeral.fetch([ids[0]]) + assert len(embeddings) == 1 + assert len(embeddings[0]) == 768 + + exist = await chroma_ephemeral.is_exist([ids[0], "doc3"]) + assert len(exist) == 2 + assert exist[0] is True + assert exist[1] is False + + +@pytest.mark.asyncio +async def test_delete_documents(chroma_ephemeral): + # Add documents + ids = ["doc1", "doc2"] + texts = ["This is a test document.", "This is another test document."] + await chroma_ephemeral.add(ids, texts) + + # Delete documents + await chroma_ephemeral.delete([ids[0]]) + + # Query documents to ensure they are deleted + queries = ["test document"] + contents, scores = await chroma_ephemeral.query(queries, top_k=2) + + assert len(contents[0]) == 1 + assert len(scores[0]) == 1 diff --git a/tests/mock.py b/tests/mock.py index 1fcffce5d..7da1400df 100644 --- a/tests/mock.py +++ b/tests/mock.py @@ -138,3 +138,9 @@ def mock_get_text_embedding_batch( **kwargs: Any, ) -> List[Embedding]: return [[random() for _ in range(1536)] for _ in range(len(texts))] + + +async def mock_aget_text_embedding_batch( + self, texts: List[str], show_progress: bool = False, **kwargs: Any +): + 
return [[random() for _ in range(1536)] for _ in range(len(texts))] From 5475fdb8126b19f53c9f46c770a22c4bdf2358e3 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Mon, 21 Oct 2024 17:24:40 +0900 Subject: [PATCH 03/11] Load vector store configuration from YAML file and use it on the vectordb module. (#874) * make load_yaml_config for easier retrieve config yaml file * load vectordb instance from config YAML file * load vectordb from YAML in the vectordb module * fixed wrong annotation --------- Co-authored-by: jeffrey --- autorag/deploy/base.py | 12 +- autorag/evaluator.py | 27 ++-- autorag/nodes/retrieval/vectordb.py | 140 ++++++++---------- autorag/utils/util.py | 22 +++ autorag/vectordb/__init__.py | 37 +++++ sample_config/rag/full.yaml | 11 +- .../autorag/nodes/retrieval/test_vectordb.py | 53 +++++-- tests/autorag/vectordb/test_base.py | 48 ++++++ tests/resources/simple_mock.yaml | 15 +- 9 files changed, 244 insertions(+), 121 deletions(-) create mode 100644 tests/autorag/vectordb/test_base.py diff --git a/autorag/deploy/base.py b/autorag/deploy/base.py index 2d0b84cdf..61f072dd9 100644 --- a/autorag/deploy/base.py +++ b/autorag/deploy/base.py @@ -8,8 +8,7 @@ import yaml from autorag.support import get_support_modules -from autorag.utils.util import load_summary_file - +from autorag.utils.util import load_summary_file, load_yaml_config logger = logging.getLogger("AutoRAG") @@ -124,8 +123,6 @@ class BaseRunner: def __init__(self, config: Dict, project_dir: Optional[str] = None): self.config = config project_dir = os.getcwd() if project_dir is None else project_dir - # self.app = Flask(__name__) - # self.__add_api_route() # init modules node_lines = deepcopy(self.config["node_lines"]) @@ -159,12 +156,7 @@ def from_yaml(cls, yaml_path: str, project_dir: Optional[str] = None): Default is the current directory. :return: Initialized Runner. 
""" - with open(yaml_path, "r") as f: - try: - config = yaml.safe_load(f) - except yaml.YAMLError as exc: - logger.error(exc) - raise exc + config = load_yaml_config(yaml_path) return cls(config, project_dir=project_dir) @classmethod diff --git a/autorag/evaluator.py b/autorag/evaluator.py index aa4add6de..797852d44 100644 --- a/autorag/evaluator.py +++ b/autorag/evaluator.py @@ -29,10 +29,9 @@ ) from autorag.utils.util import ( load_summary_file, - convert_string_to_tuple_in_dict, - convert_env_in_dict, explode, empty_cuda_cache, + load_yaml_config, ) logger = logging.getLogger("AutoRAG") @@ -85,6 +84,7 @@ def __init__( self.project_dir = project_dir if project_dir is not None else os.getcwd() if not os.path.exists(self.project_dir): os.makedirs(self.project_dir) + os.environ["PROJECT_DIR"] = self.project_dir validate_qa_from_corpus_dataset(self.qa_data, self.corpus_data) @@ -122,8 +122,18 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False): shutil.copy( yaml_path, os.path.join(self.project_dir, trial_name, "config.yaml") ) + vectordb_config_path = os.path.join( + self.project_dir, "resources", "vectordb.yaml" + ) + yaml_dict = load_yaml_config(yaml_path) + vectordb = yaml_dict.get("vectordb", []) + with open(vectordb_config_path, "w") as f: + yaml.safe_dump({"vectordb": vectordb}, f) + node_lines = self._load_node_lines(yaml_path) - self.__embed(node_lines) + self.__embed( + node_lines + ) # TODO: Change the ingest logic for external vector DBs trial_summary_df = pd.DataFrame( columns=[ @@ -284,16 +294,7 @@ def __make_trial_dir(self, trial_name: str): @staticmethod def _load_node_lines(yaml_path: str) -> Dict[str, List[Node]]: - if not os.path.exists(yaml_path): - raise ValueError(f"YAML file {yaml_path} does not exist.") - with open(yaml_path, "r", encoding="utf-8") as stream: - try: - yaml_dict = yaml.safe_load(stream) - except yaml.YAMLError as exc: - raise ValueError(f"YAML file {yaml_path} could not be loaded.") from exc - - yaml_dict = convert_string_to_tuple_in_dict(yaml_dict) - yaml_dict = convert_env_in_dict(yaml_dict) + yaml_dict = load_yaml_config(yaml_path) node_lines = yaml_dict["node_lines"] node_line_dict = {} for node_line in node_lines: diff --git a/autorag/nodes/retrieval/vectordb.py b/autorag/nodes/retrieval/vectordb.py index 06332d6e1..5677d0351 100644 --- a/autorag/nodes/retrieval/vectordb.py +++ b/autorag/nodes/retrieval/vectordb.py @@ -2,13 +2,11 @@ import os from typing import List, Tuple, Optional -import chromadb import numpy as np import pandas as pd from llama_index.core.embeddings import BaseEmbedding from llama_index.embeddings.openai import OpenAIEmbedding -from autorag import embedding_models from autorag.evaluation.metric.util import ( calculate_l2_distance, calculate_inner_product, @@ -28,50 +26,32 @@ convert_inputs_to_list, make_batch, ) +from autorag.vectordb import load_vectordb_from_yaml from autorag.vectordb.base import BaseVectorStore -from autorag.vectordb.chroma import Chroma logger = logging.getLogger("AutoRAG") class VectorDB(BaseRetrieval): - def __init__(self, project_dir: str, embedding_model: str, **kwargs): + def __init__(self, project_dir: str, vectordb: str = "default", **kwargs): """ Initialize VectorDB retrieval node. :param project_dir: The project directory path. - :param embedding_model: The embedding model name. - It will initialize from the autorag.embedding_models dictionary. - You can add your own models to the dictionary. 
- For more information, see https://docs.auto-rag.com/local_model.html#configure-the-embedding-model + :param vectordb: The vectordb name. + You must configure the vectordb name in the config.yaml file. + If you don't configure, it uses the default vectordb. :param kwargs: The optional arguments. Not affected in the init method. """ super().__init__(project_dir) - # init chroma collection - chroma_path = os.path.join(self.resources_dir, "chroma") - assert ( - chroma_path is not None - ), "chroma_path must be specified for using vectordb retrieval." - assert os.path.exists( - chroma_path - ), f"chroma_path {chroma_path} does not exist. Please ingest first." - - # TODO: load any vector store from YAML file at here - self.vector_store = Chroma( - embedding_model=embedding_model, - collection_name=embedding_model, - client_type="persistent", - path=chroma_path, + vectordb_config_path = os.path.join(self.resources_dir, "vectordb.yaml") + self.vector_store = load_vectordb_from_yaml( + vectordb_config_path, vectordb, project_dir ) - # init embedding model - if embedding_model in embedding_models: - self.embedding_model = self.vector_store.embedding - else: - logger.error(f"embedding_model_str {embedding_model} does not exist.") - raise KeyError(f"embedding_model_str {embedding_model} does not exist.") + self.embedding_model = self.vector_store.embedding def __del__(self): del self.vector_store @@ -117,6 +97,25 @@ def _pure( assert ( self.vector_store.collection.count() > 0 ), "collection must contain at least one document. Please check you ingested collection correctly." + + # if ids are specified, fetch the ids score from Chroma + if ids is not None: + return self.__get_ids_scores(queries, ids, embedding_batch) + + # run async vector_db_pure function + tasks = [ + vectordb_pure(query_list, top_k, self.vector_store) + for query_list in queries + ] + loop = get_event_loop() + results = loop.run_until_complete( + process_batch(tasks, batch_size=embedding_batch) + ) + id_result = list(map(lambda x: x[0], results)) + score_result = list(map(lambda x: x[1], results)) + return id_result, score_result + + def __get_ids_scores(self, queries, ids, embedding_batch: int): # truncate queries and embedding execution here. 
openai_embedding_limit = 8000 if isinstance(self.embedding_model, OpenAIEmbedding): @@ -131,54 +130,39 @@ def _pure( ) ) - # if ids are specified, fetch the ids score from Chroma - if ids is not None: - query_embeddings = flatten_apply( - run_query_embedding_batch, - queries, - embedding_model=self.embedding_model, - batch_size=embedding_batch, - ) - - loop = get_event_loop() - - async def run_fetch(ids): - final_result = [] - for id_list in ids: - if len(id_list) == 0: - final_result.append([]) - else: - result = await self.vector_store.fetch(id_list) - final_result.append(result) - return final_result + query_embeddings = flatten_apply( + run_query_embedding_batch, + queries, + embedding_model=self.embedding_model, + batch_size=embedding_batch, + ) - content_embeddings = loop.run_until_complete(run_fetch(ids)) + loop = get_event_loop() - score_result = list( - map( - lambda query_embedding_list, content_embedding_list: get_id_scores( - query_embedding_list, - content_embedding_list, - similarity_metric=self.vector_store.similarity_metric, - ), - query_embeddings, - content_embeddings, - ) + async def run_fetch(ids): + final_result = [] + for id_list in ids: + if len(id_list) == 0: + final_result.append([]) + else: + result = await self.vector_store.fetch(id_list) + final_result.append(result) + return final_result + + content_embeddings = loop.run_until_complete(run_fetch(ids)) + + score_result = list( + map( + lambda query_embedding_list, content_embedding_list: get_id_scores( + query_embedding_list, + content_embedding_list, + similarity_metric=self.vector_store.similarity_metric, + ), + query_embeddings, + content_embeddings, ) - return ids, score_result - - # run async vector_db_pure function - tasks = [ - vectordb_pure(query_list, top_k, self.vector_store) - for query_list in queries - ] - loop = get_event_loop() - results = loop.run_until_complete( - process_batch(tasks, batch_size=embedding_batch) ) - id_result = list(map(lambda x: x[0], results)) - score_result = list(map(lambda x: x[1], results)) - return id_result, score_result + return ids, score_result async def vectordb_pure( @@ -214,12 +198,12 @@ async def vectordb_ingest( embedding_batch: int = 128, ): """ - Ingest given corpus data to the chromadb collection. + Ingest given corpus data to the vectordb. It truncates corpus content when the embedding model is OpenAIEmbedding to the 8000 tokens. Plus, when the corpus content is empty (whitespace), it will be ignored. And if there is a document id that already exists in the collection, it will be ignored. - :param vectordb: The vector store instance that you want to ingest. + :param vectordb: A vector store instance that you want to ingest. :param corpus_data: The corpus data that contains doc_id and contents columns. :param embedding_batch: The number of chunks that will be processed in parallel. 
""" @@ -286,9 +270,3 @@ def get_id_scores( # To find the uncalculated score when fuse the scores for th ) result.append(max(scores)) return result - - -def load_chroma_collection(db_path: str, collection_name: str) -> chromadb.Collection: - db = chromadb.PersistentClient(path=db_path) - collection = db.get_collection(name=collection_name) - return collection diff --git a/autorag/utils/util.py b/autorag/utils/util.py index bd6b22a2a..abc8acff2 100644 --- a/autorag/utils/util.py +++ b/autorag/utils/util.py @@ -18,6 +18,7 @@ import tiktoken import unicodedata +import yaml from llama_index.embeddings.openai import OpenAIEmbedding from pydantic import BaseModel as BM from pydantic.v1 import BaseModel @@ -668,3 +669,24 @@ def empty_cuda_cache(): torch.cuda.empty_cache() except ImportError: pass + + +def load_yaml_config(yaml_path: str) -> Dict: + """ + Load a YAML configuration file for AutoRAG. + It contains safe loading, converting string to tuple, and insert environment variables. + + :param yaml_path: The path of the YAML configuration file. + :return: The loaded configuration dictionary. + """ + if not os.path.exists(yaml_path): + raise ValueError(f"YAML file {yaml_path} does not exist.") + with open(yaml_path, "r", encoding="utf-8") as stream: + try: + yaml_dict = yaml.safe_load(stream) + except yaml.YAMLError as exc: + raise ValueError(f"YAML file {yaml_path} could not be loaded.") from exc + + yaml_dict = convert_string_to_tuple_in_dict(yaml_dict) + yaml_dict = convert_env_in_dict(yaml_dict) + return yaml_dict diff --git a/autorag/vectordb/__init__.py b/autorag/vectordb/__init__.py index e69de29bb..7d148dfd7 100644 --- a/autorag/vectordb/__init__.py +++ b/autorag/vectordb/__init__.py @@ -0,0 +1,37 @@ +import os + +from autorag.support import dynamically_find_function +from autorag.utils.util import load_yaml_config + + +def get_support_vectordb(vectordb_name: str): + support_vectordb = { + "chroma": ("autorag.vectordb.chroma", "Chroma"), + "Chroma": ("autorag.vectordb.chroma", "Chroma"), + } + return dynamically_find_function(vectordb_name, support_vectordb) + + +def load_vectordb(vectordb_name: str, **kwargs): + vectordb = get_support_vectordb(vectordb_name) + return vectordb(**kwargs) + + +def load_vectordb_from_yaml(yaml_path: str, vectordb_name: str, project_dir: str): + config_dict = load_yaml_config(yaml_path) + vectordb_list = config_dict.get("vectordb", []) + if len(vectordb_list) == 0 or vectordb_name == "default": + chroma_path = os.path.join(project_dir, "resources", "chroma") + return load_vectordb( + "chroma", + client_type="persistent", + embedding_model="openai", + collection_name="openai", + path=chroma_path, + ) + + target_dict = list(filter(lambda x: x["name"] == vectordb_name, vectordb_list)) + target_dict[0].pop("name") # delete a name key + target_vectordb_name = target_dict[0].pop("db_type") + target_vectordb_params = target_dict[0] + return load_vectordb(target_vectordb_name, **target_vectordb_params) diff --git a/sample_config/rag/full.yaml b/sample_config/rag/full.yaml index e65662fa6..969310cd0 100644 --- a/sample_config/rag/full.yaml +++ b/sample_config/rag/full.yaml @@ -1,3 +1,10 @@ +vectordb: + - name: chroma_large + db_type: chroma + client_type: persistent + embedding_model: openai_embed_3_large + collection_name: openai_embed_3_large + path: ${PROJECT_DIR}/data/chroma node_lines: - node_line_name: pre_retrieve_node_line # Arbitrary node line name nodes: @@ -10,7 +17,7 @@ node_lines: - module_type: bm25 bm25_tokenizer: [ porter_stemmer, ko_kiwi, space, 
gpt2, ko_okt, ko_kkma, sudachipy ] - module_type: vectordb - embedding_model: openai + vectordb: default modules: - module_type: pass_query_expansion - module_type: query_decompose @@ -37,7 +44,7 @@ node_lines: modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: [default, chroma_large] embedding_batch: 256 - module_type: hybrid_rrf weight_range: (4,80) diff --git a/tests/autorag/nodes/retrieval/test_vectordb.py b/tests/autorag/nodes/retrieval/test_vectordb.py index a9b2ba0fd..808ac3c8d 100644 --- a/tests/autorag/nodes/retrieval/test_vectordb.py +++ b/tests/autorag/nodes/retrieval/test_vectordb.py @@ -9,6 +9,7 @@ import pandas as pd import pytest +import yaml from llama_index.embeddings.openai import OpenAIEmbedding from autorag.nodes.retrieval import VectorDB @@ -62,12 +63,25 @@ def project_dir_for_vectordb_node(): os.makedirs(os.path.join(test_project_dir, "resources")) chroma_path = os.path.join(test_project_dir, "resources", "chroma") os.makedirs(chroma_path) + + chroma_config = { + "client_type": "persistent", + "path": chroma_path, + "embedding_model": "mock", + "collection_name": "mock", + "similarity_metric": "cosine", + } + vectordb_config_path = os.path.join( + test_project_dir, "resources", "vectordb.yaml" + ) + with open(vectordb_config_path, "w") as f: + yaml.safe_dump( + {"vectordb": [{"name": "mock", "db_type": "chroma", **chroma_config}]}, + f, + ) + chroma = Chroma( - client_type="persistent", - path=chroma_path, - embedding_model="mock", - collection_name="mock", - similarity_metric="cosine", + **chroma_config, ) os.makedirs(os.path.join(test_project_dir, "data")) corpus_path = os.path.join(test_project_dir, "data", "corpus.parquet") @@ -85,13 +99,23 @@ def project_dir_for_vectordb_node_from_sample_project(): chroma_path = os.path.join(test_project_dir, "resources", "chroma") os.makedirs(chroma_path) - chroma = Chroma( - client_type="persistent", - path=chroma_path, - embedding_model="mock", - collection_name="mock", - similarity_metric="cosine", + chroma_config = { + "client_type": "persistent", + "path": chroma_path, + "embedding_model": "mock", + "collection_name": "mock", + "similarity_metric": "cosine", + } + vectordb_config_path = os.path.join( + test_project_dir, "resources", "vectordb.yaml" ) + with open(vectordb_config_path, "w") as f: + yaml.safe_dump( + {"vectordb": [{"name": "mock", "db_type": "chroma", **chroma_config}]}, + f, + ) + + chroma = Chroma(**chroma_config) corpus_path = os.path.join(test_project_dir, "data", "corpus.parquet") local_corpus_df = pd.read_parquet(corpus_path, engine="pyarrow") asyncio.run(vectordb_ingest(chroma, local_corpus_df)) @@ -102,7 +126,8 @@ def project_dir_for_vectordb_node_from_sample_project(): @pytest.fixture def vectordb_instance(project_dir_for_vectordb_node): vectordb = VectorDB( - project_dir=project_dir_for_vectordb_node, embedding_model="mock" + project_dir=project_dir_for_vectordb_node, + vectordb="mock", ) yield vectordb @@ -147,7 +172,7 @@ def test_vectordb_node(project_dir_for_vectordb_node_from_sample_project): project_dir=project_dir_for_vectordb_node_from_sample_project, previous_result=previous_result, top_k=4, - embedding_model="mock", + vectordb="mock", ) base_retrieval_node_test(result_df) @@ -157,7 +182,7 @@ def test_vectordb_node_ids(project_dir_for_vectordb_node_from_sample_project): project_dir=project_dir_for_vectordb_node_from_sample_project, previous_result=previous_result, top_k=4, - embedding_model="mock", + vectordb="mock", ids=searchable_input_ids, ) contents 
= result_df["retrieved_contents"].tolist() diff --git a/tests/autorag/vectordb/test_base.py b/tests/autorag/vectordb/test_base.py new file mode 100644 index 000000000..4fd1036eb --- /dev/null +++ b/tests/autorag/vectordb/test_base.py @@ -0,0 +1,48 @@ +import os.path +import pathlib +import tempfile + +from llama_index.core import MockEmbedding +from llama_index.embeddings.openai import OpenAIEmbedding + +from autorag.vectordb import load_vectordb, load_vectordb_from_yaml +from autorag.vectordb.chroma import Chroma + + +root_path = pathlib.PurePath(os.path.dirname(os.path.realpath(__file__))).parent.parent +resource_dir = os.path.join(root_path, "resources") + + +def test_load_vectordb(): + db = load_vectordb( + "chroma", + client_type="ephemeral", + collection_name="jax1", + embedding_model="mock", + ) + assert isinstance(db, Chroma) + assert db.collection.name == "jax1" + + +def test_load_vectordb_from_yaml(): + yaml_path = os.path.join(resource_dir, "simple_mock.yaml") + with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as project_dir: + os.environ["PROJECT_DIR"] = project_dir + default_vectordb = load_vectordb_from_yaml(yaml_path, "default", project_dir) + assert isinstance(default_vectordb, Chroma) + assert default_vectordb.collection.name == "openai" + assert isinstance(default_vectordb.embedding, OpenAIEmbedding) + + chroma_default_vectordb = load_vectordb_from_yaml( + yaml_path, "chroma_default", project_dir + ) + assert isinstance(chroma_default_vectordb, Chroma) + assert chroma_default_vectordb.collection.name == "openai" + assert isinstance(chroma_default_vectordb.embedding, MockEmbedding) + + chroma_large_vectordb = load_vectordb_from_yaml( + yaml_path, "chroma_large", project_dir + ) + assert isinstance(chroma_large_vectordb, Chroma) + assert chroma_large_vectordb.collection.name == "openai_embed_3_large" + assert isinstance(chroma_large_vectordb.embedding, MockEmbedding) diff --git a/tests/resources/simple_mock.yaml b/tests/resources/simple_mock.yaml index a22a7e98e..138826a29 100644 --- a/tests/resources/simple_mock.yaml +++ b/tests/resources/simple_mock.yaml @@ -1,3 +1,16 @@ +vectordb: + - name: chroma_default + db_type: chroma + client_type: persistent + embedding_model: mock + collection_name: openai + path: ${PROJECT_DIR}/resources/chroma + - name: chroma_large + db_type: chroma + client_type: persistent + embedding_model: mock + collection_name: openai_embed_3_large + path: ${PROJECT_DIR}/resources/chroma node_lines: - node_line_name: retrieve_node_line nodes: @@ -9,7 +22,7 @@ node_lines: - module_type: ${BM25} # for testing env variable bm25_tokenizer: [ facebook/opt-125m, porter_stemmer ] - module_type: vectordb - embedding_model: [mock, mock] + vectordb: [chroma_default, chroma_large] embedding_batch: 50 - module_type: hybrid_rrf weight_range: (4, 30) From 049e64349c04db660ee3f96edd917fb2026fbbd3 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Tue, 22 Oct 2024 23:43:36 +0900 Subject: [PATCH 04/11] Make Evaluator with new external DB works (#877) * add filter from corpus data and filter from retrieval_gt for optional ingest * make Evaluator with external vectordb working. 
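A minimal sketch of the optional-ingest flow this change enables, assuming the filter_exist_ids and vectordb_ingest helpers added to autorag.nodes.retrieval.vectordb and a persistent Chroma store; the paths, collection name, and embedding model are placeholders:

import asyncio

import pandas as pd

from autorag.nodes.retrieval.vectordb import filter_exist_ids, vectordb_ingest
from autorag.vectordb.chroma import Chroma


async def ingest_missing(corpus_df: pd.DataFrame):
    store = Chroma(
        embedding_model="openai",
        collection_name="openai",
        client_type="persistent",
        path="./resources/chroma",
    )
    # Keep only rows whose doc_id is not already stored in the collection,
    # then embed and add just those.
    new_rows = await filter_exist_ids(store, corpus_df)
    await vectordb_ingest(store, new_rows)


asyncio.run(ingest_missing(pd.read_parquet("./data/corpus.parquet")))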
--------- Co-authored-by: jeffrey --- autorag/evaluator.py | 146 ++++++++---------- autorag/nodes/retrieval/vectordb.py | 69 +++++++-- autorag/vectordb/__init__.py | 28 ++++ autorag/vectordb/base.py | 9 +- autorag/vectordb/chroma.py | 3 +- sample_config/rag/full.yaml | 4 +- .../autorag/nodes/retrieval/test_vectordb.py | 102 +++++++++++- tests/autorag/test_evaluator.py | 10 +- tests/autorag/vectordb/test_base.py | 24 ++- tests/resources/simple.yaml | 19 ++- 10 files changed, 301 insertions(+), 113 deletions(-) diff --git a/autorag/evaluator.py b/autorag/evaluator.py index 797852d44..3e43bbdb8 100644 --- a/autorag/evaluator.py +++ b/autorag/evaluator.py @@ -7,15 +7,17 @@ from itertools import chain from typing import List, Dict, Optional -import chromadb import pandas as pd import yaml -from autorag import embedding_models from autorag.node_line import run_node_line from autorag.nodes.retrieval.base import get_bm25_pkl_name from autorag.nodes.retrieval.bm25 import bm25_ingest -from autorag.nodes.retrieval.vectordb import vectordb_ingest +from autorag.nodes.retrieval.vectordb import ( + vectordb_ingest, + filter_exist_ids, + filter_exist_ids_from_retrieval_gt, +) from autorag.schema import Node from autorag.schema.node import ( module_type_exists, @@ -30,9 +32,10 @@ from autorag.utils.util import ( load_summary_file, explode, - empty_cuda_cache, load_yaml_config, + get_event_loop, ) +from autorag.vectordb import load_all_vectordb_from_yaml logger = logging.getLogger("AutoRAG") @@ -84,7 +87,6 @@ def __init__( self.project_dir = project_dir if project_dir is not None else os.getcwd() if not os.path.exists(self.project_dir): os.makedirs(self.project_dir) - os.environ["PROJECT_DIR"] = self.project_dir validate_qa_from_corpus_dataset(self.qa_data, self.corpus_data) @@ -100,7 +102,26 @@ def __init__( if not os.path.exists(corpus_path_in_project): self.corpus_data.to_parquet(corpus_path_in_project, index=False) - def start_trial(self, yaml_path: str, skip_validation: bool = False): + def start_trial( + self, yaml_path: str, skip_validation: bool = False, full_ingest: bool = True + ): + """ + Start AutoRAG trial. + The trial means one experiment to optimize the RAG pipeline. + It consists of ingesting corpus data, running all nodes and modules, evaluating and finding the optimal modules. + + :param yaml_path: The config YAML path + :param skip_validation: If True, it skips the validation step. + The validation step checks the input config YAML file is well formatted, + and there is any problem with the system settings. + Default is False. + :param full_ingest: If True, it checks the whole corpus data from corpus.parquet that exists in the Vector DB. + If your corpus is huge and don't want to check the whole vector DB, please set it to False. 
+ :return: None + """ + # Make Resources directory + os.makedirs(os.path.join(self.project_dir, "resources"), exist_ok=True) + if not skip_validation: logger.info(ascii_art) logger.info( @@ -115,6 +136,8 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False): ) validator.validate(yaml_path) + os.environ["PROJECT_DIR"] = self.project_dir + trial_name = self.__get_new_trial_name() self.__make_trial_dir(trial_name) @@ -122,18 +145,29 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False): shutil.copy( yaml_path, os.path.join(self.project_dir, trial_name, "config.yaml") ) + yaml_dict = load_yaml_config(yaml_path) + vectordb = yaml_dict.get("vectordb", []) + vectordb_config_path = os.path.join( self.project_dir, "resources", "vectordb.yaml" ) - yaml_dict = load_yaml_config(yaml_path) - vectordb = yaml_dict.get("vectordb", []) with open(vectordb_config_path, "w") as f: yaml.safe_dump({"vectordb": vectordb}, f) node_lines = self._load_node_lines(yaml_path) - self.__embed( - node_lines - ) # TODO: Change the ingest logic for external vector DBs + self.__ingest_bm25_full(node_lines) + + # Ingest VectorDB corpus + if any( + list( + map( + lambda nodes: module_type_exists(nodes, "vectordb"), + node_lines.values(), + ) + ) + ): + loop = get_event_loop() + loop.run_until_complete(self.__ingest_vectordb(yaml_path, full_ingest)) trial_summary_df = pd.DataFrame( columns=[ @@ -163,7 +197,7 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False): logger.info("Evaluation complete.") - def __embed(self, node_lines: Dict[str, List[Node]]): + def __ingest_bm25_full(self, node_lines: Dict[str, List[Node]]): if any( list( map( @@ -171,7 +205,6 @@ def __embed(self, node_lines: Dict[str, List[Node]]): ) ) ): - # ingest BM25 corpus logger.info("Embedding BM25 corpus...") bm25_tokenizer_list = list( chain.from_iterable( @@ -194,78 +227,6 @@ def __embed(self, node_lines: Dict[str, List[Node]]): bm25_ingest(bm25_dir, self.corpus_data, bm25_tokenizer=bm25_tokenizer) logger.info("BM25 corpus embedding complete.") - if any( - list( - map( - lambda nodes: module_type_exists(nodes, "vectordb"), - node_lines.values(), - ) - ) - ): - # load embedding_models in nodes - embedding_models_list = list( - chain.from_iterable( - map( - lambda nodes: self._find_embedding_model(nodes), - node_lines.values(), - ) - ) - ) - - # get embedding batch size in nodes - embedding_batch_list = list( - chain.from_iterable( - map( - lambda nodes: extract_values_from_nodes( - nodes, "embedding_batch" - ), - node_lines.values(), - ) - ) - ) - if len(embedding_batch_list) == 0: - embedding_batch = 100 - else: - embedding_batch = embedding_batch_list[0] - - # duplicate check in embedding_models - embedding_models_list = list(set(embedding_models_list)) - - vectordb_dir = os.path.join(self.project_dir, "resources", "chroma") - vectordb = chromadb.PersistentClient(path=vectordb_dir) - - for embedding_model_str in embedding_models_list: - # ingest VectorDB corpus - logger.info(f"Embedding VectorDB corpus with {embedding_model_str}...") - - # Get the collection with GET or CREATE, as it may already exist - collection = vectordb.get_or_create_collection( - name=embedding_model_str, metadata={"hnsw:space": "cosine"} - ) - # get embedding_model - if embedding_model_str in embedding_models: - embedding_model = embedding_models[embedding_model_str]() - else: - logger.error( - f"embedding_model_str {embedding_model_str} does not exist." 
- ) - raise KeyError( - f"embedding_model_str {embedding_model_str} does not exist." - ) - vectordb_ingest( - collection, - self.corpus_data, - embedding_model, - embedding_batch=embedding_batch, - ) - logger.info( - f"VectorDB corpus embedding complete with {embedding_model_str}." - ) - del embedding_model - empty_cuda_cache() - else: - logger.info("No ingestion needed.") - def __get_new_trial_name(self) -> str: trial_json_path = os.path.join(self.project_dir, "trial.json") if not os.path.exists(trial_json_path): @@ -561,3 +522,18 @@ def _find_embedding_model(nodes: List[Node]): filter(lambda x: x is not None, embedding_models_list) ) return list(set(embedding_models_list)) + + async def __ingest_vectordb(self, yaml_path, full_ingest: bool): + vectordb_list = load_all_vectordb_from_yaml(yaml_path, self.project_dir) + if full_ingest is True: + # get the target ingest corpus from the whole corpus + for vectordb in vectordb_list: + target_corpus = await filter_exist_ids(vectordb, self.corpus_data) + await vectordb_ingest(vectordb, target_corpus) + else: + # get the target ingest corpus from the retrieval gt only + for vectordb in vectordb_list: + target_corpus = await filter_exist_ids_from_retrieval_gt( + vectordb, self.qa_data, self.corpus_data + ) + await vectordb_ingest(vectordb, target_corpus) diff --git a/autorag/nodes/retrieval/vectordb.py b/autorag/nodes/retrieval/vectordb.py index 5677d0351..13fa89f9a 100644 --- a/autorag/nodes/retrieval/vectordb.py +++ b/autorag/nodes/retrieval/vectordb.py @@ -1,3 +1,4 @@ +import itertools import logging import os from typing import List, Tuple, Optional @@ -13,7 +14,12 @@ calculate_cosine_similarity, ) from autorag.nodes.retrieval.base import evenly_distribute_passages, BaseRetrieval -from autorag.utils import validate_corpus_dataset, cast_corpus_dataset +from autorag.utils import ( + validate_corpus_dataset, + cast_corpus_dataset, + cast_qa_dataset, + validate_qa_dataset, +) from autorag.utils.util import ( get_event_loop, process_batch, @@ -192,10 +198,50 @@ async def vectordb_pure( return list(id_result), list(score_result) +async def filter_exist_ids( + vectordb: BaseVectorStore, + corpus_data: pd.DataFrame, +) -> pd.DataFrame: + corpus_data = cast_corpus_dataset(corpus_data) + validate_corpus_dataset(corpus_data) + ids = corpus_data["doc_id"].tolist() + + # Query the collection to check if IDs already exist + existed_bool_list = await vectordb.is_exist(ids=ids) + # Assuming 'ids' is the key in the response + new_passage = corpus_data[~pd.Series(existed_bool_list)] + return new_passage + + +async def filter_exist_ids_from_retrieval_gt( + vectordb: BaseVectorStore, + qa_data: pd.DataFrame, + corpus_data: pd.DataFrame, +) -> pd.DataFrame: + qa_data = cast_qa_dataset(qa_data) + validate_qa_dataset(qa_data) + corpus_data = cast_corpus_dataset(corpus_data) + validate_corpus_dataset(corpus_data) + retrieval_gt = ( + qa_data["retrieval_gt"] + .apply(lambda x: list(itertools.chain.from_iterable(x))) + .tolist() + ) + retrieval_gt = list(itertools.chain.from_iterable(retrieval_gt)) + retrieval_gt = list(set(retrieval_gt)) + + existed_bool_list = await vectordb.is_exist(ids=retrieval_gt) + add_ids = [] + for ret_gt, is_exist in zip(retrieval_gt, existed_bool_list): + if not is_exist: + add_ids.append(ret_gt) + new_passage = corpus_data[corpus_data["doc_id"].isin(add_ids)] + return new_passage + + async def vectordb_ingest( vectordb: BaseVectorStore, corpus_data: pd.DataFrame, - embedding_batch: int = 128, ): """ Ingest given corpus data to the vectordb. 
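The two filter helpers above are meant to run before vectordb_ingest: decide what is missing first, then ingest only that slice. A minimal sketch of the intended flow, assuming a persistent Chroma store and a corpus parquet with the usual doc_id/contents/metadata columns (the paths and the "openai" collection name below are illustrative, not taken from this patch):

import asyncio

import pandas as pd

from autorag.nodes.retrieval.vectordb import filter_exist_ids, vectordb_ingest
from autorag.vectordb.chroma import Chroma


async def ingest_missing(corpus_path: str, chroma_path: str) -> None:
    corpus_df = pd.read_parquet(corpus_path)
    vectordb = Chroma(
        embedding_model="openai",
        collection_name="openai",
        client_type="persistent",
        path=chroma_path,
    )
    # Drop documents whose doc_id already exists in the collection,
    # then embed and insert the remainder in embedding_batch-sized chunks.
    new_docs = await filter_exist_ids(vectordb, corpus_df)
    await vectordb_ingest(vectordb, new_docs)


if __name__ == "__main__":
    asyncio.run(ingest_missing("data/corpus.parquet", "resources/chroma"))
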
@@ -203,22 +249,13 @@ async def vectordb_ingest( Plus, when the corpus content is empty (whitespace), it will be ignored. And if there is a document id that already exists in the collection, it will be ignored. - :param vectordb: A vector store instance that you want to ingest. + :param vectordb: A vector stores instance that you want to ingest. :param corpus_data: The corpus data that contains doc_id and contents columns. - :param embedding_batch: The number of chunks that will be processed in parallel. """ - corpus_data = cast_corpus_dataset(corpus_data) - validate_corpus_dataset(corpus_data) - ids = corpus_data["doc_id"].tolist() - - # Query the collection to check if IDs already exist - existed_bool_list = await vectordb.is_exist(ids=ids) - # Assuming 'ids' is the key in the response - new_passage = corpus_data[~pd.Series(existed_bool_list)] - - if not new_passage.empty: - new_contents = new_passage["contents"].tolist() - new_ids = new_passage["doc_id"].tolist() + embedding_batch = vectordb.embedding_batch + if not corpus_data.empty: + new_contents = corpus_data["contents"].tolist() + new_ids = corpus_data["doc_id"].tolist() content_batches = make_batch(new_contents, embedding_batch) id_batches = make_batch(new_ids, embedding_batch) for content_batch, id_batch in zip(content_batches, id_batches): diff --git a/autorag/vectordb/__init__.py b/autorag/vectordb/__init__.py index 7d148dfd7..c65ce13ef 100644 --- a/autorag/vectordb/__init__.py +++ b/autorag/vectordb/__init__.py @@ -1,7 +1,9 @@ import os +from typing import List from autorag.support import dynamically_find_function from autorag.utils.util import load_yaml_config +from autorag.vectordb.base import BaseVectorStore def get_support_vectordb(vectordb_name: str): @@ -35,3 +37,29 @@ def load_vectordb_from_yaml(yaml_path: str, vectordb_name: str, project_dir: str target_vectordb_name = target_dict[0].pop("db_type") target_vectordb_params = target_dict[0] return load_vectordb(target_vectordb_name, **target_vectordb_params) + + +def load_all_vectordb_from_yaml( + yaml_path: str, project_dir: str +) -> List[BaseVectorStore]: + config_dict = load_yaml_config(yaml_path) + vectordb_list = config_dict.get("vectordb", []) + if len(vectordb_list) == 0: + chroma_path = os.path.join(project_dir, "resources", "chroma") + return [ + load_vectordb( + "chroma", + client_type="persistent", + embedding_model="openai", + collection_name="openai", + path=chroma_path, + ) + ] + + result_vectordbs = [] + for vectordb_dict in vectordb_list: + _ = vectordb_dict.pop("name") + vectordb_type = vectordb_dict.pop("db_type") + vectordb = load_vectordb(vectordb_type, **vectordb_dict) + result_vectordbs.append(vectordb) + return result_vectordbs diff --git a/autorag/vectordb/base.py b/autorag/vectordb/base.py index 04d05fd88..e7cd15101 100644 --- a/autorag/vectordb/base.py +++ b/autorag/vectordb/base.py @@ -10,8 +10,15 @@ class BaseVectorStore: support_similarity_metrics = ["l2", "ip", "cosine"] - def __init__(self, embedding_model: str, similarity_metric: str = "cosine"): + def __init__( + self, + embedding_model: str, + similarity_metric: str = "cosine", + embedding_batch: int = 100, + ): self.embedding = embedding_models[embedding_model]() + self.embedding_batch = embedding_batch + self.embedding.embed_batch_size = embedding_batch assert ( similarity_metric in self.support_similarity_metrics ), f"search method {similarity_metric} is not supported" diff --git a/autorag/vectordb/chroma.py b/autorag/vectordb/chroma.py index e694ac153..3b198b780 100644 --- 
a/autorag/vectordb/chroma.py +++ b/autorag/vectordb/chroma.py @@ -20,6 +20,7 @@ def __init__( self, embedding_model: str, collection_name: str, + embedding_batch: int = 100, client_type: str = "persistent", similarity_metric: str = "cosine", path: str = None, @@ -31,7 +32,7 @@ def __init__( tenant: str = DEFAULT_TENANT, database: str = DEFAULT_DATABASE, ): - super().__init__(embedding_model) + super().__init__(embedding_model, similarity_metric, embedding_batch) if client_type == "ephemeral": self.client = EphemeralClient(tenant=tenant, database=database) elif client_type == "persistent": diff --git a/sample_config/rag/full.yaml b/sample_config/rag/full.yaml index 969310cd0..fcc25cf71 100644 --- a/sample_config/rag/full.yaml +++ b/sample_config/rag/full.yaml @@ -17,7 +17,7 @@ node_lines: - module_type: bm25 bm25_tokenizer: [ porter_stemmer, ko_kiwi, space, gpt2, ko_okt, ko_kkma, sudachipy ] - module_type: vectordb - vectordb: default + vectordb: chroma_large modules: - module_type: pass_query_expansion - module_type: query_decompose @@ -44,7 +44,7 @@ node_lines: modules: - module_type: bm25 - module_type: vectordb - vectordb: [default, chroma_large] + vectordb: chroma_large embedding_batch: 256 - module_type: hybrid_rrf weight_range: (4,80) diff --git a/tests/autorag/nodes/retrieval/test_vectordb.py b/tests/autorag/nodes/retrieval/test_vectordb.py index 808ac3c8d..2c9ee6b2f 100644 --- a/tests/autorag/nodes/retrieval/test_vectordb.py +++ b/tests/autorag/nodes/retrieval/test_vectordb.py @@ -13,7 +13,12 @@ from llama_index.embeddings.openai import OpenAIEmbedding from autorag.nodes.retrieval import VectorDB -from autorag.nodes.retrieval.vectordb import vectordb_ingest, get_id_scores +from autorag.nodes.retrieval.vectordb import ( + vectordb_ingest, + get_id_scores, + filter_exist_ids_from_retrieval_gt, + filter_exist_ids, +) from autorag.vectordb.chroma import Chroma from tests.autorag.nodes.retrieval.test_retrieval_base import ( queries, @@ -260,6 +265,101 @@ async def test_long_ids_ingest(openai_chroma): await vectordb_ingest(openai_chroma, content_df) +@pytest.mark.asyncio +async def test_filter_exist_ids_from_retrieval_gt(mock_chroma): + last_modified_datetime = datetime.now() + ingested_df = pd.DataFrame( + { + "doc_id": ["id2"], + "contents": ["content2"], + "metadata": [{"last_modified_datetime": last_modified_datetime}], + } + ) + await vectordb_ingest(mock_chroma, ingested_df) + + # Create sample qa_data and corpus_data + qa_data = pd.DataFrame( + { + "qid": ["qid1"], + "query": ["query1"], + "retrieval_gt": [[["id1", "id2"], ["id3"]]], + "generation_gt": [["jaxjax"]], + } + ) + corpus_data = pd.DataFrame( + { + "doc_id": ["id1", "id2", "id3", "id4"], + "contents": ["content1", "content2", "content3", "content4"], + "metadata": [ + {"last_modified_datetime": last_modified_datetime} for _ in range(4) + ], + } + ) + + # Call the function + result = await filter_exist_ids_from_retrieval_gt(mock_chroma, qa_data, corpus_data) + + # Expected result + expected_result = pd.DataFrame( + { + "doc_id": ["id1", "id3"], + "contents": ["content1", "content3"], + "metadata": [ + { + "last_modified_datetime": last_modified_datetime, + "prev_id": None, + "next_id": None, + } + for _ in range(2) + ], + } + ) + + # Assert the result + pd.testing.assert_frame_equal(result.reset_index(drop=True), expected_result) + + +@pytest.mark.asyncio +async def test_filter_exist_ids(mock_chroma): + last_modified_datetime = datetime.now() + ingested_df = pd.DataFrame( + { + "doc_id": ["id2"], + "contents": 
["content2"], + "metadata": [{"last_modified_datetime": last_modified_datetime}], + } + ) + await vectordb_ingest(mock_chroma, ingested_df) + + corpus_data = pd.DataFrame( + { + "doc_id": ["id1", "id2", "id3", "id4"], + "contents": ["content1", "content2", "content3", "content4"], + "metadata": [ + {"last_modified_datetime": last_modified_datetime} for _ in range(4) + ], + } + ) + + result = await filter_exist_ids(mock_chroma, corpus_data) + + expected_result = pd.DataFrame( + { + "doc_id": ["id1", "id3", "id4"], + "contents": ["content1", "content3", "content4"], + "metadata": [ + { + "last_modified_datetime": last_modified_datetime, + "prev_id": None, + "next_id": None, + } + for _ in range(3) + ], + } + ) + pd.testing.assert_frame_equal(result.reset_index(drop=True), expected_result) + + def test_get_id_scores(): query_embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]] content_embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8, 0.9]] diff --git a/tests/autorag/test_evaluator.py b/tests/autorag/test_evaluator.py index e416ae48c..a9b72b12d 100644 --- a/tests/autorag/test_evaluator.py +++ b/tests/autorag/test_evaluator.py @@ -19,7 +19,7 @@ from autorag.utils import validate_qa_dataset, validate_corpus_dataset from autorag.utils.util import load_summary_file from tests.delete_tests import is_github_action -from tests.mock import mock_get_text_embedding_batch +from tests.mock import mock_get_text_embedding_batch, mock_aget_text_embedding_batch root_dir = pathlib.PurePath(os.path.dirname(os.path.realpath(__file__))).parent resource_dir = os.path.join(root_dir, "resources") @@ -81,8 +81,7 @@ def test_load_node_line(evaluator): "bm25_tokenizer": ["facebook/opt-125m", "porter_stemmer"] } assert node.modules[1].module_param == { - "embedding_model": ["openai", "openai"], - "embedding_batch": 50, + "vectordb": ["openai_embed_3_large", "openai_embed_3_small"], } assert node.modules[2].module_param == {"weight_range": (4, 30)} assert nodes[2].node_type == "passage_filter" @@ -93,6 +92,11 @@ def test_load_node_line(evaluator): "get_text_embedding_batch", mock_get_text_embedding_batch, ) +@patch.object( + OpenAIEmbedding, + "aget_text_embedding_batch", + mock_aget_text_embedding_batch, +) def test_start_trial(evaluator): evaluator.start_trial(os.path.join(resource_dir, "simple.yaml")) project_dir = evaluator.project_dir diff --git a/tests/autorag/vectordb/test_base.py b/tests/autorag/vectordb/test_base.py index 4fd1036eb..e3aff4ea9 100644 --- a/tests/autorag/vectordb/test_base.py +++ b/tests/autorag/vectordb/test_base.py @@ -5,7 +5,11 @@ from llama_index.core import MockEmbedding from llama_index.embeddings.openai import OpenAIEmbedding -from autorag.vectordb import load_vectordb, load_vectordb_from_yaml +from autorag.vectordb import ( + load_vectordb, + load_vectordb_from_yaml, + load_all_vectordb_from_yaml, +) from autorag.vectordb.chroma import Chroma @@ -46,3 +50,21 @@ def test_load_vectordb_from_yaml(): assert isinstance(chroma_large_vectordb, Chroma) assert chroma_large_vectordb.collection.name == "openai_embed_3_large" assert isinstance(chroma_large_vectordb.embedding, MockEmbedding) + + +def test_load_all_vectordb_from_yaml(): + yaml_path = os.path.join(resource_dir, "simple_mock.yaml") + with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as project_dir: + os.environ["PROJECT_DIR"] = project_dir + vectordb_list = load_all_vectordb_from_yaml(yaml_path, project_dir) + assert len(vectordb_list) == 2 + + chroma_default_vectordb = vectordb_list[0] + assert 
isinstance(chroma_default_vectordb, Chroma) + assert chroma_default_vectordb.collection.name == "openai" + assert isinstance(chroma_default_vectordb.embedding, MockEmbedding) + + chroma_large_vectordb = vectordb_list[1] + assert isinstance(chroma_large_vectordb, Chroma) + assert chroma_large_vectordb.collection.name == "openai_embed_3_large" + assert isinstance(chroma_large_vectordb.embedding, MockEmbedding) diff --git a/tests/resources/simple.yaml b/tests/resources/simple.yaml index 8432622f0..b06df9a22 100644 --- a/tests/resources/simple.yaml +++ b/tests/resources/simple.yaml @@ -1,3 +1,17 @@ +vectordb: + - name: openai_embed_3_small + db_type: chroma + client_type: persistent + embedding_model: openai_embed_3_small + collection_name: openai_embed_3_small + path: ${PROJECT_DIR}/resources/chroma + - name: openai_embed_3_large + db_type: chroma + client_type: persistent + embedding_model: openai_embed_3_large + collection_name: openai_embed_3_large + path: ${PROJECT_DIR}/resources/chroma + embedding_batch: 50 node_lines: - node_line_name: retrieve_node_line nodes: @@ -10,7 +24,7 @@ node_lines: - module_type: bm25 bm25_tokenizer: [ porter_stemmer, space ] - module_type: vectordb - embedding_model: openai_embed_3_small + vectordb: openai_embed_3_small modules: - module_type: hyde generator_module_type: llama_index_llm @@ -23,8 +37,7 @@ node_lines: - module_type: ${BM25} # for testing env variable bm25_tokenizer: [ facebook/opt-125m, porter_stemmer ] - module_type: vectordb - embedding_model: [openai, openai] - embedding_batch: 50 + vectordb: [openai_embed_3_large, openai_embed_3_small] - module_type: hybrid_rrf weight_range: (4, 30) - node_type: passage_filter From 0baa503e68e4ff91f054191a3aca439ae0c94a78 Mon Sep 17 00:00:00 2001 From: jeffrey Date: Wed, 23 Oct 2024 15:59:53 +0900 Subject: [PATCH 05/11] Implement Mivlus instance and test it (done) --- autorag/vectordb/milvus.py | 140 ++++++++++++++++++++++++++ requirements.txt | 1 + tests/autorag/vectordb/test_milvus.py | 70 +++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 autorag/vectordb/milvus.py create mode 100644 tests/autorag/vectordb/test_milvus.py diff --git a/autorag/vectordb/milvus.py b/autorag/vectordb/milvus.py new file mode 100644 index 000000000..4939ef40c --- /dev/null +++ b/autorag/vectordb/milvus.py @@ -0,0 +1,140 @@ +from typing import List, Tuple, Optional + +from pymilvus import DataType, FieldSchema, CollectionSchema, connections, Collection +from pymilvus.orm import utility + +from autorag.utils.util import apply_recursive +from autorag.vectordb import BaseVectorStore + + +class Milvus(BaseVectorStore): + def __init__( + self, + embedding_model: str, + collection_name: str, + embedding_batch: int = 100, + similarity_metric: str = "cosine", + uri: str = "http://localhost:19530", + db_name: str = "", + token: str = "", + user: str = "", + password: str = "", + timeout: Optional[float] = None, + ): + super().__init__(embedding_model, similarity_metric, embedding_batch) + + # Connect to Milvus server + connections.connect( + "default", + uri=uri, + token=token, + db_name=db_name, + user=user, + password=password, + ) + self.collection_name = collection_name + self.timeout = timeout + # Set Collection + if not utility.has_collection(collection_name, timeout=timeout): + # Get the dimension of the embeddings + test_embedding_result: List[float] = self.embedding.get_query_embedding( + "test" + ) + dimension = len(test_embedding_result) + + pk = FieldSchema( + name="id", + dtype=DataType.VARCHAR, + 
max_length=128, + is_primary=True, + auto_id=False, + ) + field = FieldSchema( + name="vector", dtype=DataType.FLOAT_VECTOR, dim=dimension + ) + schema = CollectionSchema(fields=[pk, field]) + + self.collection = Collection(name=self.collection_name, schema=schema) + else: + self.collection = Collection(name=self.collection_name) + + async def add(self, ids: List[str], texts: List[str]): + texts = self.truncated_inputs(texts) + text_embeddings: List[ + List[float] + ] = await self.embedding.aget_text_embedding_batch(texts) + + # make data for insertion + data = list( + map(lambda _id, vector: {"id": _id, "vector": vector}, ids, text_embeddings) + ) + + # Insert data into the collection + res = self.collection.insert(data=data, timeout=self.timeout) + assert ( + res.insert_count == len(ids) + ), f"Insertion failed. Try to insert {len(ids)} but only {res['insert_count']} inserted." + + self.collection.flush(timeout=self.timeout) + + index_params = { + "index_type": "IVF_FLAT", + "metric_type": self.similarity_metric.upper(), + "params": {}, + } # TODO : add params + self.collection.create_index( + field_name="vector", index_params=index_params, timeout=self.timeout + ) + + async def query( + self, queries: List[str], top_k: int, **kwargs + ) -> Tuple[List[List[str]], List[List[float]]]: + queries = self.truncated_inputs(queries) + query_embeddings: List[ + List[float] + ] = await self.embedding.aget_text_embedding_batch(queries) + + self.collection.load(timeout=self.timeout) + + # Perform similarity search + results = self.collection.search( + data=query_embeddings, + limit=top_k, + anns_field="vector", + param={"metric_type": self.similarity_metric.upper()}, + timeout=self.timeout, + **kwargs, + ) + + # Extract IDs and distances + ids = [[str(hit.id) for hit in result] for result in results] + distances = [[hit.distance for hit in result] for result in results] + + if self.similarity_metric in ["l2"]: + distances = apply_recursive(lambda x: -x, distances) + + return ids, distances + + async def fetch(self, ids: List[str]) -> List[List[float]]: + self.collection.load(timeout=self.timeout) + # Fetch vectors by IDs + results = self.collection.query( + expr=f"id in {ids}", output_fields=["id", "vector"], timeout=self.timeout + ) + id_vector_dict = {str(result["id"]): result["vector"] for result in results} + result = [id_vector_dict[_id] for _id in ids] + return result + + async def is_exist(self, ids: List[str]) -> List[bool]: + self.collection.load(timeout=self.timeout) + # Check the existence of IDs + results = self.collection.query( + expr=f"id in {ids}", output_fields=["id"], timeout=self.timeout + ) + # Determine existence + existing_ids = {str(result["id"]) for result in results} + return [str(_id) in existing_ids for _id in ids] + + async def delete(self, ids: List[str]): + # Delete entries by IDs + self.collection.delete(expr=f"id in {ids}", timeout=self.timeout) diff --git a/requirements.txt b/requirements.txt index 5b6530899..62c33e611 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,6 +20,7 @@ voyageai # for voyageai reranker mixedbread-ai # for mixedbread-ai reranker llama-index-llms-bedrock scikit-learn +pymilvus # for using milvus vectordb ### API server ### quart diff --git a/tests/autorag/vectordb/test_milvus.py b/tests/autorag/vectordb/test_milvus.py new file mode 100644 index 000000000..da2ff3085 --- /dev/null +++ b/tests/autorag/vectordb/test_milvus.py @@ -0,0 +1,70 @@ +import asyncio +import os + +import pytest + +from autorag.vectordb.milvus import Milvus + + 
+@pytest.fixture +def milvus_instance(): + milvus = Milvus( + uri=os.environ["MILVUS_URI"], + token=os.environ["MILVUS_TOKEN"], + embedding_model="mock", + collection_name="test_collection", + similarity_metric="ip", + ) + yield milvus + milvus.collection.release() + milvus.collection.drop_index() + milvus.collection.drop() + + +@pytest.mark.asyncio +async def test_add_and_query_documents(milvus_instance): + # Add documents + ids = ["doc1", "doc2"] + texts = ["This is a test document.", "This is another test document."] + await milvus_instance.add(ids, texts) + + await asyncio.sleep(1) + + # Query documents + queries = ["test document"] + contents, scores = await milvus_instance.query(queries, top_k=2) + + assert len(contents) == 1 + assert len(scores) == 1 + assert len(contents[0]) == 2 + assert len(scores[0]) == 2 + assert scores[0][0] > scores[0][1] + + embeddings = await milvus_instance.fetch([ids[0]]) + assert len(embeddings) == 1 + assert len(embeddings[0]) == 768 + + exist = await milvus_instance.is_exist([ids[0], "doc3"]) + assert len(exist) == 2 + assert exist[0] is True + assert exist[1] is False + + +@pytest.mark.asyncio +async def test_delete_documents(milvus_instance): + # Add documents + ids = ["doc1", "doc2"] + texts = ["This is a test document.", "This is another test document."] + await milvus_instance.add(ids, texts) + + await asyncio.sleep(1) + + # Delete documents + await milvus_instance.delete([ids[0]]) + + # Query documents to ensure they are deleted + queries = ["test document"] + contents, scores = await milvus_instance.query(queries, top_k=2) + + assert len(contents[0]) == 1 + assert len(scores[0]) == 1 From b81cbe5909dd0df318e4fc645ae833a47061ab15 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Wed, 23 Oct 2024 16:59:17 +0900 Subject: [PATCH 06/11] Implement milvus and test it (#879) Co-authored-by: jeffrey --- autorag/nodes/retrieval/vectordb.py | 6 ------ autorag/vectordb/__init__.py | 2 ++ autorag/vectordb/milvus.py | 24 ++++++++++++++++++++--- tests/autorag/test_evaluator.py | 24 +++++++++++++++++++++++ tests/autorag/vectordb/test_milvus.py | 9 +++++++++ tests/resources/simple_milvus.yaml | 28 +++++++++++++++++++++++++++ 6 files changed, 84 insertions(+), 9 deletions(-) create mode 100644 tests/resources/simple_milvus.yaml diff --git a/autorag/nodes/retrieval/vectordb.py b/autorag/nodes/retrieval/vectordb.py index 13fa89f9a..ba0cc2c13 100644 --- a/autorag/nodes/retrieval/vectordb.py +++ b/autorag/nodes/retrieval/vectordb.py @@ -98,12 +98,6 @@ def _pure( :return: The 2-d list contains a list of passage ids that retrieved from vectordb and 2-d list of its scores. It will be a length of queries. And each element has a length of top_k. """ - # check if bm25_corpus is valid - # TODO: available at other Vector DB? - assert ( - self.vector_store.collection.count() > 0 - ), "collection must contain at least one document. Please check you ingested collection correctly." 
- # if ids are specified, fetch the ids score from Chroma if ids is not None: return self.__get_ids_scores(queries, ids, embedding_batch) diff --git a/autorag/vectordb/__init__.py b/autorag/vectordb/__init__.py index c65ce13ef..01c254347 100644 --- a/autorag/vectordb/__init__.py +++ b/autorag/vectordb/__init__.py @@ -10,6 +10,8 @@ def get_support_vectordb(vectordb_name: str): support_vectordb = { "chroma": ("autorag.vectordb.chroma", "Chroma"), "Chroma": ("autorag.vectordb.chroma", "Chroma"), + "milvus": ("autorag.vectordb.milvus", "Milvus"), + "Milvus": ("autorag.vectordb.milvus", "Milvus"), } return dynamically_find_function(vectordb_name, support_vectordb) diff --git a/autorag/vectordb/milvus.py b/autorag/vectordb/milvus.py index 4939ef40c..d7ec28a85 100644 --- a/autorag/vectordb/milvus.py +++ b/autorag/vectordb/milvus.py @@ -1,12 +1,23 @@ +import logging from typing import List, Tuple, Optional -from pymilvus import DataType, FieldSchema, CollectionSchema, connections, Collection +from pymilvus import ( + DataType, + FieldSchema, + CollectionSchema, + connections, + Collection, + MilvusException, +) from pymilvus.orm import utility from autorag.utils.util import apply_recursive from autorag.vectordb import BaseVectorStore +logger = logging.getLogger("AutoRAG") + + class Milvus(BaseVectorStore): def __init__( self, @@ -116,7 +127,11 @@ async def query( return ids, distances async def fetch(self, ids: List[str]) -> List[List[float]]: - self.collection.load(timeout=self.timeout) + try: + self.collection.load(timeout=self.timeout) + except MilvusException as e: + logger.warning(f"Failed to load collection: {e}") + return [[]] * len(ids) # Fetch vectors by IDs results = self.collection.query( expr=f"id in {ids}", output_fields=["id", "vector"], timeout=self.timeout @@ -126,7 +141,10 @@ async def fetch(self, ids: List[str]) -> List[List[float]]: return result async def is_exist(self, ids: List[str]) -> List[bool]: - self.collection.load(timeout=self.timeout) + try: + self.collection.load(timeout=self.timeout) + except MilvusException: + return [False] * len(ids) # Check the existence of IDs results = self.collection.query( expr=f"id in {ids}", output_fields=["id"], timeout=self.timeout diff --git a/tests/autorag/test_evaluator.py b/tests/autorag/test_evaluator.py index a9b72b12d..c91202444 100644 --- a/tests/autorag/test_evaluator.py +++ b/tests/autorag/test_evaluator.py @@ -214,6 +214,30 @@ def test_start_trial(evaluator): assert trial_summary_df["best_execution_time"][0] > 0 +@pytest.mark.skipif( + is_github_action(), + reason="This test needs milvus uri and token which is confidential.", +) +def test_start_trial_milvus(evaluator): + evaluator.start_trial(os.path.join(resource_dir, "simple_milvus.yaml")) + project_dir = evaluator.project_dir + assert os.path.exists(os.path.join(project_dir, "0")) + assert os.path.exists(os.path.join(project_dir, "data")) + assert os.path.exists(os.path.join(project_dir, "resources")) + assert os.path.exists(os.path.join(project_dir, "trial.json")) + assert os.path.exists(os.path.join(project_dir, "0", "config.yaml")) + assert os.path.exists(os.path.join(project_dir, "0", "retrieve_node_line")) + assert os.path.exists( + os.path.join(project_dir, "0", "retrieve_node_line", "retrieval") + ) + assert os.path.exists( + os.path.join(project_dir, "0", "retrieve_node_line", "retrieval", "0.parquet") + ) + assert os.path.exists( + os.path.join(project_dir, "0", "retrieve_node_line", "retrieval", "1.parquet") + ) + + @patch.object( OpenAIEmbedding, 
"get_text_embedding_batch", diff --git a/tests/autorag/vectordb/test_milvus.py b/tests/autorag/vectordb/test_milvus.py index da2ff3085..5bf5feb02 100644 --- a/tests/autorag/vectordb/test_milvus.py +++ b/tests/autorag/vectordb/test_milvus.py @@ -4,8 +4,13 @@ import pytest from autorag.vectordb.milvus import Milvus +from tests.delete_tests import is_github_action +@pytest.mark.skipif( + is_github_action(), + reason="This test needs milvus uri and token which is confidential.", +) @pytest.fixture def milvus_instance(): milvus = Milvus( @@ -21,6 +26,10 @@ def milvus_instance(): milvus.collection.drop() +@pytest.mark.skipif( + is_github_action(), + reason="This test needs milvus uri and token which is confidential.", +) @pytest.mark.asyncio async def test_add_and_query_documents(milvus_instance): # Add documents diff --git a/tests/resources/simple_milvus.yaml b/tests/resources/simple_milvus.yaml new file mode 100644 index 000000000..b2faf9352 --- /dev/null +++ b/tests/resources/simple_milvus.yaml @@ -0,0 +1,28 @@ +vectordb: + - name: openai_embed_3_small + db_type: chroma + client_type: persistent + embedding_model: openai_embed_3_small + collection_name: openai_embed_3_small + path: ${PROJECT_DIR}/resources/chroma + - name: openai_embed_3_large + db_type: milvus + embedding_model: openai_embed_3_large + collection_name: openai_embed_3_large + uri: ${MILVUS_URI} + token: ${MILVUS_TOKEN} + embedding_batch: 50 +node_lines: +- node_line_name: retrieve_node_line + nodes: + - node_type: retrieval # represents run_node function + strategy: # essential for every node + metrics: [retrieval_f1, retrieval_recall] + top_k: 10 # node param, which adapt to every module in this node. + modules: + - module_type: ${BM25} # for testing env variable + bm25_tokenizer: [ facebook/opt-125m, porter_stemmer ] + - module_type: vectordb + vectordb: [openai_embed_3_large, openai_embed_3_small] + - module_type: hybrid_rrf + weight_range: (4, 30) From c103dc433f0d5fc2e278cdeee15a4f524f7d3df4 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Wed, 23 Oct 2024 19:12:28 +0900 Subject: [PATCH 07/11] Use vectordb dictionary at extracted yaml file (#880) * extract vectordb and save it to extracted yaml as also * add delete collection method for milvus * edit hybrid testing for new vectordb * optimize imports at test_hybrid_cc.py --------- Co-authored-by: jeffrey --- autorag/deploy/base.py | 31 +++++++++++++++-- autorag/vectordb/milvus.py | 6 ++++ .../nodes/retrieval/test_hybrid_base.py | 34 +++++++++++++++---- .../autorag/nodes/retrieval/test_hybrid_cc.py | 11 +----- .../nodes/retrieval/test_hybrid_rrf.py | 2 +- tests/autorag/test_deploy.py | 30 ++++++++++++++-- tests/autorag/test_evaluator.py | 10 ++++++ tests/autorag/vectordb/test_milvus.py | 4 +-- .../result_project/resources/vectordb.yaml | 7 ++++ 9 files changed, 110 insertions(+), 25 deletions(-) create mode 100644 tests/resources/result_project/resources/vectordb.yaml diff --git a/autorag/deploy/base.py b/autorag/deploy/base.py index 61f072dd9..ab065f1f0 100644 --- a/autorag/deploy/base.py +++ b/autorag/deploy/base.py @@ -1,5 +1,6 @@ import logging import os +import pathlib import uuid from copy import deepcopy from typing import Optional, Dict, List @@ -93,13 +94,13 @@ def summary_df_to_yaml(summary_df: pd.DataFrame, config_dict: Dict) -> Dict: def extract_best_config(trial_path: str, output_path: Optional[str] = None) -> Dict: """ - Extract the optimal pipeline from evaluated trial. + Extract the optimal pipeline from the evaluated trial. 
:param trial_path: The path to the trial directory that you want to extract the pipeline from. Must already be evaluated. :param output_path: Output path that pipeline yaml file will be saved. Must be .yaml or .yml file. - If None, it does not save yaml file and just return dict values. + If None, it does not save YAML file and just returns dict values. Default is None. :return: The dictionary of the extracted pipeline. """ @@ -113,16 +114,42 @@ def extract_best_config(trial_path: str, output_path: Optional[str] = None) -> D with open(config_yaml_path, "r") as f: config_dict = yaml.safe_load(f) yaml_dict = summary_df_to_yaml(trial_summary_df, config_dict) + yaml_dict["vectordb"] = extract_vectordb_config(trial_path) if output_path is not None: with open(output_path, "w") as f: yaml.safe_dump(yaml_dict, f) return yaml_dict +def extract_vectordb_config(trial_path: str) -> List[Dict]: + # get vectordb.yaml file + project_dir = pathlib.PurePath(os.path.realpath(trial_path)).parent + vectordb_config_path = os.path.join(project_dir, "resources", "vectordb.yaml") + if not os.path.exists(vectordb_config_path): + raise ValueError(f"vectordb.yaml does not exist in {vectordb_config_path}.") + with open(vectordb_config_path, "r") as f: + vectordb_dict = yaml.safe_load(f) + result = vectordb_dict.get("vectordb", []) + if len(result) != 0: + return result + # return default setting of chroma + return [ + { + "name": "default", + "db_type": "chroma", + "client_type": "persistent", + "embedding_model": "openai", + "collection_name": "openai", + "path": os.path.join(project_dir, "resources", "chroma"), + } + ] + + class BaseRunner: def __init__(self, config: Dict, project_dir: Optional[str] = None): self.config = config project_dir = os.getcwd() if project_dir is None else project_dir + os.environ["PROJECT_DIR"] = project_dir # init modules node_lines = deepcopy(self.config["node_lines"]) diff --git a/autorag/vectordb/milvus.py b/autorag/vectordb/milvus.py index d7ec28a85..dab6e3a54 100644 --- a/autorag/vectordb/milvus.py +++ b/autorag/vectordb/milvus.py @@ -156,3 +156,9 @@ async def is_exist(self, ids: List[str]) -> List[bool]: async def delete(self, ids: List[str]): # Delete entries by IDs self.collection.delete(expr=f"id in {ids}", timeout=self.timeout) + + def delete_collection(self): + # Delete the collection + self.collection.release(timeout=self.timeout) + self.collection.drop_index(timeout=self.timeout) + self.collection.drop(timeout=self.timeout) diff --git a/tests/autorag/nodes/retrieval/test_hybrid_base.py b/tests/autorag/nodes/retrieval/test_hybrid_base.py index b0feed34f..0001fa811 100644 --- a/tests/autorag/nodes/retrieval/test_hybrid_base.py +++ b/tests/autorag/nodes/retrieval/test_hybrid_base.py @@ -2,13 +2,14 @@ import tempfile from datetime import datetime -import chromadb import pandas as pd import pytest -from llama_index.core import MockEmbedding +import yaml from autorag.nodes.retrieval.bm25 import bm25_ingest from autorag.nodes.retrieval.vectordb import vectordb_ingest +from autorag.utils.util import get_event_loop +from autorag.vectordb.chroma import Chroma sample_ids = ( [["id-1", "id-2", "id-3"], ["id-2", "id-3", "id-4"]], @@ -132,11 +133,32 @@ def pseudo_project_dir(): os.makedirs(resource_dir) bm25_ingest(os.path.join(resource_dir, "bm25_porter_stemmer.pkl"), corpus_df) chroma_path = os.path.join(resource_dir, "chroma") - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="openai", metadata={"hnsw:space": "cosine"} + + 
vectordb_config_path = os.path.join(resource_dir, "vectordb.yaml") + with open(vectordb_config_path, "w") as f: + vectordb_dict = { + "vectordb": [ + { + "name": "test_default", + "db_type": "chroma", + "embedding_model": "mock", + "collection_name": "openai", + "path": chroma_path, + "similarity_metric": "cosine", + } + ] + } + yaml.safe_dump(vectordb_dict, f) + + chroma = Chroma( + embedding_model="mock", + collection_name="openai", + similarity_metric="cosine", + client_type="persistent", + path=chroma_path, ) - vectordb_ingest(collection, corpus_df, MockEmbedding(embed_dim=1536)) + loop = get_event_loop() + loop.run_until_complete(vectordb_ingest(chroma, corpus_df)) yield project_dir diff --git a/tests/autorag/nodes/retrieval/test_hybrid_cc.py b/tests/autorag/nodes/retrieval/test_hybrid_cc.py index 52a22f857..e568bf0b6 100644 --- a/tests/autorag/nodes/retrieval/test_hybrid_cc.py +++ b/tests/autorag/nodes/retrieval/test_hybrid_cc.py @@ -1,8 +1,5 @@ -from unittest.mock import patch - import pandas as pd import pytest -from llama_index.embeddings.openai import OpenAIEmbedding from autorag.nodes.retrieval import HybridCC from autorag.nodes.retrieval.hybrid_cc import fuse_per_query, hybrid_cc @@ -16,7 +13,6 @@ pseudo_project_dir, previous_result, ) -from tests.mock import mock_get_text_embedding_batch def test_cc_fuse_per_query(): @@ -80,17 +76,12 @@ def test_hybrid_cc_fixed_weight(): assert isinstance(result_scores, list) -@patch.object( - OpenAIEmbedding, - "get_text_embedding_batch", - mock_get_text_embedding_batch, -) def test_hybrid_cc_node_deploy(pseudo_project_dir): modules = { "target_modules": ("bm25", "vectordb"), "target_module_params": [ {"top_k": 3}, - {"embedding_model": "openai", "top_k": 3}, + {"vectordb": "test_default", "top_k": 3}, ], "top_k": 3, "weight": 0.4, diff --git a/tests/autorag/nodes/retrieval/test_hybrid_rrf.py b/tests/autorag/nodes/retrieval/test_hybrid_rrf.py index 68cc772e8..16ad7cd5b 100644 --- a/tests/autorag/nodes/retrieval/test_hybrid_rrf.py +++ b/tests/autorag/nodes/retrieval/test_hybrid_rrf.py @@ -80,7 +80,7 @@ def test_hybrid_rrf_node_deploy(pseudo_project_dir): "target_modules": ("bm25", "vectordb"), "target_module_params": [ {"top_k": 3}, - {"embedding_model": "openai", "top_k": 3}, + {"vectordb": "test_default", "top_k": 3}, ], "top_k": 3, "weight": 1, diff --git a/tests/autorag/test_deploy.py b/tests/autorag/test_deploy.py index f4e2891a1..5622be46d 100644 --- a/tests/autorag/test_deploy.py +++ b/tests/autorag/test_deploy.py @@ -120,6 +120,25 @@ def full_config(): @pytest.fixture def pseudo_trial_path(): with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as project_dir: + os.makedirs(os.path.join(project_dir, "resources")) + vectordb_config_path = os.path.join(project_dir, "resources", "vectordb.yaml") + with open(vectordb_config_path, "w") as f: + yaml.safe_dump( + { + "vectordb": [ + { + "name": "default", + "db_type": "chroma", + "client_type": "persistent", + "embedding_model": "openai", + "collection_name": "openai", + "path": os.path.join(project_dir, "resources", "chroma"), + } + ] + }, + f, + ) + trial_path = os.path.join(project_dir, "0") os.makedirs(trial_path) summary_df.to_csv(os.path.join(trial_path, "summary.csv"), index=False) @@ -159,15 +178,20 @@ def test_summary_df_to_yaml(): def test_extract_best_config(pseudo_trial_path): yaml_dict = extract_best_config(pseudo_trial_path) - assert yaml_dict == solution_dict + assert yaml_dict["node_lines"] == solution_dict["node_lines"] with tempfile.NamedTemporaryFile( suffix="yaml", 
mode="w+t", delete=False ) as yaml_path: yaml_dict = extract_best_config(pseudo_trial_path, yaml_path.name) - assert yaml_dict == solution_dict + assert yaml_dict["node_lines"] == solution_dict["node_lines"] assert os.path.exists(yaml_path.name) yaml_dict = yaml.safe_load(yaml_path) - assert yaml_dict == solution_dict + assert yaml_dict["node_lines"] == solution_dict["node_lines"] + assert yaml_dict["vectordb"][0]["name"] == "default" + assert yaml_dict["vectordb"][0]["db_type"] == "chroma" + assert yaml_dict["vectordb"][0]["client_type"] == "persistent" + assert yaml_dict["vectordb"][0]["embedding_model"] == "openai" + assert yaml_dict["vectordb"][0]["collection_name"] == "openai" yaml_path.close() os.unlink(yaml_path.name) diff --git a/tests/autorag/test_evaluator.py b/tests/autorag/test_evaluator.py index c91202444..1772fdf69 100644 --- a/tests/autorag/test_evaluator.py +++ b/tests/autorag/test_evaluator.py @@ -18,6 +18,7 @@ from autorag.schema import Node from autorag.utils import validate_qa_dataset, validate_corpus_dataset from autorag.utils.util import load_summary_file +from autorag.vectordb.milvus import Milvus from tests.delete_tests import is_github_action from tests.mock import mock_get_text_embedding_batch, mock_aget_text_embedding_batch @@ -219,6 +220,15 @@ def test_start_trial(evaluator): reason="This test needs milvus uri and token which is confidential.", ) def test_start_trial_milvus(evaluator): + milvus = Milvus( + uri=os.environ["MILVUS_URI"], + token=os.environ["MILVUS_TOKEN"], + embedding_model="openai_embed_3_large", + collection_name="openai_embed_3_large", + ) + milvus.delete_collection() + del milvus + evaluator.start_trial(os.path.join(resource_dir, "simple_milvus.yaml")) project_dir = evaluator.project_dir assert os.path.exists(os.path.join(project_dir, "0")) diff --git a/tests/autorag/vectordb/test_milvus.py b/tests/autorag/vectordb/test_milvus.py index 5bf5feb02..2eb505fc5 100644 --- a/tests/autorag/vectordb/test_milvus.py +++ b/tests/autorag/vectordb/test_milvus.py @@ -21,9 +21,7 @@ def milvus_instance(): similarity_metric="ip", ) yield milvus - milvus.collection.release() - milvus.collection.drop_index() - milvus.collection.drop() + milvus.delete_collection() @pytest.mark.skipif( diff --git a/tests/resources/result_project/resources/vectordb.yaml b/tests/resources/result_project/resources/vectordb.yaml new file mode 100644 index 000000000..8a037656e --- /dev/null +++ b/tests/resources/result_project/resources/vectordb.yaml @@ -0,0 +1,7 @@ +vectordb: + - name: default + db_type: chroma + client_type: persistent + embedding_model: openai + collection_name: openai + path: ./chroma From 55748649548e55aca6500951918c3bf4a7e57813 Mon Sep 17 00:00:00 2001 From: "Jeffrey (Dongkyu) Kim" Date: Wed, 23 Oct 2024 20:53:15 +0900 Subject: [PATCH 08/11] Documentation for new external Vector DB connection (#881) * add vector db documentations (milvus and chroma) * change documentation file for vectordb * Fix all sample config YAML file to use vectordb --------- Co-authored-by: jeffrey --- README.md | 2 +- docs/source/index.rst | 8 ++ docs/source/local_model.md | 10 +- docs/source/migration.md | 59 ++++++++++ .../nodes/query_expansion/query_expansion.md | 2 +- docs/source/nodes/retrieval/vectordb.md | 12 +- docs/source/optimization/custom_config.md | 13 +- docs/source/vectordb/chroma.md | 111 ++++++++++++++++++ docs/source/vectordb/milvus.md | 111 ++++++++++++++++++ docs/source/vectordb/vectordb.md | 103 ++++++++++++++++ .../rag/english/gpu/compact_local.yaml | 11 +- 
.../rag/english/gpu/compact_openai.yaml | 3 +- sample_config/rag/english/gpu/full.yaml | 5 +- sample_config/rag/english/gpu/half.yaml | 3 +- .../rag/english/gpu_api/compact.yaml | 3 +- sample_config/rag/english/gpu_api/full.yaml | 5 +- sample_config/rag/english/gpu_api/half.yaml | 3 +- .../rag/english/non_gpu/compact.yaml | 3 +- sample_config/rag/english/non_gpu/full.yaml | 2 +- sample_config/rag/english/non_gpu/half.yaml | 3 +- .../rag/english/non_gpu/simple_bedrock.yaml | 9 +- .../rag/english/non_gpu/simple_local.yaml | 10 +- .../rag/english/non_gpu/simple_ollama.yaml | 9 +- .../rag/english/non_gpu/simple_openai.yaml | 2 +- sample_config/rag/extracted_sample.yaml | 9 +- .../rag/korean/gpu/compact_korean.yaml | 3 +- sample_config/rag/korean/gpu/full_korean.yaml | 5 +- sample_config/rag/korean/gpu/half_korean.yaml | 3 +- .../rag/korean/gpu_api/compact_korean.yaml | 3 +- .../rag/korean/gpu_api/full_korean.yaml | 5 +- .../rag/korean/gpu_api/half_korean.yaml | 3 +- .../rag/korean/non_gpu/compact_korean.yaml | 3 +- .../rag/korean/non_gpu/full_korean.yaml | 5 +- .../rag/korean/non_gpu/half_korean.yaml | 3 +- .../rag/korean/non_gpu/simple_korean.yaml | 2 +- 35 files changed, 478 insertions(+), 68 deletions(-) create mode 100644 docs/source/vectordb/chroma.md create mode 100644 docs/source/vectordb/milvus.md create mode 100644 docs/source/vectordb/vectordb.md diff --git a/README.md b/README.md index 1877bba78..9a17195d6 100644 --- a/README.md +++ b/README.md @@ -316,7 +316,7 @@ node_lines: top_k: 3 modules: - module_type: vectordb - embedding_model: openai + vectordb: default - module_type: bm25 - module_type: hybrid_rrf weight_range: (4,80) diff --git a/docs/source/index.rst b/docs/source/index.rst index a401bdcb2..8ef31d5fd 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -84,6 +84,14 @@ Also, feel free to ask your question at our `github issue Date: Wed, 23 Oct 2024 21:09:55 +0900 Subject: [PATCH 09/11] resolve error at test_run_retrieval_node.py --- .../retrieval/test_run_retrieval_node.py | 45 ++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/tests/autorag/nodes/retrieval/test_run_retrieval_node.py b/tests/autorag/nodes/retrieval/test_run_retrieval_node.py index 79c372ad1..cc51cd5b8 100644 --- a/tests/autorag/nodes/retrieval/test_run_retrieval_node.py +++ b/tests/autorag/nodes/retrieval/test_run_retrieval_node.py @@ -4,16 +4,18 @@ import tempfile from unittest.mock import patch -import chromadb import pandas as pd import pytest +import yaml from llama_index.core import MockEmbedding from llama_index.embeddings.openai import OpenAIEmbedding +import autorag from autorag.nodes.retrieval import BM25, VectorDB, HybridCC, HybridRRF from autorag.nodes.retrieval.run import run_retrieval_node from autorag.nodes.retrieval.vectordb import vectordb_ingest -from autorag.utils.util import load_summary_file +from autorag.utils.util import load_summary_file, get_event_loop +from autorag.vectordb.chroma import Chroma from tests.mock import mock_get_text_embedding_batch root_dir = pathlib.PurePath( @@ -31,13 +33,34 @@ def node_line_dir(): chroma_path = os.path.join(test_project_dir, "resources", "chroma") os.makedirs(chroma_path) - db = chromadb.PersistentClient(path=chroma_path) - collection = db.create_collection( - name="openai", metadata={"hnsw:space": "cosine"} - ) corpus_path = os.path.join(test_project_dir, "data", "corpus.parquet") corpus_df = pd.read_parquet(corpus_path) - vectordb_ingest(collection, corpus_df, MockEmbedding(1536)) + 
autorag.embedding_models["mock_1536"] = autorag.LazyInit( + MockEmbedding, embed_dim=1536 + ) + chroma_config = { + "client_type": "persistent", + "embedding_model": "mock_1536", + "collection_name": "openai", + "path": chroma_path, + "similarity_metric": "cosine", + } + chroma = Chroma(**chroma_config) + loop = get_event_loop() + loop.run_until_complete(vectordb_ingest(chroma, corpus_df)) + + chroma_config_path = os.path.join( + test_project_dir, "resources", "vectordb.yaml" + ) + with open(chroma_config_path, "w") as f: + yaml.safe_dump( + { + "vectordb": [ + {**chroma_config, "name": "test_mock", "db_type": "chroma"} + ] + }, + f, + ) test_trial_dir = os.path.join(test_project_dir, "test_trial") os.makedirs(test_trial_dir) @@ -55,7 +78,7 @@ def test_run_retrieval_node(node_line_dir): modules = [BM25, VectorDB, HybridRRF, HybridCC, HybridCC] module_params = [ {"top_k": 4, "bm25_tokenizer": "gpt2"}, - {"top_k": 4, "embedding_model": "openai"}, + {"top_k": 4, "vectordb": "test_mock"}, {"top_k": 4, "weight_range": (5, 70)}, {"top_k": 4, "weight_range": (0.3, 0.7), "test_weight_size": 40}, {"top_k": 4, "weight_range": (0.1, 0.9), "test_weight_size": 8}, @@ -107,7 +130,7 @@ def test_run_retrieval_node(node_line_dir): assert summary_df["retrieval_f1"][1] == bm25_top_k_df["retrieval_f1"].mean() assert summary_df["retrieval_recall"][1] == bm25_top_k_df["retrieval_recall"].mean() assert summary_df["module_name"][0] == "VectorDB" - assert summary_df["module_params"][0] == {"top_k": 4, "embedding_model": "openai"} + assert summary_df["module_params"][0] == {"top_k": 4, "vectordb": "test_mock"} assert summary_df["execution_time"][0] > 0 # assert average times assert summary_df["execution_time"][0] + summary_df["execution_time"][ @@ -231,7 +254,7 @@ def test_run_retrieval_node_only_hybrid(node_line_dir): "weight": 0.3, "target_module_params": ( {"top_k": 3}, - {"top_k": 3, "embedding_model": "openai"}, + {"top_k": 3, "vectordb": "test_mock"}, ), }, ] @@ -288,7 +311,7 @@ def test_run_retrieval_node_only_hybrid(node_line_dir): "weight": 0.3, "target_module_params": ( {"top_k": 3}, - {"top_k": 3, "embedding_model": "openai"}, + {"top_k": 3, "vectordb": "test_mock"}, ), } assert summary_df["execution_time"][0] > 0 From cb641903ecdb91b473e1aff8c9fc8c44808d1377 Mon Sep 17 00:00:00 2001 From: jeffrey Date: Wed, 23 Oct 2024 21:24:07 +0900 Subject: [PATCH 10/11] skip milvus test code --- tests/autorag/vectordb/test_milvus.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/autorag/vectordb/test_milvus.py b/tests/autorag/vectordb/test_milvus.py index 2eb505fc5..7d6247f8b 100644 --- a/tests/autorag/vectordb/test_milvus.py +++ b/tests/autorag/vectordb/test_milvus.py @@ -57,6 +57,10 @@ async def test_add_and_query_documents(milvus_instance): assert exist[1] is False +@pytest.mark.skipif( + is_github_action(), + reason="This test needs milvus uri and token which is confidential.", +) @pytest.mark.asyncio async def test_delete_documents(milvus_instance): # Add documents From bec9759ab12d56ae266d6c6f2e594f03dbbdc75a Mon Sep 17 00:00:00 2001 From: jeffrey Date: Wed, 23 Oct 2024 22:17:11 +0900 Subject: [PATCH 11/11] resolve test error --- autorag/cli.py | 2 +- autorag/evaluator.py | 1 + tests/resources/result_project/0/config.yaml | 4 ++-- tests/resources/result_project/1/config.yaml | 4 ++-- tests/resources/result_project/2/config.yaml | 4 ++-- tests/resources/result_project/3/config.yaml | 10 ++++++++-- 6 files changed, 16 insertions(+), 9 deletions(-) diff --git a/autorag/cli.py b/autorag/cli.py index 
c6c9aa421..809a2b3f6 100644 --- a/autorag/cli.py +++ b/autorag/cli.py @@ -159,7 +159,7 @@ def extract_best_config(trial_path: str, output_path: str): def restart_evaluate(trial_path): if not os.path.exists(trial_path): raise ValueError(f"trial_path {trial_path} does not exist.") - project_dir = pathlib.PurePath(trial_path).parent + project_dir = str(pathlib.PurePath(trial_path).parent) qa_data_path = os.path.join(project_dir, "data", "qa.parquet") corpus_data_path = os.path.join(project_dir, "data", "corpus.parquet") evaluator = Evaluator(qa_data_path, corpus_data_path, project_dir) diff --git a/autorag/evaluator.py b/autorag/evaluator.py index 3e43bbdb8..4f0661127 100644 --- a/autorag/evaluator.py +++ b/autorag/evaluator.py @@ -266,6 +266,7 @@ def _load_node_lines(yaml_path: str) -> Dict[str, List[Node]]: def restart_trial(self, trial_path: str): logger.info(ascii_art) + os.environ["PROJECT_DIR"] = self.project_dir # Check if trial_path exists if not os.path.exists(trial_path): raise ValueError(f"Trial path {trial_path} does not exist.") diff --git a/tests/resources/result_project/0/config.yaml b/tests/resources/result_project/0/config.yaml index 57a38d6e8..9e11fbaca 100644 --- a/tests/resources/result_project/0/config.yaml +++ b/tests/resources/result_project/0/config.yaml @@ -9,7 +9,7 @@ node_lines: retrieval_modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default modules: - module_type: query_decompose llm: openai @@ -26,7 +26,7 @@ node_lines: modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default - node_type: passage_reranker strategy: metrics: [retrieval_f1, retrieval_recall, retrieval_precision] diff --git a/tests/resources/result_project/1/config.yaml b/tests/resources/result_project/1/config.yaml index ab3783f06..8c3563503 100644 --- a/tests/resources/result_project/1/config.yaml +++ b/tests/resources/result_project/1/config.yaml @@ -9,7 +9,7 @@ node_lines: retrieval_modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default modules: - module_type: query_decompose llm: openai @@ -27,7 +27,7 @@ node_lines: modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default - node_type: passage_reranker strategy: metrics: [ retrieval_f1, retrieval_recall, retrieval_precision ] diff --git a/tests/resources/result_project/2/config.yaml b/tests/resources/result_project/2/config.yaml index 446dfc06e..cd2611165 100644 --- a/tests/resources/result_project/2/config.yaml +++ b/tests/resources/result_project/2/config.yaml @@ -9,7 +9,7 @@ node_lines: retrieval_modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default modules: - module_type: query_decompose llm: openai @@ -27,7 +27,7 @@ node_lines: modules: - module_type: bm25 - module_type: vectordb - embedding_model: openai + vectordb: default - node_type: passage_reranker strategy: metrics: [ retrieval_f1, retrieval_recall, retrieval_precision ] diff --git a/tests/resources/result_project/3/config.yaml b/tests/resources/result_project/3/config.yaml index ff0fcf1fc..08ca1e4bb 100644 --- a/tests/resources/result_project/3/config.yaml +++ b/tests/resources/result_project/3/config.yaml @@ -1,4 +1,10 @@ -# need to delete when there are full modules implemented +vectordb: + - name: mock_chroma + db_type: chroma + client_type: persistent + path: ${PROJECT_DIR}/resources/chroma + collection_name: mock_chroma + embedding_model: mock node_lines: - 
node_line_name: retrieve_node_line nodes: @@ -9,6 +15,6 @@ node_lines: modules: - module_type: ${BM25} # for testing env variable - module_type: vectordb - embedding_model: [ mock, mock ] + vectordb: mock_chroma - module_type: hybrid_rrf weight_range: (4, 40)
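
Taken together, the series moves vector store settings out of individual retrieval modules and into a named, top-level vectordb: section that modules reference by name; ingestion then runs once per declared store and skips documents that already exist. A rough end-to-end sketch of driving a trial against such a config, assuming placeholder paths and a config.yaml shaped like the test fixtures above:

from autorag.evaluator import Evaluator

# Placeholder paths; config.yaml is assumed to declare named stores under a
# top-level `vectordb:` key and to reference them via `vectordb: <name>` in
# its retrieval modules.
evaluator = Evaluator(
    "data/qa.parquet",
    "data/corpus.parquet",
    project_dir="./my_project",
)

# full_ingest=True (the default) checks every corpus document against each
# declared store and ingests only the missing ones; full_ingest=False limits
# ingestion to documents referenced in the QA retrieval_gt, which is cheaper
# when the corpus is much larger than the evaluation set.
evaluator.start_trial("config.yaml", full_ingest=False)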