simple_app.py
from sentence_transformers import SentenceTransformer
from fastembed import TextEmbedding
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, responses
from infinity_emb.fastapi_schemas.pymodels import (
    MultiModalOpenAIEmbedding,
    OpenAIEmbeddingResult,
)
from embed import BatchedInference
from infinity_emb import AsyncEmbeddingEngine, EngineArgs
import torch
# benchmark settings
# cast to int: os.environ.get returns a string when the variable is set
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 32))
BENCHMARK_NAME = os.environ.get("BENCHMARK_NAME", "")
USE_FASTEMBED = BENCHMARK_NAME == "fastembed"
USE_INFINITY = BENCHMARK_NAME == "infinity"
USE_EMBED = BENCHMARK_NAME == "embed"
DEVICE = os.environ.get("DEVICE", "cpu")
# load the large model on cuda, the small one on cpu
# (benchmarking the large model on cpu takes too long)
MODEL_NAME = (
    "BAAI/bge-small-en-v1.5" if DEVICE == "cpu" else "BAAI/bge-large-en-v1.5"
)
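
# Example invocation (a sketch; the env vars above are the only configuration
# surface, and the port matches the FastAPI description string below):
#   BENCHMARK_NAME=infinity DEVICE=cuda BATCH_SIZE=32 \
#       uvicorn simple_app:app --port 7997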

# model loading
if USE_FASTEMBED:
    print(f"Using fastembed with model {MODEL_NAME}")
    model = TextEmbedding(MODEL_NAME, threads=None)
elif USE_EMBED:
    print("Using embed")
    # the embed backend is only benchmarked at its default batch size
    assert BATCH_SIZE == 32
    register = BatchedInference(
        model_id=MODEL_NAME,
        device=DEVICE,
        engine="torch" if DEVICE.startswith("cuda") else "optimum",
    )
elif USE_INFINITY:
    print("Using infinity")
    engine = AsyncEmbeddingEngine.from_args(
        EngineArgs(
            model_name_or_path=MODEL_NAME,
            device=DEVICE,
            batch_size=BATCH_SIZE,
            lengths_via_tokenize=False,
            model_warmup=True,
            engine="torch" if DEVICE.startswith("cuda") else "optimum",
        )
    )
else:
    print("Using sentence transformer")
    model = SentenceTransformer(MODEL_NAME, device=DEVICE)
    if DEVICE == "cuda":
        model.half()


def encode_fastembed(text: list[str]):
    return list(model.passage_embed(text, batch_size=BATCH_SIZE))


def encode_embed(text: list[str]):
    # register.embed returns a future; block on it and take the embeddings
    return register.embed(sentences=text, model_id=0).result()[0]


def encode_sentence_transformer(text: list[str]):
    # not using multi_process_encode: it is slower for
    # len(texts) < 10000 and interferes with parallel requests
    with torch.inference_mode():
        return model.encode(text, batch_size=BATCH_SIZE)


async def encode_infinity(text: list[str]):
    return (await engine.embed(sentences=text))[0]
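
# Note: only the infinity backend is awaited, since its engine batches
# concurrent requests internally; the other backends expose a blocking
# encode call, which is why their route below is a plain `def`.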

@asynccontextmanager
async def lifespan(app: FastAPI):
    if USE_INFINITY:
        async with engine:
            yield
    elif USE_EMBED:
        yield
        register.stop()
    else:
        yield
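
# The lifespan hook ties backend startup/shutdown to the server: infinity's
# engine is started and stopped via its async context manager, and embed's
# background workers are released with register.stop() once the app exits.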

app = FastAPI(
    description="start via `uvicorn simple_app:app --port 7997 --reload`",
    lifespan=lifespan,
)

if USE_INFINITY:

    @app.post(
        "/embeddings",
        response_model=OpenAIEmbeddingResult,
        response_class=responses.ORJSONResponse,
    )
    async def embed(request: MultiModalOpenAIEmbedding) -> OpenAIEmbeddingResult:
        """The goal of this code is an as-simple-as-possible server
        that could be rebuilt with any other framework.
        """
        # dispatch async so that multiple requests can be handled at the same time
        sentences = request.input if isinstance(request.input, list) else [request.input]
        encoded = await encode_infinity(sentences)
        # response parsing
        return OpenAIEmbeddingResult.to_embeddings_response(
            encoded, MODEL_NAME, sum(len(t) for t in sentences)
        )
else:

    @app.post(
        "/embeddings",
        response_model=OpenAIEmbeddingResult,
        response_class=responses.ORJSONResponse,
    )
    def embed(request: MultiModalOpenAIEmbedding) -> OpenAIEmbeddingResult:
        """The goal of this code is an as-simple-as-possible server
        that could be rebuilt with any other framework.
        """
        # sync handler: FastAPI runs it in a threadpool, so the event loop
        # can still accept other requests while encoding runs
        sentences = request.input if isinstance(request.input, list) else [request.input]
        if USE_EMBED:
            encoded = encode_embed(sentences)
        elif USE_FASTEMBED:
            encoded = encode_fastembed(sentences)
        else:
            encoded = encode_sentence_transformer(sentences)
        # response parsing
        return OpenAIEmbeddingResult.to_embeddings_response(
            encoded, MODEL_NAME, sum(len(t) for t in sentences)
        )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="127.0.0.1", port=7997)
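
# A minimal smoke test against a running server (a sketch; assumes the
# default host/port above and an OpenAI-style request body):
#   curl http://127.0.0.1:7997/embeddings \
#     -H "Content-Type: application/json" \
#     -d '{"model": "BAAI/bge-small-en-v1.5", "input": ["hello world"]}'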