Skip to content

Commit

Permalink
Merge pull request #13 from homanp/finetune-gpt-3.5
Browse files Browse the repository at this point in the history
WIP
  • Loading branch information
homanp authored Oct 7, 2023
2 parents a2bc1bf + 0fd0a1e commit 6292536
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 32 deletions.
14 changes: 6 additions & 8 deletions lib/api/ingest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from fastapi import APIRouter
from service.flows import create_embeddings, create_finetune

from lib.service.embedding import EmbeddingService
from lib.models.ingest import IngestRequest
from lib.utils.prisma import prisma

router = APIRouter()
Expand All @@ -11,12 +12,9 @@
name="ingest",
description="Ingest data",
)
async def ingest(body: dict):
async def ingest(body: IngestRequest):
"""Endpoint for ingesting data"""
datasource = await prisma.datasource.create(data={**body})
embedding_service = EmbeddingService(datasource=datasource)
documents = embedding_service.generate_documents()
nodes = embedding_service.generate_chunks(documents=documents)
embeddings = embedding_service.generate_embeddings(nodes=nodes)
print(embeddings)
return {"success": True, "data": None}
await create_embeddings(datasource=datasource)
await create_finetune(datasource=datasource)
return {"success": True, "data": datasource}
Empty file added lib/models/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions lib/models/ingest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pydantic import BaseModel


class IngestRequest(BaseModel):
    """Validated request body for the ``/ingest`` endpoint."""

    # Presumably a callback URL hit when ingestion completes — not yet
    # referenced by the handlers in this change; verify against callers.
    webhook_url: str
36 changes: 27 additions & 9 deletions lib/service/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_datasource_suffix(self) -> str:
except KeyError:
raise ValueError("Unsupported datasource type")

def generate_documents(self) -> List[Document]:
async def generate_documents(self) -> List[Document]:
with NamedTemporaryFile(
suffix=self.get_datasource_suffix(), delete=True
) as temp_file:
Expand All @@ -37,16 +37,16 @@ def generate_documents(self) -> List[Document]:
docs = reader.load_data()
return docs

def generate_chunks(self, documents: List[Document]) -> List[Union[Document, None]]:
async def generate_chunks(
self, documents: List[Document]
) -> List[Union[Document, None]]:
parser = SimpleNodeParser.from_defaults(chunk_size=350, chunk_overlap=20)
nodes = parser.get_nodes_from_documents(documents, show_progress=True)
return nodes

# def generate_qa_pairs(self, nodes: List[Union[Document, None]]) -> Dict[str, Any]:
# qa_pairs = generate_qa_embedding_pairs(nodes=nodes)
# return qa_pairs

