diff --git a/README.md b/README.md index 70dfe1dc..6d7360c3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SuperRag -Super-performant RAG pipeline for AI Agents/Assistants. +Super-performant RAG pipeline for AI Agents/Assistants. ## API @@ -23,6 +23,7 @@ Input example: } }, "index_name": "my_index", + "encoder": "my_encoder", "webhook_url": "https://my-webhook-url" } ``` @@ -41,6 +42,7 @@ Input example: } }, "index_name": "my_index", + "encoder": "my_encoder" } ``` diff --git a/api/ingest.py b/api/ingest.py index 394f54bb..3a05b7e4 100644 --- a/api/ingest.py +++ b/api/ingest.py @@ -4,9 +4,8 @@ import aiohttp from fastapi import APIRouter -import encoders -from models.ingest import EncoderEnum, RequestPayload -from service.embedding import EmbeddingService +from models.ingest import RequestPayload +from service.embedding import EmbeddingService, get_encoder router = APIRouter() @@ -21,17 +20,7 @@ async def ingest(payload: RequestPayload) -> Dict: documents = await embedding_service.generate_documents() chunks = await embedding_service.generate_chunks(documents=documents) - encoder_mapping = { - EncoderEnum.cohere: encoders.CohereEncoder, - EncoderEnum.openai: encoders.OpenAIEncoder, - EncoderEnum.huggingface: encoders.HuggingFaceEncoder, - EncoderEnum.fastembed: encoders.FastEmbedEncoder, - } - - encoder_class = encoder_mapping.get(payload.encoder) - if encoder_class is None: - raise ValueError(f"Unsupported encoder: {payload.encoder}") - encoder = encoder_class() + encoder = get_encoder(encoder_type=payload.encoder) summary_documents = await embedding_service.generate_summary_documents( documents=documents diff --git a/dev/walkthrough.ipynb b/dev/walkthrough.ipynb index e19aba4b..9c229bbc 100644 --- a/dev/walkthrough.ipynb +++ b/dev/walkthrough.ipynb @@ -53,6 +53,66 @@ "\n", "print(response.json())" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Query the index\n", + "query_url = 
f\"{API_URL}/api/v1/query\"\n", + "\n", + "query_payload = {\n", + " \"input\": \"What is the best chunk strategy?\",\n", + " \"vector_database\": {\n", + " \"type\": \"pinecone\",\n", + " \"config\": {\n", + " \"api_key\": PINECONE_API_KEY,\n", + " \"host\": PINECONE_HOST,\n", + " }\n", + " },\n", + " \"index_name\": PINECONE_INDEX,\n", + " \"encoder\": \"openai\",\n", + "}\n", + "\n", + "query_response = requests.post(query_url, json=query_payload)\n", + "\n", + "print(query_response.json())\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Delete the index\n", + "query_url = f\"{API_URL}/api/v1/delete\"\n", + "\n", + "delete_payload = {\n", + " \"file_url\": \"https://arxiv.org/pdf/2402.05131.pdf\",\n", + " \"vector_database\": {\n", + " \"type\": \"pinecone\",\n", + " \"config\": {\n", + " \"api_key\": PINECONE_API_KEY,\n", + " \"host\": PINECONE_HOST,\n", + " }\n", + " },\n", + " \"index_name\": PINECONE_INDEX,\n", + "}\n", + "\n", + "delete_response = requests.delete(query_url, json=delete_payload)\n", + "\n", + "print(delete_response.json())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/encoders/base.py b/encoders/base.py index 5c26633f..8320b486 100644 --- a/encoders/base.py +++ b/encoders/base.py @@ -7,6 +7,7 @@ class BaseEncoder(BaseModel): name: str score_threshold: float type: str = Field(default="base") + dimension: int = Field(default=1536) class Config: arbitrary_types_allowed = True diff --git a/encoders/openai.py b/encoders/openai.py index 0a249802..caa9a494 100644 --- a/encoders/openai.py +++ b/encoders/openai.py @@ -3,16 +3,20 @@ from typing import List, Optional import openai +from dotenv import load_dotenv from openai import OpenAIError from openai.types import CreateEmbeddingResponse +from semantic_router.utils.logger import logger from encoders import BaseEncoder -from 
semantic_router.utils.logger import logger + +load_dotenv() class OpenAIEncoder(BaseEncoder): client: Optional[openai.Client] type: str = "openai" + dimension: int = 1536 def __init__( self, @@ -21,7 +25,7 @@ def __init__( score_threshold: float = 0.82, ): if name is None: - name = os.getenv("OPENAI_MODEL_NAME", "text-embedding-ada-002") + name = os.getenv("OPENAI_MODEL_NAME", "text-embedding-3-small") super().__init__(name=name, score_threshold=score_threshold) api_key = openai_api_key or os.getenv("OPENAI_API_KEY") if api_key is None: diff --git a/models/query.py b/models/query.py index d6958adb..978bfef5 100644 --- a/models/query.py +++ b/models/query.py @@ -1,6 +1,7 @@ from typing import List, Optional from pydantic import BaseModel +from models.ingest import EncoderEnum from models.vector_database import VectorDatabase @@ -9,6 +10,7 @@ class RequestPayload(BaseModel): input: str vector_database: VectorDatabase index_name: str + encoder: EncoderEnum = EncoderEnum.openai class ResponseData(BaseModel): diff --git a/service/embedding.py b/service/embedding.py index dc269346..1008e37b 100644 --- a/service/embedding.py +++ b/service/embedding.py @@ -10,7 +10,9 @@ from tqdm import tqdm from encoders import BaseEncoder +import encoders from models.file import File +from models.ingest import EncoderEnum from service.vector_database import get_vector_service from utils.summarise import completion @@ -85,6 +87,7 @@ async def generate_embedding(node): vector_service = get_vector_service( index_name=index_name or self.index_name, credentials=self.vector_credentials, + encoder=encoder, ) await vector_service.upsert(embeddings=[e for e in embeddings if e is not None]) @@ -102,3 +105,17 @@ async def generate_summary_documents( pbar.update() pbar.close() return summary_documents + + +def get_encoder(*, encoder_type: EncoderEnum) -> encoders.BaseEncoder: + encoder_mapping = { + EncoderEnum.cohere: encoders.CohereEncoder, + EncoderEnum.openai: encoders.OpenAIEncoder, + 
EncoderEnum.huggingface: encoders.HuggingFaceEncoder, + EncoderEnum.fastembed: encoders.FastEmbedEncoder, + } + + encoder_class = encoder_mapping.get(encoder_type) + if encoder_class is None: + raise ValueError(f"Unsupported encoder: {encoder_type}") + return encoder_class() diff --git a/service/router.py b/service/router.py index 58c8ef20..82109017 100644 --- a/service/router.py +++ b/service/router.py @@ -6,6 +6,7 @@ from semantic_router.route import Route from models.query import RequestPayload +from service.embedding import get_encoder from service.vector_database import VectorService, get_vector_service @@ -27,7 +28,9 @@ def create_route_layer() -> RouteLayer: return RouteLayer(encoder=encoder, routes=routes) -async def get_documents(vector_service: VectorService, payload: RequestPayload) -> List: +async def get_documents( + *, vector_service: VectorService, payload: RequestPayload +) -> List: chunks = await vector_service.query(input=payload.input, top_k=4) documents = await vector_service.convert_to_rerank_format(chunks=chunks) @@ -41,15 +44,19 @@ async def get_documents(vector_service: VectorService, payload: RequestPayload) async def query(payload: RequestPayload) -> List: rl = create_route_layer() decision = rl(payload.input).name + encoder = get_encoder(encoder_type=payload.encoder) if decision == "summarize": vector_service: VectorService = get_vector_service( index_name=f"{payload.index_name}summary", credentials=payload.vector_database, + encoder=encoder, ) - return await get_documents(vector_service, payload) + return await get_documents(vector_service=vector_service, payload=payload) vector_service: VectorService = get_vector_service( - index_name=payload.index_name, credentials=payload.vector_database + index_name=payload.index_name, + credentials=payload.vector_database, + encoder=encoder, ) - return await get_documents(vector_service, payload) + return await get_documents(vector_service=vector_service, payload=payload) diff --git 
a/service/vector_database.py b/service/vector_database.py index 6b784ca1..1ea429bf 100644 --- a/service/vector_database.py +++ b/service/vector_database.py @@ -1,24 +1,27 @@ from abc import ABC, abstractmethod -from typing import Any, List, Type +from typing import Any, List -import numpy as np import weaviate from astrapy.db import AstraDB from decouple import config -from fastembed import TextEmbedding from pinecone import Pinecone, ServerlessSpec from qdrant_client import QdrantClient from qdrant_client.http import models as rest from tqdm import tqdm +from encoders.base import BaseEncoder +from encoders.openai import OpenAIEncoder from models.vector_database import VectorDatabase class VectorService(ABC): - def __init__(self, index_name: str, dimension: int, credentials: dict): + def __init__( + self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder + ): self.index_name = index_name self.dimension = dimension self.credentials = credentials + self.encoder = encoder @abstractmethod async def upsert(): @@ -37,11 +40,12 @@ async def delete(self, file_url: str): pass async def _generate_vectors(self, input: str): - embedding_model = TextEmbedding( - model_name="sentence-transformers/all-MiniLM-L6-v2" - ) - embeddings: List[np.ndarray] = list(embedding_model.embed(input)) - return embeddings[0].tolist() + # embedding_model = TextEmbedding( + # model_name="sentence-transformers/all-MiniLM-L6-v2" + # ) + # embeddings: List[np.ndarray] = list(embedding_model.embed(input)) + # return embeddings[0].tolist() + return self.encoder([input])[0]  # keep the flat single-vector return shape callers expect async def rerank(self, query: str, documents: list, top_n: int = 4): from cohere import Client @@ -65,15 +69,20 @@ async def rerank(self, query: str, documents: list, top_n: int = 4): class PineconeVectorService(VectorService): - def __init__(self, index_name: str, dimension: int, credentials: dict): + def __init__( + self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder + ): 
super().__init__( - index_name=index_name, dimension=dimension, credentials=credentials + index_name=index_name, + dimension=dimension, + credentials=credentials, + encoder=encoder, ) pinecone = Pinecone(api_key=credentials["api_key"]) if index_name not in [index.name for index in pinecone.list_indexes()]: pinecone.create_index( name=self.index_name, - dimension=1536, # TODO: make it dynamic based on the encoder + dimension=dimension or 1536, metric="dotproduct", spec=ServerlessSpec(cloud="aws", region="us-west-2"), ) @@ -107,9 +116,14 @@ async def delete(self, file_url: str) -> None: class QdrantService(VectorService): - def __init__(self, index_name: str, dimension: int, credentials: dict): + def __init__( + self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder + ): super().__init__( - index_name=index_name, dimension=dimension, credentials=credentials + index_name=index_name, + dimension=dimension, + credentials=credentials, + encoder=encoder, ) self.client = QdrantClient( url=credentials["host"], api_key=credentials["api_key"], https=True @@ -120,7 +134,7 @@ def __init__(self, index_name: str, dimension: int, credentials: dict): collection_name=self.index_name, vectors_config={ "content": rest.VectorParams( - size=1024, distance=rest.Distance.COSINE + size=dimension, distance=rest.Distance.COSINE ) }, optimizers_config=rest.OptimizersConfigDiff( @@ -186,9 +200,14 @@ async def delete(self, file_url: str) -> None: class WeaviateService(VectorService): - def __init__(self, index_name: str, dimension: int, credentials: dict): + def __init__( + self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder + ): super().__init__( - index_name=index_name, dimension=dimension, credentials=credentials + index_name=index_name, + dimension=dimension, + credentials=credentials, + encoder=encoder, ) self.client = weaviate.Client( url=credentials["host"], @@ -251,9 +270,14 @@ async def delete(self, file_url: str) -> None: class 
AstraService(VectorService): - def __init__(self, index_name: str, dimension: int, credentials: dict): + def __init__( + self, index_name: str, dimension: int, credentials: dict, encoder: BaseEncoder + ): super().__init__( - index_name=index_name, dimension=dimension, credentials=credentials + index_name=index_name, + dimension=dimension, + credentials=credentials, + encoder=encoder, ) self.client = AstraDB( token=credentials["api_key"], @@ -302,8 +326,11 @@ async def delete(self, file_url: str) -> None: def get_vector_service( - index_name: str, credentials: VectorDatabase, dimension: int = 1024 -) -> Type[VectorService]: + *, + index_name: str, + credentials: VectorDatabase, + encoder: BaseEncoder = OpenAIEncoder(), +) -> VectorService: services = { "pinecone": PineconeVectorService, "qdrant": QdrantService, @@ -317,6 +344,7 @@ def get_vector_service( raise ValueError(f"Unsupported provider: {credentials.type.value}") return service( index_name=index_name, - dimension=dimension, + dimension=encoder.dimension, credentials=dict(credentials.config), + encoder=encoder, )