Add External VectorDB Connections #872

Merged · 12 commits · Oct 23, 2024
2 changes: 1 addition & 1 deletion README.md
@@ -316,7 +316,7 @@ node_lines:
top_k: 3
modules:
- module_type: vectordb
embedding_model: openai
vectordb: default
- module_type: bm25
- module_type: hybrid_rrf
weight_range: (4,80)
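The retrieval module now references a named vector DB entry (vectordb: default) instead of carrying an inline embedding_model. As a hedged illustration only, such an entry could be declared and serialized like this; the field values mirror the default Chroma fallback returned by extract_vectordb_config() later in this PR, and the ${PROJECT_DIR} placeholder is an assumption rather than something taken from the diff:

```python
# Illustrative sketch only: a "vectordb" section that the "vectordb: default"
# reference above could resolve to.
import yaml

vectordb_section = {
    "vectordb": [
        {
            "name": "default",
            "db_type": "chroma",
            "client_type": "persistent",
            "embedding_model": "openai",
            "collection_name": "openai",
            "path": "${PROJECT_DIR}/resources/chroma",  # assumed env-style path
        }
    ]
}
print(yaml.safe_dump(vectordb_section, sort_keys=False))
```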
2 changes: 1 addition & 1 deletion autorag/cli.py
@@ -159,7 +159,7 @@ def extract_best_config(trial_path: str, output_path: str):
def restart_evaluate(trial_path):
if not os.path.exists(trial_path):
raise ValueError(f"trial_path {trial_path} does not exist.")
project_dir = pathlib.PurePath(trial_path).parent
project_dir = str(pathlib.PurePath(trial_path).parent)
qa_data_path = os.path.join(project_dir, "data", "qa.parquet")
corpus_data_path = os.path.join(project_dir, "data", "corpus.parquet")
evaluator = Evaluator(qa_data_path, corpus_data_path, project_dir)
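The added str() matters because PurePath.parent returns another PurePath object. One plausible reason for the cast (an assumption, since the diff does not state it) is that project_dir later ends up in os.environ, which only accepts strings. A small sketch with a hypothetical path:

```python
import os
import pathlib

trial_path = "/tmp/my_project/0"  # hypothetical trial directory
project_dir = pathlib.PurePath(trial_path).parent  # PurePosixPath("/tmp/my_project") on POSIX
project_dir = str(project_dir)                     # "/tmp/my_project"

os.path.join(project_dir, "data", "qa.parquet")    # fine with either form
os.environ["PROJECT_DIR"] = project_dir            # raises TypeError if given a PurePath
```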
1 change: 1 addition & 0 deletions autorag/data/legacy/qacreation/base.py
@@ -144,6 +144,7 @@ def make_qa_with_existing_qa(
:param kwargs: The keyword arguments for qa_creation_func.
:return: QA dataset dataframe.
"""
raise DeprecationWarning("This function is deprecated.")
assert (
"query" in existing_query_df.columns
), "existing_query_df must have 'query' column."
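Note that raising DeprecationWarning aborts the call immediately, so the assertions that follow never run. For contrast, a soft-deprecation sketch (not what this PR implements) would emit the warning and continue:

```python
import warnings

def make_qa_with_existing_qa_soft(*args, **kwargs):
    # Warn the caller but keep the legacy behaviour available.
    warnings.warn(
        "make_qa_with_existing_qa is deprecated.",
        DeprecationWarning,
        stacklevel=2,
    )
    # ... the original implementation would continue here ...
```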
43 changes: 31 additions & 12 deletions autorag/deploy/base.py
@@ -1,5 +1,6 @@
import logging
import os
import pathlib
import uuid
from copy import deepcopy
from typing import Optional, Dict, List
@@ -8,8 +9,7 @@
import yaml

from autorag.support import get_support_modules
from autorag.utils.util import load_summary_file

from autorag.utils.util import load_summary_file, load_yaml_config

logger = logging.getLogger("AutoRAG")

@@ -94,13 +94,13 @@ def summary_df_to_yaml(summary_df: pd.DataFrame, config_dict: Dict) -> Dict:

def extract_best_config(trial_path: str, output_path: Optional[str] = None) -> Dict:
"""
Extract the optimal pipeline from evaluated trial.
Extract the optimal pipeline from the evaluated trial.

:param trial_path: The path to the trial directory that you want to extract the pipeline from.
Must already be evaluated.
:param output_path: Output path that pipeline yaml file will be saved.
Must be .yaml or .yml file.
If None, it does not save yaml file and just return dict values.
If None, it does not save YAML file and just returns dict values.
Default is None.
:return: The dictionary of the extracted pipeline.
"""
@@ -114,18 +114,42 @@ def extract_best_config(trial_path: str, output_path: Optional[str] = None) -> Dict:
with open(config_yaml_path, "r") as f:
config_dict = yaml.safe_load(f)
yaml_dict = summary_df_to_yaml(trial_summary_df, config_dict)
yaml_dict["vectordb"] = extract_vectordb_config(trial_path)
if output_path is not None:
with open(output_path, "w") as f:
yaml.safe_dump(yaml_dict, f)
return yaml_dict


def extract_vectordb_config(trial_path: str) -> List[Dict]:
# get vectordb.yaml file
project_dir = pathlib.PurePath(os.path.realpath(trial_path)).parent
vectordb_config_path = os.path.join(project_dir, "resources", "vectordb.yaml")
if not os.path.exists(vectordb_config_path):
raise ValueError(f"vectordb.yaml does not exist in {vectordb_config_path}.")
with open(vectordb_config_path, "r") as f:
vectordb_dict = yaml.safe_load(f)
result = vectordb_dict.get("vectordb", [])
if len(result) != 0:
return result
# return default setting of chroma
return [
{
"name": "default",
"db_type": "chroma",
"client_type": "persistent",
"embedding_model": "openai",
"collection_name": "openai",
"path": os.path.join(project_dir, "resources", "chroma"),
}
]
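A hedged usage sketch of the updated export path (the trial path below is hypothetical): extract_best_config now attaches the vector DB settings it reads from resources/vectordb.yaml, falling back to the single default Chroma entry above when that file lists no entries.

```python
from autorag.deploy.base import extract_best_config

best = extract_best_config("./my_project/0", output_path="best_pipeline.yaml")
# best["vectordb"] is the list parsed from <project_dir>/resources/vectordb.yaml,
# or the default Chroma entry shown above when the file defines none.
for vdb in best["vectordb"]:
    print(vdb["name"], vdb["db_type"])
```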


class BaseRunner:
def __init__(self, config: Dict, project_dir: Optional[str] = None):
self.config = config
project_dir = os.getcwd() if project_dir is None else project_dir
# self.app = Flask(__name__)
# self.__add_api_route()
os.environ["PROJECT_DIR"] = project_dir

# init modules
node_lines = deepcopy(self.config["node_lines"])
@@ -159,12 +183,7 @@ def from_yaml(cls, yaml_path: str, project_dir: Optional[str] = None):
Default is the current directory.
:return: Initialized Runner.
"""
with open(yaml_path, "r") as f:
try:
config = yaml.safe_load(f)
except yaml.YAMLError as exc:
logger.error(exc)
raise exc
config = load_yaml_config(yaml_path)
return cls(config, project_dir=project_dir)

@classmethod
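load_yaml_config itself is not shown in this diff. Judging from the code it replaces here and in autorag/evaluator.py below (plain yaml.safe_load followed by the two convert_* helpers), a minimal sketch of what it plausibly does looks like the following; the real helper lives in autorag.utils.util and may differ:

```python
import os

import yaml

from autorag.utils.util import convert_string_to_tuple_in_dict, convert_env_in_dict


def load_yaml_config_sketch(yaml_path: str) -> dict:
    if not os.path.exists(yaml_path):
        raise ValueError(f"YAML file {yaml_path} does not exist.")
    with open(yaml_path, "r", encoding="utf-8") as stream:
        try:
            yaml_dict = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            raise ValueError(f"YAML file {yaml_path} could not be loaded.") from exc
    yaml_dict = convert_string_to_tuple_in_dict(yaml_dict)  # e.g. "(4, 80)" strings -> tuples
    yaml_dict = convert_env_in_dict(yaml_dict)  # expand environment-variable references
    return yaml_dict
```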
8 changes: 8 additions & 0 deletions autorag/evaluation/metric/util.py
@@ -15,6 +15,14 @@ def calculate_cosine_similarity(a, b):
return similarity


def calculate_l2_distance(a, b):
return np.linalg.norm(a - b)


def calculate_inner_product(a, b):
return np.dot(a, b)


def autorag_metric(fields_to_check: List[str]):
def decorator_autorag_metric(func):
@functools.wraps(func)
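A small worked example (not part of the diff) showing the three metrics side by side:

```python
import numpy as np

a = np.array([1.0, 0.0, 1.0])
b = np.array([1.0, 1.0, 0.0])

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))  # 0.5
l2 = np.linalg.norm(a - b)  # sqrt(2) ~= 1.414; smaller means more similar
ip = np.dot(a, b)           # 1.0; larger means more similar
print(cosine, l2, ip)
```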
162 changes: 70 additions & 92 deletions autorag/evaluator.py
@@ -7,15 +7,17 @@
from itertools import chain
from typing import List, Dict, Optional

import chromadb
import pandas as pd
import yaml

from autorag import embedding_models
from autorag.node_line import run_node_line
from autorag.nodes.retrieval.base import get_bm25_pkl_name
from autorag.nodes.retrieval.bm25 import bm25_ingest
from autorag.nodes.retrieval.vectordb import vectordb_ingest
from autorag.nodes.retrieval.vectordb import (
vectordb_ingest,
filter_exist_ids,
filter_exist_ids_from_retrieval_gt,
)
from autorag.schema import Node
from autorag.schema.node import (
module_type_exists,
@@ -29,11 +31,11 @@
)
from autorag.utils.util import (
load_summary_file,
convert_string_to_tuple_in_dict,
convert_env_in_dict,
explode,
empty_cuda_cache,
load_yaml_config,
get_event_loop,
)
from autorag.vectordb import load_all_vectordb_from_yaml

logger = logging.getLogger("AutoRAG")

@@ -100,7 +102,26 @@ def __init__(
if not os.path.exists(corpus_path_in_project):
self.corpus_data.to_parquet(corpus_path_in_project, index=False)

def start_trial(self, yaml_path: str, skip_validation: bool = False):
def start_trial(
self, yaml_path: str, skip_validation: bool = False, full_ingest: bool = True
):
"""
Start AutoRAG trial.
The trial means one experiment to optimize the RAG pipeline.
It consists of ingesting the corpus data, running all nodes and modules, and evaluating them to find the optimal modules.

:param yaml_path: The config YAML path
:param skip_validation: If True, it skips the validation step.
The validation step checks that the input config YAML file is well formatted
and that there are no problems with the system settings.
Default is False.
:param full_ingest: If True, it checks that the whole corpus from corpus.parquet exists in the vector DB.
If your corpus is huge and you don't want to check the whole vector DB, set it to False.
Default is True.
:return: None
"""
# Make Resources directory
os.makedirs(os.path.join(self.project_dir, "resources"), exist_ok=True)

if not skip_validation:
logger.info(ascii_art)
logger.info(
@@ -115,15 +136,38 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False):
)
validator.validate(yaml_path)

os.environ["PROJECT_DIR"] = self.project_dir

trial_name = self.__get_new_trial_name()
self.__make_trial_dir(trial_name)

# copy YAML file to the trial directory
shutil.copy(
yaml_path, os.path.join(self.project_dir, trial_name, "config.yaml")
)
yaml_dict = load_yaml_config(yaml_path)
vectordb = yaml_dict.get("vectordb", [])

vectordb_config_path = os.path.join(
self.project_dir, "resources", "vectordb.yaml"
)
with open(vectordb_config_path, "w") as f:
yaml.safe_dump({"vectordb": vectordb}, f)

node_lines = self._load_node_lines(yaml_path)
self.__embed(node_lines)
self.__ingest_bm25_full(node_lines)

# Ingest VectorDB corpus
if any(
list(
map(
lambda nodes: module_type_exists(nodes, "vectordb"),
node_lines.values(),
)
)
):
loop = get_event_loop()
loop.run_until_complete(self.__ingest_vectordb(yaml_path, full_ingest))

trial_summary_df = pd.DataFrame(
columns=[
@@ -153,15 +197,14 @@ def start_trial(self, yaml_path: str, skip_validation: bool = False):

logger.info("Evaluation complete.")

def __embed(self, node_lines: Dict[str, List[Node]]):
def __ingest_bm25_full(self, node_lines: Dict[str, List[Node]]):
if any(
list(
map(
lambda nodes: module_type_exists(nodes, "bm25"), node_lines.values()
)
)
):
# ingest BM25 corpus
logger.info("Embedding BM25 corpus...")
bm25_tokenizer_list = list(
chain.from_iterable(
@@ -184,78 +227,6 @@
bm25_ingest(bm25_dir, self.corpus_data, bm25_tokenizer=bm25_tokenizer)
logger.info("BM25 corpus embedding complete.")

if any(
list(
map(
lambda nodes: module_type_exists(nodes, "vectordb"),
node_lines.values(),
)
)
):
# load embedding_models in nodes
embedding_models_list = list(
chain.from_iterable(
map(
lambda nodes: self._find_embedding_model(nodes),
node_lines.values(),
)
)
)

# get embedding batch size in nodes
embedding_batch_list = list(
chain.from_iterable(
map(
lambda nodes: extract_values_from_nodes(
nodes, "embedding_batch"
),
node_lines.values(),
)
)
)
if len(embedding_batch_list) == 0:
embedding_batch = 100
else:
embedding_batch = embedding_batch_list[0]

# duplicate check in embedding_models
embedding_models_list = list(set(embedding_models_list))

vectordb_dir = os.path.join(self.project_dir, "resources", "chroma")
vectordb = chromadb.PersistentClient(path=vectordb_dir)

for embedding_model_str in embedding_models_list:
# ingest VectorDB corpus
logger.info(f"Embedding VectorDB corpus with {embedding_model_str}...")

# Get the collection with GET or CREATE, as it may already exist
collection = vectordb.get_or_create_collection(
name=embedding_model_str, metadata={"hnsw:space": "cosine"}
)
# get embedding_model
if embedding_model_str in embedding_models:
embedding_model = embedding_models[embedding_model_str]()
else:
logger.error(
f"embedding_model_str {embedding_model_str} does not exist."
)
raise KeyError(
f"embedding_model_str {embedding_model_str} does not exist."
)
vectordb_ingest(
collection,
self.corpus_data,
embedding_model,
embedding_batch=embedding_batch,
)
logger.info(
f"VectorDB corpus embedding complete with {embedding_model_str}."
)
del embedding_model
empty_cuda_cache()
else:
logger.info("No ingestion needed.")

def __get_new_trial_name(self) -> str:
trial_json_path = os.path.join(self.project_dir, "trial.json")
if not os.path.exists(trial_json_path):
@@ -284,16 +255,7 @@ def __make_trial_dir(self, trial_name: str):

@staticmethod
def _load_node_lines(yaml_path: str) -> Dict[str, List[Node]]:
if not os.path.exists(yaml_path):
raise ValueError(f"YAML file {yaml_path} does not exist.")
with open(yaml_path, "r", encoding="utf-8") as stream:
try:
yaml_dict = yaml.safe_load(stream)
except yaml.YAMLError as exc:
raise ValueError(f"YAML file {yaml_path} could not be loaded.") from exc

yaml_dict = convert_string_to_tuple_in_dict(yaml_dict)
yaml_dict = convert_env_in_dict(yaml_dict)
yaml_dict = load_yaml_config(yaml_path)
node_lines = yaml_dict["node_lines"]
node_line_dict = {}
for node_line in node_lines:
@@ -304,6 +266,7 @@ def _load_node_lines(yaml_path: str) -> Dict[str, List[Node]]:

def restart_trial(self, trial_path: str):
logger.info(ascii_art)
os.environ["PROJECT_DIR"] = self.project_dir
# Check if trial_path exists
if not os.path.exists(trial_path):
raise ValueError(f"Trial path {trial_path} does not exist.")
@@ -560,3 +523,18 @@ def _find_embedding_model(nodes: List[Node]):
filter(lambda x: x is not None, embedding_models_list)
)
return list(set(embedding_models_list))

async def __ingest_vectordb(self, yaml_path, full_ingest: bool):
vectordb_list = load_all_vectordb_from_yaml(yaml_path, self.project_dir)
if full_ingest is True:
# get the target ingest corpus from the whole corpus
for vectordb in vectordb_list:
target_corpus = await filter_exist_ids(vectordb, self.corpus_data)
await vectordb_ingest(vectordb, target_corpus)
else:
# get the target ingest corpus from the retrieval gt only
for vectordb in vectordb_list:
target_corpus = await filter_exist_ids_from_retrieval_gt(
vectordb, self.qa_data, self.corpus_data
)
await vectordb_ingest(vectordb, target_corpus)
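An end-to-end usage sketch of the new trial flow (paths are hypothetical, and the constructor arguments are passed positionally as in cli.py above):

```python
from autorag.evaluator import Evaluator

evaluator = Evaluator("./data/qa.parquet", "./data/corpus.parquet", "./my_project")

# full_ingest=True (the default) checks the whole corpus against every configured
# vector DB; full_ingest=False only ingests documents referenced by the retrieval
# ground truth, which is cheaper for very large corpora.
evaluator.start_trial("./config.yaml", full_ingest=False)
```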