def generate_embeddings(self, nodes: List[Union[Document, None]]) -> List[ndarray]:
async def generate_embeddings(
self, nodes: List[Union[Document, None]]
) -> List[ndarray]:
vectordb = get_vector_service(
provider="pinecone",
index_name="all-minilm-l6-v2",
Expand All @@ -59,7 +59,25 @@ def generate_embeddings(self, nodes: List[Union[Document, None]]) -> List[ndarra
embeddings = []
for node in nodes:
if node is not None:
embedding = (node.id_, model.encode(node.text).tolist(), node.metadata)
embedding = (
node.id_,
model.encode(node.text).tolist(),
{**node.metadata, "content": node.text},
)
embeddings.append(embedding)
vectordb.upsert(vectors=embeddings)
await vectordb.upsert(vectors=embeddings)
return embeddings

# def generate_query(self):
# model = SentenceTransformer(
# "all-MiniLM-L6-v2", use_auth_token=config("HF_API_KEY")
# )
# vectordb = get_vector_service(
# provider="pinecone",
# index_name="all-minilm-l6-v2",
# namespace=self.datasource.id,
# dimension=384,
# )
# query = "How many cars were sold?"
# embedding = model.encode([query]).tolist()
# return vectordb.query(queries=embedding, top_k=5, include_metadata=True)
97 changes: 97 additions & 0 deletions lib/service/finetune.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import asyncio
import os
import uuid
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple, Union

import openai
from decouple import config
from llama_index import Document
from numpy import ndarray

from lib.service.prompts import GPT_DATA_FORMAT, generate_qa_pair_prompt

openai.api_key = config("OPENAI_API_KEY")


class FinetuningService(ABC):
    """Abstract base class for provider-specific fine-tuning backends.

    Subclasses must implement dataset generation and the fine-tune call;
    ``cleanup`` ships a usable default that removes the local training file.
    """

    def __init__(self, nodes: List[Union[Document, None]]):
        # Parsed document chunks the training dataset is generated from.
        self.nodes = nodes

    @abstractmethod
    async def generate_dataset(self) -> List[Tuple[str, ndarray]]:
        """Produce the training dataset for the provider."""

    @abstractmethod
    async def finetune(self, training_file: str) -> Dict:
        """Start a fine-tuning job from *training_file* and return job info."""

    async def cleanup(self, training_file: str) -> None:
        """Delete the local training file once the job has been submitted.

        Concrete by design: it was previously marked ``@abstractmethod``
        while also holding the real implementation, which made the only
        subclass (which never overrides it) impossible to instantiate.
        """
        os.remove(training_file)


class OpenAIFinetuningService(FinetuningService):
    """Fine-tuning backend targeting OpenAI's gpt-3.5-turbo."""

    def __init__(
        self,
        nodes: List[Union[Document, None]],
        num_questions_per_chunk: int = 10,
        batch_size: int = 10,
    ):
        super().__init__(nodes=nodes)
        # Number of question/answer pairs requested per document chunk.
        self.num_questions_per_chunk = num_questions_per_chunk
        # Number of chunks sent concurrently per asyncio.gather batch.
        self.batch_size = batch_size

    async def generate_prompt_and_completion(self, node):
        """Ask the chat model for QA pairs covering one node's text."""
        prompt = generate_qa_pair_prompt(
            context=node.text,
            # Was hard-coded to 10, silently ignoring the configured value.
            num_of_qa_paris=self.num_questions_per_chunk,
            format=GPT_DATA_FORMAT,
        )
        completion = await openai.ChatCompletion.acreate(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
        )
        return completion.choices[0].message.content

    async def generate_dataset(self) -> str:
        """Write QA pairs for all nodes to a JSONL file and return its path."""
        training_file = f"{uuid.uuid4()}.jsonl"
        with open(training_file, "w") as f:
            # Process nodes in chunks of batch_size so we don't fire one
            # request per node all at once.
            for i in range(0, len(self.nodes), self.batch_size):
                tasks = [
                    self.generate_prompt_and_completion(node)
                    for node in self.nodes[i : i + self.batch_size]
                ]
                qa_pairs = await asyncio.gather(*tasks)
                for qa_pair in qa_pairs:
                    # The model separates JSON objects with blank lines.
                    for json_obj in qa_pair.split("\n\n"):
                        f.write(json_obj + "\n")
        # Previously missing: the annotation promises a str, and callers
        # need the path to upload (finetune) and later delete (cleanup) it.
        return training_file

    async def finetune(self, training_file: str) -> Dict:
        """Upload *training_file* and start a fine-tuning job."""
        # Context manager so the file handle is closed (was leaked), and the
        # async upload variant so the event loop is not blocked.
        with open(training_file, "rb") as fp:
            file = await openai.File.acreate(file=fp, purpose="fine-tune")
        finetune = await openai.FineTuningJob.acreate(
            training_file=file.get("id"), model="gpt-3.5-turbo"
        )
        return {**finetune, "training_file": training_file}


async def get_finetuning_service(
    nodes: List[Union[Document, None]],
    provider: str = "openai",
    num_questions_per_chunk: int = 10,
    batch_size: int = 10,
):
    """Factory returning a configured fine-tuning service for *provider*.

    Raises ``ValueError`` for providers with no registered implementation.
    """
    registry = {
        "openai": OpenAIFinetuningService,
        # Register additional providers here.
    }
    service_cls = registry.get(provider)
    if service_cls is None:
        raise ValueError(f"Unsupported provider: {provider}")
    return service_cls(
        nodes=nodes,
        num_questions_per_chunk=num_questions_per_chunk,
        batch_size=batch_size,
    )
50 changes: 50 additions & 0 deletions lib/service/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import List, Union

import openai
from llama_index import Document
from prefect import flow, task

from lib.service.embedding import EmbeddingService
from lib.service.finetune import get_finetuning_service
from lib.utils.prisma import prisma
from prisma.models import Datasource


@task
async def create_vector_embeddings(
    datasource: Datasource,
) -> List[Union[Document, None]]:
    """Load the datasource, chunk it, embed the chunks, and return the chunks."""
    service = EmbeddingService(datasource=datasource)
    docs = await service.generate_documents()
    chunks = await service.generate_chunks(documents=docs)
    await service.generate_embeddings(nodes=chunks)
    return chunks


@task
async def create_finetuned_model(datasource: Datasource):
    """Generate a QA dataset from *datasource* and fine-tune a model on it.

    Returns the retrieved fine-tuning job object.
    """
    embedding_service = EmbeddingService(datasource=datasource)
    documents = await embedding_service.generate_documents()
    nodes = await embedding_service.generate_chunks(documents=documents)
    finetunning_service = await get_finetuning_service(
        nodes=nodes, provider="openai", batch_size=5
    )
    # generate_dataset returns the JSONL training-file path; it was being
    # discarded while finetune() was called without its required
    # training_file argument.
    training_file = await finetunning_service.generate_dataset()
    finetune_job = await finetunning_service.finetune(training_file=training_file)
    # finetune() returns a plain dict, so use key access (``.id`` raised
    # AttributeError), and the job was created through the FineTuningJob
    # API — retrieving it via the legacy FineTune API (and awaiting the
    # sync ``retrieve``) was wrong.
    finetune = await openai.FineTuningJob.aretrieve(id=finetune_job.get("id"))
    await finetunning_service.cleanup(training_file=finetune_job.get("training_file"))
    return finetune


@flow(name="create_embeddings", description="Create embeddings", retries=0)
async def create_embeddings(datasource: Datasource):
    """Flow wrapper: embed a datasource's content into the vector store."""
    await create_vector_embeddings(datasource=datasource)


@flow(name="create_finetune", description="Create a finetune", retries=0)
async def create_finetune(datasource: Datasource):
    """Flow wrapper: fine-tune a model and persist the job on the datasource."""
    result = await create_finetuned_model(datasource=datasource)
    await prisma.datasource.update(
        data={"finetune": result},
        where={"id": datasource.id},
    )
25 changes: 25 additions & 0 deletions lib/service/prompts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# flake8: noqa

# One-shot example of the OpenAI chat fine-tuning JSONL record format,
# embedded verbatim in the QA-generation prompt.
GPT_DATA_FORMAT = (
    '{"messages": ['
    '{"role": "system", "content": "You are an AI agent that\'s an expert at answering questions."}, '
    '{"role": "user", "content": "What\'s the capital of France?"}, '
    '{"role": "assistant", "content": "Paris, as if everyone doesn\'t know that already."}'
    "]}"
)


def generate_qa_pair_prompt(format: str, context: str, num_of_qa_paris: int = 10):
    """Build the prompt asking the model for QA pairs over *context*.

    ``num_of_qa_paris`` keeps its (misspelled) name because callers pass it
    by keyword; renaming would break them.
    """
    # The implicit string concatenation was missing separating spaces,
    # producing fused words in the runtime prompt ("pairsfor", "withno",
    # "...pairs: 10Return", "JSONL.Each", "provided,a").
    prompt = (
        "You are an AI assistant tasked with generating question and answer pairs "
        "for the given context using the given format. Only answer in the format with "
        f"no other text. You should create the following number of question/answer pairs: {num_of_qa_paris}. "
        "Return the question/answer pairs as a JSONL. "
        "Each dict in the list should have the full context provided, "
        "a relevant question to the context and an answer to the question.\n\n"
        f"Format:\n {format}\n\n"
        f"Context:\n {context}"
    )
    return prompt
20 changes: 12 additions & 8 deletions lib/service/vectordb.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,27 @@ def __init__(self, index_name: str, dimension: int, namespace: str = None):
)
self.index = pinecone.Index(index_name=self.index_name)

def __del__(self):
pinecone.deinit()

def upsert(self, vectors: ndarray):
async def upsert(self, vectors: ndarray):
self.index.upsert(vectors=vectors, namespace=self.namespace)

def query(self, queries: List[ndarray], top_k: int):
return self.index.query(queries=queries, top_k=top_k)
async def query(
self, queries: List[ndarray], top_k: int, include_metadata: bool = True
):
return self.index.query(
queries=queries,
top_k=top_k,
include_metadata=include_metadata,
namespace=self.namespace,
)


def get_vector_service(
async def get_vector_service(
provider: str, index_name: str, namespace: str = None, dimension: int = 384
):
services = {
"pinecone": PineconeVectorService,
# Add other providers here
# "weaviate": WeaviateVectorService,
# e.g "weaviate": WeaviateVectorService,
}
service = services.get(provider)
if service is None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
-- Adds optional fine-tune reference and webhook columns to Datasource
-- (both reworked by the follow-up migrations below).
ALTER TABLE "Datasource" ADD COLUMN "finetuneId" TEXT,
ADD COLUMN "webhookUrl" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
Warnings:
- You are about to drop the column `finetuneId` on the `Datasource` table. All the data in the column will be lost.
- You are about to drop the column `webhookUrl` on the `Datasource` table. All the data in the column will be lost.
*/
-- AlterTable
-- Renames the camelCase columns to snake_case.
ALTER TABLE "Datasource" DROP COLUMN "finetuneId",
DROP COLUMN "webhookUrl",
ADD COLUMN "finetune_id" TEXT,
ADD COLUMN "webhook_url" TEXT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
Warnings:
- You are about to drop the column `finetune_id` on the `Datasource` table. All the data in the column will be lost.
*/
-- AlterTable
-- Replaces the scalar finetune_id with a JSONB column holding the whole
-- fine-tune job object (matches `finetune Json?` in schema.prisma).
ALTER TABLE "Datasource" DROP COLUMN "finetune_id",
ADD COLUMN "finetune" JSONB;
16 changes: 9 additions & 7 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ enum DatasourceStatus {
}

model Datasource {
id String @id @default(uuid())
content String? @db.Text()
status DatasourceStatus @default(IN_PROGRESS)
type DatasourceType
url String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
id String @id @default(uuid())
content String? @db.Text()
status DatasourceStatus @default(IN_PROGRESS)
type DatasourceType
url String?
finetune Json?
webhook_url String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}

0 comments on commit 6292536

Please sign in to comment.