From f835472bcfe2560253b26221f76a77e8baeb6128 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Fri, 6 Oct 2023 00:17:31 +0200
Subject: [PATCH 1/6] WIP

---
 lib/api/ingest.py        | 11 ++++---
 lib/service/embedding.py | 36 ++++++++++++++++------
 lib/service/finetune.py  | 65 ++++++++++++++++++++++++++++++++++++++++
 lib/service/prompts.py   | 23 ++++++++++++++
 lib/service/vectordb.py  | 20 ++++++++-----
 5 files changed, 134 insertions(+), 21 deletions(-)
 create mode 100644 lib/service/finetune.py
 create mode 100644 lib/service/prompts.py

diff --git a/lib/api/ingest.py b/lib/api/ingest.py
index af4b626..784237c 100644
--- a/lib/api/ingest.py
+++ b/lib/api/ingest.py
@@ -1,6 +1,7 @@
 from fastapi import APIRouter
 
 from lib.service.embedding import EmbeddingService
+from lib.service.finetune import get_finetuning_service
 from lib.utils.prisma import prisma
 
 router = APIRouter()
@@ -15,8 +16,10 @@ async def ingest(body: dict):
     """Endpoint for ingesting data"""
     datasource = await prisma.datasource.create(data={**body})
     embedding_service = EmbeddingService(datasource=datasource)
-    documents = embedding_service.generate_documents()
-    nodes = embedding_service.generate_chunks(documents=documents)
-    embeddings = embedding_service.generate_embeddings(nodes=nodes)
-    print(embeddings)
+    documents = await embedding_service.generate_documents()
+    nodes = await embedding_service.generate_chunks(documents=documents)
+    # embeddings = await embedding_service.generate_embeddings(nodes=nodes)
+    finetuning_service = await get_finetuning_service(nodes=nodes, provider="openai")
+    await finetuning_service.generate_dataset()
+    # print(embeddings)
     return {"success": True, "data": None}
diff --git a/lib/service/embedding.py b/lib/service/embedding.py
index 9aecd9d..3f36259 100644
--- a/lib/service/embedding.py
+++ b/lib/service/embedding.py
@@ -23,7 +23,7 @@ def get_datasource_suffix(self) -> str:
         except KeyError:
             raise ValueError("Unsupported datasource type")
 
-    def generate_documents(self) -> List[Document]:
+    async def generate_documents(self) -> List[Document]:
         with NamedTemporaryFile(
             suffix=self.get_datasource_suffix(), delete=True
         ) as temp_file:
@@ -37,16 +37,16 @@ def generate_documents(self) -> List[Document]:
             docs = reader.load_data()
             return docs
 
-    def generate_chunks(self, documents: List[Document]) -> List[Union[Document, None]]:
+    async def generate_chunks(
+        self, documents: List[Document]
+    ) -> List[Union[Document, None]]:
         parser = SimpleNodeParser.from_defaults(chunk_size=350, chunk_overlap=20)
         nodes = parser.get_nodes_from_documents(documents, show_progress=True)
         return nodes
 
