From cf0278ca43ab3bf2531195c116b13543c852e3f6 Mon Sep 17 00:00:00 2001 From: billvsme <994171686@qq.com> Date: Fri, 13 Dec 2024 15:42:41 +0800 Subject: [PATCH 1/2] The asyncio.as_completed() function does not guarantee that the results are ordered --- lightrag/kg/milvus_impl.py | 2 +- lightrag/storage.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index 6d2520ce..c72498c5 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -59,7 +59,7 @@ async def upsert(self, data: dict[str, dict]): embedding_tasks = [self.embedding_func(batch) for batch in batches] embeddings_list = [] for f in tqdm_async( - asyncio.as_completed(embedding_tasks), + await asyncio.gather(*embedding_tasks), total=len(embedding_tasks), desc="Generating embeddings", unit="batch", diff --git a/lightrag/storage.py b/lightrag/storage.py index 4c043893..534c6e2e 100644 --- a/lightrag/storage.py +++ b/lightrag/storage.py @@ -99,7 +99,7 @@ async def upsert(self, data: dict[str, dict]): embedding_tasks = [self.embedding_func(batch) for batch in batches] embeddings_list = [] for f in tqdm_async( - asyncio.as_completed(embedding_tasks), + await asyncio.gather(*embedding_tasks), total=len(embedding_tasks), desc="Generating embeddings", unit="batch", From a788c7819778c7a7cbeb335657cb702b627f4877 Mon Sep 17 00:00:00 2001 From: billvsme <994171686@qq.com> Date: Fri, 13 Dec 2024 16:48:22 +0800 Subject: [PATCH 2/2] keep tqdm_async work --- lightrag/kg/milvus_impl.py | 20 ++++++++++---------- lightrag/storage.py | 20 ++++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/lightrag/kg/milvus_impl.py b/lightrag/kg/milvus_impl.py index c72498c5..fe046eb4 100644 --- a/lightrag/kg/milvus_impl.py +++ b/lightrag/kg/milvus_impl.py @@ -56,16 +56,16 @@ async def upsert(self, data: dict[str, dict]): contents[i : i + self._max_batch_size] for i in range(0, len(contents), self._max_batch_size) ] - embedding_tasks = [self.embedding_func(batch) for batch in batches] - embeddings_list = [] - for f in tqdm_async( - await asyncio.gather(*embedding_tasks), - total=len(embedding_tasks), - desc="Generating embeddings", - unit="batch", - ): - embeddings = await f - embeddings_list.append(embeddings) + + async def wrapped_task(batch): + result = await self.embedding_func(batch) + pbar.update(1) + return result + + embedding_tasks = [wrapped_task(batch) for batch in batches] + pbar = tqdm_async(total=len(embedding_tasks), desc="Generating embeddings", unit="batch") + embeddings_list = await asyncio.gather(*embedding_tasks) + embeddings = np.concatenate(embeddings_list) for i, d in enumerate(list_data): d["vector"] = embeddings[i] diff --git a/lightrag/storage.py b/lightrag/storage.py index 534c6e2e..037a9c2f 100644 --- a/lightrag/storage.py +++ b/lightrag/storage.py @@ -96,16 +96,16 @@ async def upsert(self, data: dict[str, dict]): contents[i : i + self._max_batch_size] for i in range(0, len(contents), self._max_batch_size) ] - embedding_tasks = [self.embedding_func(batch) for batch in batches] - embeddings_list = [] - for f in tqdm_async( - await asyncio.gather(*embedding_tasks), - total=len(embedding_tasks), - desc="Generating embeddings", - unit="batch", - ): - embeddings = await f - embeddings_list.append(embeddings) + + async def wrapped_task(batch): + result = await self.embedding_func(batch) + pbar.update(1) + return result + + embedding_tasks = [wrapped_task(batch) for batch in batches] + pbar = tqdm_async(total=len(embedding_tasks), desc="Generating embeddings", unit="batch") + embeddings_list = await asyncio.gather(*embedding_tasks) + embeddings = np.concatenate(embeddings_list) if len(embeddings) == len(list_data): for i, d in enumerate(list_data):