Enable hybrid retrieval at deploy time. (#107)
* Add full trial results to the resources folder for testing Runner

* Test with a full trial folder and run the whole pipeline

* Fix the pytest fixture that contains config.yaml

* Use hybrid retrieval without run.py, via target_modules and target_module_params (see the sketch after this list)

* Delete ids and scores from the hybrid module params in the summary, and add target_modules and target_module_params for deploy
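
For reference, a minimal sketch of the deploy-time call path this change enables, mirroring the test_hybrid_rrf_node_deploy test added below. It assumes a project directory prepared like the test fixture (data/corpus.parquet plus a BM25 index and a Chroma collection under resources/); the path, ground-truth values, and parameters are illustrative.

```python
import pandas as pd

from autorag.nodes.retrieval import hybrid_rrf

# Result frame from the previous node; the column layout mirrors the test data,
# the ground-truth values themselves are illustrative.
previous_result = pd.DataFrame({
    'qid': ['query-1', 'query-2', 'query-3'],
    'query': ['query-1', 'query-2', 'query-3'],
    'retrieval_gt': [[['id-1']], [['id-2']], [['id-3']]],
    'generation_gt': [['gen-1', 'gen-2'], ['gen-1', 'gen-2'], ['gen-1', 'gen-2']],
})

result_df = hybrid_rrf(
    project_dir='./my_project',           # illustrative; needs data/ and resources/ prepared
    previous_result=previous_result,
    target_modules=('bm25', 'vectordb'),  # retrieval modules the decorator runs itself
    target_module_params=[
        {'top_k': 3},
        {'embedding_model': 'openai', 'top_k': 3},
    ],
    top_k=3,
    rrf_k=1,
)
# result_df columns: retrieved_contents, retrieved_ids, retrieve_scores
```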

---------

Co-authored-by: jeffrey <vkefhdl1@gmail.com>
vkehfdl1 and jeffrey authored Feb 3, 2024
1 parent 605290f commit 4c3c356
Showing 4 changed files with 90 additions and 5 deletions.
21 changes: 20 additions & 1 deletion autorag/nodes/retrieval/base.py
@@ -8,6 +8,7 @@
import pandas as pd

from autorag import embedding_models
from autorag.support import get_support_modules
from autorag.utils import fetch_contents, result_to_dataframe, validate_qa_dataset

import logging
@@ -68,7 +69,20 @@ def wrapper(
ids, scores = func(queries=queries, collection=chroma_collection,
embedding_model=embedding_model, **kwargs)
elif func.__name__ in ["hybrid_rrf", "hybrid_cc"]:
ids, scores = func(**kwargs)
if 'ids' in kwargs and 'scores' in kwargs:
ids, scores = func(**kwargs)
else:
if not ('target_modules' in kwargs and 'target_module_params' in kwargs):
raise ValueError(
f"If there are no ids and scores specified, target_modules and target_module_params must be specified for using {func.__name__}.")
target_modules = kwargs.pop('target_modules')
target_module_params = kwargs.pop('target_module_params')
result_dfs = list(map(lambda x: get_support_modules(x[0])(**x[1], project_dir=project_dir,
previous_result=previous_result),
zip(target_modules, target_module_params)))
ids = tuple(map(lambda df: df['retrieved_ids'].apply(list).tolist(), result_dfs))
scores = tuple(map(lambda df: df['retrieve_scores'].apply(list).tolist(), result_dfs))
ids, scores = func(ids=ids, scores=scores, **kwargs)
else:
raise ValueError(f"invalid func name for using retrieval_io decorator.")

@@ -122,3 +136,8 @@ def evenly_distribute_passages(ids: List[List[str]], scores: List[List[float]],
new_scores.extend(scores[i][:avg_len])

return new_ids, new_scores


def run_retrieval_modules(project_dir: str, previous_result: pd.DataFrame,
module_name: str, module_params: Dict) -> pd.DataFrame:
return
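
For orientation, the nested shape of the ids and scores that the decorated hybrid functions consume, whether passed in precomputed or rebuilt from target_modules by the new branch above, is sketched below with illustrative values: one tuple entry per target module, one inner list per query.

```python
# Illustrative only: two target modules (e.g. bm25 and vectordb), two queries.
ids = (
    [['id-1', 'id-2', 'id-3'],    # module 1, query 1
     ['id-2', 'id-3', 'id-4']],   # module 1, query 2
    [['id-7', 'id-8', 'id-9'],    # module 2, query 1
     ['id-8', 'id-9', 'id-1']],   # module 2, query 2
)
scores = (
    [[3.0, 2.5, 1.0],
     [2.8, 2.1, 0.9]],            # module 1 scores, per query
    [[0.91, 0.88, 0.70],
     [0.93, 0.81, 0.66]],         # module 2 scores, per query
)
# hybrid_rrf / hybrid_cc fuse these per query and return the top_k fused ids and scores.
```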
25 changes: 24 additions & 1 deletion autorag/nodes/retrieval/run.py
@@ -81,12 +81,15 @@ def run_and_save(input_modules, input_module_params, filename_start: int):
target_modules = list(map(lambda x: x.pop('target_modules'), hybrid_module_params))
target_filenames = list(map(lambda x: select_result_for_hybrid(save_dir, x), target_modules))
ids_scores = list(map(lambda x: get_ids_and_scores(save_dir, x), target_filenames))
target_module_params = list(map(lambda x: get_module_params(save_dir, x), target_filenames))
hybrid_module_params = list(map(lambda x: {**x[0], **x[1]}, zip(hybrid_module_params, ids_scores)))
real_hybrid_times = list(map(lambda filename: get_hybrid_execution_times(save_dir, filename), target_filenames))
hybrid_results, hybrid_times, hybrid_summary_df = run_and_save(hybrid_modules, hybrid_module_params, filename_first)
hybrid_results, hybrid_times, hybrid_summary_df = run_and_save(hybrid_modules, hybrid_module_params,
filename_first)
filename_first += len(hybrid_modules)
hybrid_times = real_hybrid_times.copy()
hybrid_summary_df['execution_time'] = hybrid_times
hybrid_summary_df = edit_summary_df_params(hybrid_summary_df, target_modules, target_module_params)
else:
hybrid_results, hybrid_times, hybrid_summary_df = [], [], pd.DataFrame()

@@ -158,6 +161,26 @@ def select_best_among_module(df: pd.DataFrame, module_name: str):
return best_filenames


def get_module_params(node_dir: str, filenames: List[str]) -> List[Dict]:
summary_df = load_summary_file(os.path.join(node_dir, "summary.csv"))
best_results = summary_df[summary_df['filename'].isin(filenames)]
module_params = best_results['module_params'].tolist()
return module_params


def edit_summary_df_params(summary_df: pd.DataFrame, target_modules, target_module_params) -> pd.DataFrame:
def delete_ids_scores(x):
del x['ids']
del x['scores']
return x

summary_df['module_params'] = summary_df['module_params'].apply(delete_ids_scores)
summary_df['new_params'] = [{'target_modules': x, 'target_module_params': y} for x, y in zip(target_modules, target_module_params)]
summary_df['module_params'] = summary_df.apply(lambda row: {**row['module_params'], **row['new_params']}, axis=1)
summary_df = summary_df.drop(columns=['new_params'])
return summary_df


def get_ids_and_scores(node_dir: str, filenames: List[str]) -> Dict:
best_results_df = list(map(lambda filename: pd.read_parquet(os.path.join(node_dir, filename)), filenames))
ids = tuple(map(lambda df: df['retrieved_ids'].apply(list).tolist(), best_results_df))
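
To make the summary rewrite concrete, here is a minimal sketch of the edit_summary_df_params helper added above applied to a single hybrid row; the toy values are illustrative, and it assumes the function is importable from autorag.nodes.retrieval.run as introduced in this commit.

```python
import pandas as pd

from autorag.nodes.retrieval.run import edit_summary_df_params

# One hybrid row whose module_params still carry the bulky precomputed ids/scores.
summary_df = pd.DataFrame({
    'filename': ['2.parquet'],
    'module_name': ['hybrid_rrf'],
    'module_params': [{
        'ids': ([['id-1', 'id-2']], [['id-2', 'id-3']]),
        'scores': ([[0.5, 0.4]], [[0.7, 0.2]]),
        'top_k': 4,
        'rrf_k': 1,
    }],
})

summary_df = edit_summary_df_params(
    summary_df,
    target_modules=[('bm25', 'vectordb')],
    target_module_params=[[{'top_k': 4}, {'top_k': 4, 'embedding_model': 'openai'}]],
)

print(summary_df['module_params'][0])
# {'top_k': 4, 'rrf_k': 1,
#  'target_modules': ('bm25', 'vectordb'),
#  'target_module_params': [{'top_k': 4}, {'top_k': 4, 'embedding_model': 'openai'}]}
```

This mirrors the new assertions added to test_run_retrieval_node below.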
39 changes: 36 additions & 3 deletions tests/autorag/nodes/retrieval/test_hybrid_rrf.py
@@ -1,11 +1,16 @@
import os
import tempfile
from datetime import datetime

import chromadb
import pandas as pd
import pytest
from llama_index import OpenAIEmbedding

from autorag.nodes.retrieval.bm25 import bm25_ingest
from autorag.nodes.retrieval.hybrid_rrf import rrf_pure
from autorag.nodes.retrieval import hybrid_rrf
from autorag.nodes.retrieval.vectordb import vectordb_ingest
from tests.autorag.nodes.retrieval.test_run_retrieval_node import pseudo_node_dir


@@ -49,14 +54,20 @@ def pseudo_project_dir():
corpus_df = pd.DataFrame({
'doc_id': ['id-1', 'id-2', 'id-3', 'id-4', 'id-5', 'id-6', 'id-7', 'id-8', 'id-9'],
'contents': ['doc-1', 'doc-2', 'doc-3', 'doc-4', 'doc-5', 'doc-6', 'doc-7', 'doc-8', 'doc-9'],
'metadata': [{'last_modified_date': datetime.now()} for _ in range(9)]
})
os.makedirs(os.path.join(project_dir, "data"))
corpus_df.to_parquet(os.path.join(project_dir, "data", 'corpus.parquet'))
resource_dir = os.path.join(project_dir, "resources")
os.makedirs(resource_dir)
bm25_ingest(os.path.join(resource_dir, 'bm25.pkl'), corpus_df)
chroma_path = os.path.join(resource_dir, 'chroma')
db = chromadb.PersistentClient(path=chroma_path)
collection = db.create_collection(name="openai", metadata={"hnsw:space": "cosine"})
vectordb_ingest(collection, corpus_df, OpenAIEmbedding())
yield project_dir


def test_hybrid_rrf_node(pseudo_project_dir, pseudo_node_dir):
previous_result = pd.DataFrame({
previous_result = pd.DataFrame({
'qid': ['query-1', 'query-2', 'query-3'],
'query': ['query-1', 'query-2', 'query-3'],
'retrieval_gt': [
@@ -70,6 +81,9 @@ def test_hybrid_rrf_node(pseudo_project_dir, pseudo_node_dir):
['gen-1', 'gen-2']
]
})


def test_hybrid_rrf_node(pseudo_project_dir):
modules = {
'ids': ([['id-1', 'id-2', 'id-3'],
['id-1', 'id-2', 'id-3'],
@@ -96,3 +110,22 @@ def test_hybrid_rrf_node(pseudo_project_dir, pseudo_node_dir):
assert set(result_df['retrieved_ids'].tolist()[0]) == {'id-9', 'id-3', 'id-2'}
assert result_df['retrieve_scores'].tolist()[0] == pytest.approx([0.5, 0.5, 1 / 3])
assert set(result_df['retrieved_contents'].tolist()[0]) == {'doc-9', 'doc-3', 'doc-2'}


def test_hybrid_rrf_node_deploy(pseudo_project_dir):
modules = {
'target_modules': ('bm25', 'vectordb'),
'target_module_params': [
{'top_k': 3},
{'embedding_model': 'openai', 'top_k': 3}
],
'top_k': 3,
'rrf_k': 1,
}
result_df = hybrid_rrf(project_dir=pseudo_project_dir, previous_result=previous_result, **modules)
assert len(result_df) == 3
assert isinstance(result_df, pd.DataFrame)
assert set(result_df.columns) == {'retrieved_contents', 'retrieved_ids', 'retrieve_scores'}
assert len(result_df['retrieved_ids'].tolist()[0]) == 3
assert len(result_df['retrieve_scores'].tolist()[0]) == 3
assert len(result_df['retrieved_contents'].tolist()[0]) == 3
10 changes: 10 additions & 0 deletions tests/autorag/nodes/retrieval/test_run_retrieval_node.py
@@ -87,6 +87,16 @@ def test_run_retrieval_node(node_line_dir):
assert summary_df['filename'].nunique() == len(summary_df)
assert len(summary_df[summary_df['is_best'] == True]) == 1

    # test that the hybrid retrieval params in summary_df are converted correctly
assert all(summary_df['module_params'].apply(lambda x: 'ids' not in x))
assert all(summary_df['module_params'].apply(lambda x: 'scores' not in x))
hybrid_summary_df = summary_df[summary_df['module_name'].str.contains('hybrid')]
assert all(hybrid_summary_df['module_params'].apply(lambda x: 'target_modules' in x))
assert all(hybrid_summary_df['module_params'].apply(lambda x: 'target_module_params' in x))
assert all(hybrid_summary_df['module_params'].apply(lambda x: x['target_modules'] == ('bm25', 'vectordb')))
assert all(hybrid_summary_df['module_params'].apply(
lambda x: x['target_module_params'] == [{'top_k': 4}, {'top_k': 4, 'embedding_model': 'openai'}]))

# test the best file is saved properly
best_filename = summary_df[summary_df['is_best'] == True]['filename'].values[0]
best_path = os.path.join(node_line_dir, "retrieval", f'best_{best_filename}')