-    # def generate_qa_pairs(self, nodes: List[Union[Document, None]]) -> Dict[str, Any]:
-    #     qa_pairs = generate_qa_embedding_pairs(nodes=nodes)
-    #     return qa_pairs
-
-    def generate_embeddings(self, nodes: List[Union[Document, None]]) -> List[ndarray]:
+    async def generate_embeddings(
+        self, nodes: List[Union[Document, None]]
+    ) -> List[ndarray]:
         vectordb = get_vector_service(
             provider="pinecone",
             index_name="all-minilm-l6-v2",
@@ -59,7 +59,25 @@ def generate_embeddings(self, nodes: List[Union[Document, None]]) -> List[ndarra
         embeddings = []
         for node in nodes:
             if node is not None:
-                embedding = (node.id_, model.encode(node.text).tolist(), node.metadata)
+                embedding = (
+                    node.id_,
+                    model.encode(node.text).tolist(),
+                    {**node.metadata, "content": node.text},
+                )
                 embeddings.append(embedding)
-        vectordb.upsert(vectors=embeddings)
+        await vectordb.upsert(vectors=embeddings)
         return embeddings
+
+    # def generate_query(self):
+    #     model = SentenceTransformer(
+    #         "all-MiniLM-L6-v2", use_auth_token=config("HF_API_KEY")
+    #     )
+    #     vectordb = get_vector_service(
+    #         provider="pinecone",
+    #         index_name="all-minilm-l6-v2",
+    #         namespace=self.datasource.id,
+    #         dimension=384,
+    #     )
+    #     query = "How many cars were sold?"
+    #     embedding = model.encode([query]).tolist()
+    #     return vectordb.query(queries=embedding, top_k=5, include_metadata=True)
diff --git a/lib/service/finetune.py b/lib/service/finetune.py
new file mode 100644
index 0000000..6118881
--- /dev/null
+++ b/lib/service/finetune.py
@@ -0,0 +1,65 @@
+import json
+import openai
+import asyncio
+
+from abc import ABC, abstractmethod
+from typing import List, Union, Tuple
+from numpy import ndarray
+from decouple import config
+from llama_index import Document
+from lib.service.prompts import generate_qa_pair_prompt, GPT_DATA_FORMAT
+
+openai.api_key = config("OPENAI_API_KEY")
+
+
+class FinetuningService(ABC):
+    def __init__(self, nodes: List[Union[Document, None]]):
+        self.nodes = nodes
+
+    @abstractmethod
+    async def generate_dataset(self) -> List[Tuple[str, ndarray]]:
+        pass
+
+
+class OpenAIFinetuningService(FinetuningService):
+    def __init__(
+        self, nodes: List[Union[Document, None]], num_questions_per_chunk: int = 10
+    ):
+        super().__init__(nodes=nodes)
+        self.num_questions_per_chunk = num_questions_per_chunk
+
+    async def generate_prompt_and_completion(self, node):
+        prompt = generate_qa_pair_prompt(
+            context=node.text, num_of_qa_pairs=10, format=GPT_DATA_FORMAT
+        )
+        completion = await openai.ChatCompletion.acreate(
+            model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}]
+        )
+        return completion.choices[0].message.content
+
+    async def generate_dataset(self):
+        with open("dataset.jsonl", "w") as f:
+            for i in range(0, len(self.nodes), 10):  # Process nodes in chunks of 10
+                tasks = [
+                    self.generate_prompt_and_completion(node)
+                    for node in self.nodes[i : i + 10]
+                ]
+                results = await asyncio.gather(*tasks)
+                for data in results:
+                    json.dump(data, f)
+                    f.write("\n")
+
+
+async def get_finetuning_service(
+    nodes: List[Union[Document, None]],
+    provider: str = "openai",
+    num_questions_per_chunk: int = 10,
+):
+    services = {
+        "openai": OpenAIFinetuningService,
+        # Add other providers here
+    }
+    service = services.get(provider)
+    if service is None:
+        raise ValueError(f"Unsupported provider: {provider}")
+    return service(nodes=nodes, num_questions_per_chunk=num_questions_per_chunk)
diff --git a/lib/service/prompts.py b/lib/service/prompts.py
new file mode 100644
index 0000000..a2c5e81
--- /dev/null
+++ b/lib/service/prompts.py
@@ -0,0 +1,23 @@
+GPT_DATA_FORMAT = (
+    "{"
+    "'messages': ["
+    "{'role': 'system', 'content': 'You are an AI agent that's an expert at answering questions.'}, "
+    "{'role': 'user', 'content': 'What's the capital of France?'}, "
+    "{'role': 'assistant', 'content': 'Paris, as if everyone doesn't know that already.'}"
+    "]"
+    "}"
+)
+
+
+def generate_qa_pair_prompt(format: str, context: str, num_of_qa_pairs: int = 10):
+    prompt = (
+        "You are an AI assistant tasked with generating question and answer pairs "
+        "for the given context using the given format. Only answer in the format with "
+        f"no other text. You should create the following number of question/answer pairs: {num_of_qa_pairs} "
+        "Return the question/answer pairs as a Python List. "
+ "Each dict in the list should have the full context provided," + "a relevant question to the context and an answer to the question.\n\n" + f"Format:\n {format}\n\n" + f"Context:\n {context}" + ) + return prompt diff --git a/lib/service/vectordb.py b/lib/service/vectordb.py index de59374..192ae72 100644 --- a/lib/service/vectordb.py +++ b/lib/service/vectordb.py @@ -35,23 +35,27 @@ def __init__(self, index_name: str, dimension: int, namespace: str = None): ) self.index = pinecone.Index(index_name=self.index_name) - def __del__(self): - pinecone.deinit() - - def upsert(self, vectors: ndarray): + async def upsert(self, vectors: ndarray): self.index.upsert(vectors=vectors, namespace=self.namespace) - def query(self, queries: List[ndarray], top_k: int): - return self.index.query(queries=queries, top_k=top_k) + async def query( + self, queries: List[ndarray], top_k: int, include_metadata: bool = True + ): + return self.index.query( + queries=queries, + top_k=top_k, + include_metadata=include_metadata, + namespace=self.namespace, + ) -def get_vector_service( +async def get_vector_service( provider: str, index_name: str, namespace: str = None, dimension: int = 384 ): services = { "pinecone": PineconeVectorService, # Add other providers here - # "weaviate": WeaviateVectorService, + # e.g "weaviate": WeaviateVectorService, } service = services.get(provider) if service is None: From 81f19b1fa695db50e034e7fe1f5421868a0bc3cc Mon Sep 17 00:00:00 2001 From: Ismail Pelaseyed Date: Fri, 6 Oct 2023 09:34:01 +0200 Subject: [PATCH 2/6] Add method for generating synthetic qa pairs --- lib/api/ingest.py | 4 +++- lib/service/finetune.py | 32 +++++++++++++++++++++++--------- lib/service/prompts.py | 10 +++++----- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/lib/api/ingest.py b/lib/api/ingest.py index 784237c..019d605 100644 --- a/lib/api/ingest.py +++ b/lib/api/ingest.py @@ -19,7 +19,9 @@ async def ingest(body: dict): documents = await embedding_service.generate_documents() nodes = await embedding_service.generate_chunks(documents=documents) # embeddings = await embedding_service.generate_embeddings(nodes=nodes) - finetunning_service = await get_finetuning_service(nodes=nodes, provider="openai") + finetunning_service = await get_finetuning_service( + nodes=nodes, provider="openai", batch_size=5 + ) await finetunning_service.generate_dataset() # print(embeddings) return {"success": True, "data": None} diff --git a/lib/service/finetune.py b/lib/service/finetune.py index 6118881..1bb57b1 100644 --- a/lib/service/finetune.py +++ b/lib/service/finetune.py @@ -23,37 +23,47 @@ async def generate_dataset(self) -> List[Tuple[str, ndarray]]: class OpenAIFinetuningService(FinetuningService): def __init__( - self, nodes: List[Union[Document, None]], num_questions_per_chunk: int = 10 + self, + nodes: List[Union[Document, None]], + num_questions_per_chunk: int = 10, + batch_size: int = 10, ): super().__init__(nodes=nodes) self.num_questions_per_chunk = num_questions_per_chunk + self.batch_size = batch_size async def generate_prompt_and_completion(self, node): prompt = generate_qa_pair_prompt( context=node.text, num_of_qa_paris=10, format=GPT_DATA_FORMAT ) completion = await openai.ChatCompletion.acreate( - model="gpt-3.5-turbo", messages=[{"role": "user", "content": prompt}] + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": prompt}], + temperature=0.3, ) return completion.choices[0].message.content async def generate_dataset(self): with open("dataset.jsonl", "w") as f: - for i in range(0, 
+            for i in range(
+                0, len(self.nodes), self.batch_size
+            ):  # Process nodes in chunks of batch_size
                 tasks = [
                     self.generate_prompt_and_completion(node)
-                    for node in self.nodes[i : i + 10]
+                    for node in self.nodes[i : i + self.batch_size]
                 ]
-                results = await asyncio.gather(*tasks)
-                for data in results:
-                    json.dump(data, f)
-                    f.write("\n")
+                qa_pairs = await asyncio.gather(*tasks)
+                for qa_pair in qa_pairs:
+                    json_objects = qa_pair.split("\n\n")
+                    for json_obj in json_objects:
+                        f.write(json_obj + "\n")
 
 
 async def get_finetuning_service(
     nodes: List[Union[Document, None]],
     provider: str = "openai",
     num_questions_per_chunk: int = 10,
+    batch_size: int = 10,
 ):
     services = {
         "openai": OpenAIFinetuningService,
@@ -62,4 +72,8 @@ async def get_finetuning_service(
     service = services.get(provider)
     if service is None:
         raise ValueError(f"Unsupported provider: {provider}")
-    return service(nodes=nodes, num_questions_per_chunk=num_questions_per_chunk)
+    return service(
+        nodes=nodes,
+        num_questions_per_chunk=num_questions_per_chunk,
+        batch_size=batch_size,
+    )
diff --git a/lib/service/prompts.py b/lib/service/prompts.py
index a2c5e81..03f3bb4 100644
--- a/lib/service/prompts.py
+++ b/lib/service/prompts.py
@@ -1,9 +1,9 @@
 GPT_DATA_FORMAT = (
     "{"
-    "'messages': ["
-    "{'role': 'system', 'content': 'You are an AI agent that's an expert at answering questions.'}, "
-    "{'role': 'user', 'content': 'What's the capital of France?'}, "
-    "{'role': 'assistant', 'content': 'Paris, as if everyone doesn't know that already.'}"
+    '"messages": ['
+    '{"role": "system", "content": "You are an AI agent that\'s an expert at answering questions."}, '
+    '{"role": "user", "content": "What\'s the capital of France?"}, '
+    '{"role": "assistant", "content": "Paris, as if everyone doesn\'t know that already."}'
     "]"
     "}"
 )
@@ -14,7 +14,7 @@ def generate_qa_pair_prompt(format: str, context: str, num_of_qa_pairs: int = 10
     prompt = (
         "You are an AI assistant tasked with generating question and answer pairs "
         "for the given context using the given format. Only answer in the format with "
         f"no other text. You should create the following number of question/answer pairs: {num_of_qa_pairs} "
-        "Return the question/answer pairs as a Python List. "
+        "Return the question/answer pairs as JSONL. "
"Each dict in the list should have the full context provided," "a relevant question to the context and an answer to the question.\n\n" f"Format:\n {format}\n\n" From f0054022e408d785afe3cd4340e5ac894e4045a1 Mon Sep 17 00:00:00 2001 From: Ismail Pelaseyed Date: Sat, 7 Oct 2023 21:45:10 +0200 Subject: [PATCH 3/6] Save finetune object to datasource --- lib/api/ingest.py | 18 ++++++++++--- lib/models/__init__.py | 0 lib/models/ingest.py | 5 ++++ lib/service/finetune.py | 27 +++++++++++++++---- .../migration.sql | 3 +++ .../migration.sql | 12 +++++++++ .../migration.sql | 9 +++++++ prisma/schema.prisma | 16 ++++++----- 8 files changed, 74 insertions(+), 16 deletions(-) create mode 100644 lib/models/__init__.py create mode 100644 lib/models/ingest.py create mode 100644 prisma/migrations/20231007192636_datasource_finetune_webhook/migration.sql create mode 100644 prisma/migrations/20231007193258_datasource_fields_update/migration.sql create mode 100644 prisma/migrations/20231007194311_datasource_finetune_object/migration.sql diff --git a/lib/api/ingest.py b/lib/api/ingest.py index 019d605..2741bea 100644 --- a/lib/api/ingest.py +++ b/lib/api/ingest.py @@ -1,8 +1,11 @@ +import openai + from fastapi import APIRouter from lib.service.embedding import EmbeddingService from lib.service.finetune import get_finetuning_service from lib.utils.prisma import prisma +from lib.models.ingest import IngestRequest router = APIRouter() @@ -12,16 +15,23 @@ name="ingest", description="Ingest data", ) -async def ingest(body: dict): +async def ingest(body: IngestRequest): """Endpoint for ingesting data""" + webhook_url = body.webhook_url datasource = await prisma.datasource.create(data={**body}) embedding_service = EmbeddingService(datasource=datasource) documents = await embedding_service.generate_documents() nodes = await embedding_service.generate_chunks(documents=documents) - # embeddings = await embedding_service.generate_embeddings(nodes=nodes) + await embedding_service.generate_embeddings(nodes=nodes) finetunning_service = await get_finetuning_service( nodes=nodes, provider="openai", batch_size=5 ) await finetunning_service.generate_dataset() - # print(embeddings) - return {"success": True, "data": None} + finetune_job = await finetunning_service.finetune() + finetune = await openai.FineTune.retrieve(id=finetune_job.id) + await prisma.datasource.update( + where={"id": datasource.id}, + data={"webhook_url": webhook_url, "finetune": finetune}, + ) + await finetunning_service.cleanup(training_file=finetune_job.get("training_file")) + return {"success": True, "data": datasource} diff --git a/lib/models/__init__.py b/lib/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lib/models/ingest.py b/lib/models/ingest.py new file mode 100644 index 0000000..d1fc22f --- /dev/null +++ b/lib/models/ingest.py @@ -0,0 +1,5 @@ +from pydantic import BaseModel + + +class IngestRequest(BaseModel): + webhook_url: str diff --git a/lib/service/finetune.py b/lib/service/finetune.py index 1bb57b1..1c2a2f5 100644 --- a/lib/service/finetune.py +++ b/lib/service/finetune.py @@ -1,9 +1,10 @@ -import json +import os +import uuid import openai import asyncio from abc import ABC, abstractmethod -from typing import List, Union, Tuple +from typing import Dict, List, Union, Tuple from numpy import ndarray from decouple import config from llama_index import Document @@ -20,6 +21,14 @@ def __init__(self, nodes: List[Union[Document, None]]): async def generate_dataset(self) -> List[Tuple[str, ndarray]]: pass + @abstractmethod 
+    async def finetune(self, training_file: str) -> Dict:
+        pass
+
+    async def cleanup(self, training_file: str) -> None:
+        os.remove(training_file)
+
 
 class OpenAIFinetuningService(FinetuningService):
     def __init__(
@@ -39,12 +48,13 @@ async def generate_prompt_and_completion(self, node):
         completion = await openai.ChatCompletion.acreate(
             model="gpt-3.5-turbo",
             messages=[{"role": "user", "content": prompt}],
-            temperature=0.3,
+            temperature=0,
         )
         return completion.choices[0].message.content
 
-    async def generate_dataset(self):
-        with open("dataset.jsonl", "w") as f:
+    async def generate_dataset(self) -> str:
+        training_file = f"{uuid.uuid4()}.jsonl"
+        with open(training_file, "w") as f:
             for i in range(
                 0, len(self.nodes), self.batch_size
             ):  # Process nodes in chunks of batch_size
@@ -58,6 +68,14 @@ async def generate_dataset(self):
                     json_objects = qa_pair.split("\n\n")
                     for json_obj in json_objects:
                         f.write(json_obj + "\n")
+        return training_file
+
+    async def finetune(self, training_file: str) -> Dict:
+        file = openai.File.create(file=open(training_file, "rb"), purpose="fine-tune")
+        finetune = await openai.FineTuningJob.acreate(
+            training_file=file.get("id"), model="gpt-3.5-turbo"
+        )
+        return {**finetune, "training_file": training_file}
diff --git a/prisma/migrations/20231007192636_datasource_finetune_webhook/migration.sql b/prisma/migrations/20231007192636_datasource_finetune_webhook/migration.sql
new file mode 100644
index 0000000..0f36426
--- /dev/null
+++ b/prisma/migrations/20231007192636_datasource_finetune_webhook/migration.sql
@@ -0,0 +1,3 @@
+-- AlterTable
+ALTER TABLE "Datasource" ADD COLUMN "finetuneId" TEXT,
+ADD COLUMN "webhookUrl" TEXT;
diff --git a/prisma/migrations/20231007193258_datasource_fields_update/migration.sql b/prisma/migrations/20231007193258_datasource_fields_update/migration.sql
new file mode 100644
index 0000000..f7477af
--- /dev/null
+++ b/prisma/migrations/20231007193258_datasource_fields_update/migration.sql
@@ -0,0 +1,12 @@
+/*
+  Warnings:
+
+  - You are about to drop the column `finetuneId` on the `Datasource` table. All the data in the column will be lost.
+  - You are about to drop the column `webhookUrl` on the `Datasource` table. All the data in the column will be lost.
+
+*/
+-- AlterTable
+ALTER TABLE "Datasource" DROP COLUMN "finetuneId",
+DROP COLUMN "webhookUrl",
+ADD COLUMN "finetune_id" TEXT,
+ADD COLUMN "webhook_url" TEXT;
diff --git a/prisma/migrations/20231007194311_datasource_finetune_object/migration.sql b/prisma/migrations/20231007194311_datasource_finetune_object/migration.sql
new file mode 100644
index 0000000..5dbb23d
--- /dev/null
+++ b/prisma/migrations/20231007194311_datasource_finetune_object/migration.sql
@@ -0,0 +1,9 @@
+/*
+  Warnings:
+
+  - You are about to drop the column `finetune_id` on the `Datasource` table. All the data in the column will be lost.
+
+*/
+-- AlterTable
+ALTER TABLE "Datasource" DROP COLUMN "finetune_id",
+ADD COLUMN "finetune" JSONB;
diff --git a/prisma/schema.prisma b/prisma/schema.prisma
index ad2308d..efa399c 100644
--- a/prisma/schema.prisma
+++ b/prisma/schema.prisma
@@ -23,11 +23,13 @@ enum DatasourceStatus {
 }
 
 model Datasource {
-  id        String           @id @default(uuid())
-  content   String?          @db.Text()
-  status    DatasourceStatus @default(IN_PROGRESS)
-  type      DatasourceType
-  url       String?
-  createdAt DateTime         @default(now())
-  updatedAt DateTime         @updatedAt
+  id          String           @id @default(uuid())
+  content     String?          @db.Text()
+  status      DatasourceStatus @default(IN_PROGRESS)
+  type        DatasourceType
+  url         String?
+  finetune    Json?
+  webhook_url String?
+  createdAt   DateTime         @default(now())
+  updatedAt   DateTime         @updatedAt
 }

From d7b817fbdc5a23708e8bfd4b6b77c63d23858769 Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Sat, 7 Oct 2023 21:47:01 +0200
Subject: [PATCH 4/6] Fix linting

---
 lib/api/ingest.py       |  3 +--
 lib/service/finetune.py | 13 +++++++------
 lib/service/prompts.py  |  2 ++
 3 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/lib/api/ingest.py b/lib/api/ingest.py
index 2741bea..44c5466 100644
--- a/lib/api/ingest.py
+++ b/lib/api/ingest.py
@@ -1,11 +1,10 @@
 import openai
-
 from fastapi import APIRouter
+from lib.models.ingest import IngestRequest
 from lib.service.embedding import EmbeddingService
 from lib.service.finetune import get_finetuning_service
 from lib.utils.prisma import prisma
-from lib.models.ingest import IngestRequest
 
 router = APIRouter()
 
diff --git a/lib/service/finetune.py b/lib/service/finetune.py
index 1c2a2f5..eb67f66 100644
--- a/lib/service/finetune.py
+++ b/lib/service/finetune.py
@@ -1,14 +1,15 @@
+import asyncio
 import os
 import uuid
-import openai
-import asyncio
-
 from abc import ABC, abstractmethod
-from typing import Dict, List, Union, Tuple
-from numpy import ndarray
+from typing import Dict, List, Tuple, Union
+
+import openai
 from decouple import config
 from llama_index import Document
-from lib.service.prompts import generate_qa_pair_prompt, GPT_DATA_FORMAT
+from numpy import ndarray
+
+from lib.service.prompts import GPT_DATA_FORMAT, generate_qa_pair_prompt
 
 openai.api_key = config("OPENAI_API_KEY")
 
diff --git a/lib/service/prompts.py b/lib/service/prompts.py
index 03f3bb4..223735c 100644
--- a/lib/service/prompts.py
+++ b/lib/service/prompts.py
@@ -1,3 +1,5 @@
+# flake8: noqa
+
 GPT_DATA_FORMAT = (
     "{"
     '"messages": ['

From 579fa517aee2738809b1decc0d4487de7c16c21f Mon Sep 17 00:00:00 2001
From: Ismail Pelaseyed
Date: Sat, 7 Oct 2023 22:40:09 +0200
Subject: [PATCH 5/6] Add embedding and finetune flows

---
 lib/api/ingest.py    | 22 +++-------------
 lib/service/flows.py | 50 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 19 deletions(-)
 create mode 100644 lib/service/flows.py

diff --git a/lib/api/ingest.py b/lib/api/ingest.py
index 44c5466..5fc1c74 100644
--- a/lib/api/ingest.py
+++ b/lib/api/ingest.py
@@ -1,9 +1,7 @@
-import openai
 from fastapi import APIRouter
 
 from lib.models.ingest import IngestRequest
-from lib.service.embedding import EmbeddingService
-from lib.service.finetune import get_finetuning_service
+from service.flows import create_finetune, create_embeddings
 from lib.utils.prisma import prisma
 
 router = APIRouter()
@@ -16,21 +14,7 @@
 )
 async def ingest(body: IngestRequest):
     """Endpoint for ingesting data"""
-    webhook_url = body.webhook_url
     datasource = await prisma.datasource.create(data={**body})
-    embedding_service = EmbeddingService(datasource=datasource)
-    documents = await embedding_service.generate_documents()
-    nodes = await embedding_service.generate_chunks(documents=documents)
-    await embedding_service.generate_embeddings(nodes=nodes)
-    finetuning_service = await get_finetuning_service(
-        nodes=nodes, provider="openai", batch_size=5
-    )
-    training_file = await finetuning_service.generate_dataset()
-    finetune_job = await finetuning_service.finetune(training_file=training_file)
-    finetune = openai.FineTuningJob.retrieve(id=finetune_job.get("id"))
-    await prisma.datasource.update(
-        where={"id": datasource.id},
-        data={"webhook_url": webhook_url, "finetune": finetune},
"finetune": finetune}, - ) - await finetunning_service.cleanup(training_file=finetune_job.get("training_file")) + await create_embeddings(datasource=datasource) + await create_finetune(datasource=datasource) return {"success": True, "data": datasource} diff --git a/lib/service/flows.py b/lib/service/flows.py new file mode 100644 index 0000000..2a8182e --- /dev/null +++ b/lib/service/flows.py @@ -0,0 +1,50 @@ +import openai + +from typing import List, Union +from llama_index import Document +from prisma.models import Datasource +from prefect import flow, task +from lib.models.ingest import IngestRequest +from lib.service.embedding import EmbeddingService +from lib.service.finetune import get_finetuning_service +from lib.utils.prisma import prisma + + +@task +async def create_vector_embeddings( + datasource: Datasource, +) -> List[Union[Document, None]]: + embedding_service = EmbeddingService(datasource=datasource) + documents = await embedding_service.generate_documents() + nodes = await embedding_service.generate_chunks(documents=documents) + await embedding_service.generate_embeddings(nodes=nodes) + return nodes + + +@task +async def create_finetuned_model(datasource: Datasource): + embedding_service = EmbeddingService(datasource=datasource) + documents = await embedding_service.generate_documents() + nodes = await embedding_service.generate_chunks(documents=documents) + finetunning_service = await get_finetuning_service( + nodes=nodes, provider="openai", batch_size=5 + ) + await finetunning_service.generate_dataset() + finetune_job = await finetunning_service.finetune() + finetune = await openai.FineTune.retrieve(id=finetune_job.id) + await finetunning_service.cleanup(training_file=finetune_job.get("training_file")) + return finetune + + +@flow(name="create_embeddings", description="Create embeddings", retries=0) +async def create_embeddings(datasource: Datasource): + await create_vector_embeddings(datasource=datasource) + + +@flow(name="create_finetune", description="Create a finetune", retries=0) +async def create_finetune(datasource: Datasource): + finetune = await create_finetuned_model(datasource=datasource) + await prisma.datasource.update( + where={"id": datasource.id}, + data={"finetune": finetune}, + ) From 0fd0a1ee83836885dd73fc77fb1c5e9524778637 Mon Sep 17 00:00:00 2001 From: Ismail Pelaseyed Date: Sat, 7 Oct 2023 22:40:46 +0200 Subject: [PATCH 6/6] Fix formatting --- lib/api/ingest.py | 2 +- lib/service/flows.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/api/ingest.py b/lib/api/ingest.py index 5fc1c74..ece3f61 100644 --- a/lib/api/ingest.py +++ b/lib/api/ingest.py @@ -1,7 +1,7 @@ from fastapi import APIRouter +from service.flows import create_embeddings, create_finetune from lib.models.ingest import IngestRequest -from service.flows import create_finetune, create_embeddings from lib.utils.prisma import prisma router = APIRouter() diff --git a/lib/service/flows.py b/lib/service/flows.py index 2a8182e..4c1c4f4 100644 --- a/lib/service/flows.py +++ b/lib/service/flows.py @@ -1,13 +1,13 @@ -import openai - from typing import List, Union + +import openai from llama_index import Document -from prisma.models import Datasource from prefect import flow, task -from lib.models.ingest import IngestRequest + from lib.service.embedding import EmbeddingService from lib.service.finetune import get_finetuning_service from lib.utils.prisma import prisma +from prisma.models import Datasource @task