diff --git a/lightrag/storage.py b/lightrag/storage.py index 007d6534..4c043893 100644 --- a/lightrag/storage.py +++ b/lightrag/storage.py @@ -107,10 +107,16 @@ async def upsert(self, data: dict[str, dict]): embeddings = await f embeddings_list.append(embeddings) embeddings = np.concatenate(embeddings_list) - for i, d in enumerate(list_data): - d["__vector__"] = embeddings[i] - results = self._client.upsert(datas=list_data) - return results + if len(embeddings) == len(list_data): + for i, d in enumerate(list_data): + d["__vector__"] = embeddings[i] + results = self._client.upsert(datas=list_data) + return results + else: + # sometimes the embedding is not returned correctly. just log it. + logger.error( + f"embedding is not 1-1 with data, {len(embeddings)} != {len(list_data)}" + ) async def query(self, query: str, top_k=5): embedding = await self.embedding_func([query]) diff --git a/lightrag/utils.py b/lightrag/utils.py index 0220af06..bdb47592 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -17,6 +17,17 @@ from lightrag.prompt import PROMPTS + +class UnlimitedSemaphore: + """A context manager that allows unlimited access.""" + + async def __aenter__(self): + pass + + async def __aexit__(self, exc_type, exc, tb): + pass + + ENCODER = None logger = logging.getLogger("lightrag") @@ -42,9 +53,17 @@ class EmbeddingFunc: embedding_dim: int max_token_size: int func: callable + concurrent_limit: int = 16 + + def __post_init__(self): + if self.concurrent_limit != 0: + self._semaphore = asyncio.Semaphore(self.concurrent_limit) + else: + self._semaphore = UnlimitedSemaphore() async def __call__(self, *args, **kwargs) -> np.ndarray: - return await self.func(*args, **kwargs) + async with self._semaphore: + return await self.func(*args, **kwargs) def locate_json_string_body_from_string(content: str) -> Union[str, None]: