From 415fb1426e139de8e02af04b91088ae4d05538be Mon Sep 17 00:00:00 2001
From: bogdankostic
Date: Tue, 30 May 2023 17:25:40 +0200
Subject: [PATCH 1/7] Include benchmark config in output

---
 test/benchmarks/run.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/benchmarks/run.py b/test/benchmarks/run.py
index 5de92bedee..1c20627993 100644
--- a/test/benchmarks/run.py
+++ b/test/benchmarks/run.py
@@ -59,7 +59,8 @@ def run_benchmark(pipeline_yaml: Path) -> Dict:
     else:
         raise ValueError("Pipeline must be a retriever, reader, or retriever-reader pipeline.")
 
-    results["config_file"] = pipeline_config
+    pipeline_config["benchmark_config"] = benchmark_config
+    results["config"] = pipeline_config
 
     return results

From f0b54bf233920a5a3a76000d9a50aec885343f50 Mon Sep 17 00:00:00 2001
From: bogdankostic
Date: Wed, 31 May 2023 14:11:32 +0200
Subject: [PATCH 2/7] Use queries from aggregated labels

---
 test/benchmarks/retriever.py        | 3 ++-
 test/benchmarks/retriever_reader.py | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/test/benchmarks/retriever.py b/test/benchmarks/retriever.py
index 431a4acc6e..132ea76409 100644
--- a/test/benchmarks/retriever.py
+++ b/test/benchmarks/retriever.py
@@ -91,8 +91,9 @@ def benchmark_querying(pipeline: Pipeline, eval_set: Path) -> Dict:
     """
     try:
         # Load eval data
-        labels, queries = load_eval_data(eval_set)
+        labels, _ = load_eval_data(eval_set)
         multi_labels = aggregate_labels(labels)
+        queries = [label.query for label in multi_labels]
 
         # Run querying
         start_time = perf_counter()
diff --git a/test/benchmarks/retriever_reader.py b/test/benchmarks/retriever_reader.py
index f884058775..7de03b58f7 100644
--- a/test/benchmarks/retriever_reader.py
+++ b/test/benchmarks/retriever_reader.py
@@ -41,8 +41,9 @@ def benchmark_querying(pipeline: Pipeline, eval_set: Path) -> Dict:
     """
     try:
         # Load eval data
-        labels, queries = load_eval_data(eval_set)
+        labels, _ = load_eval_data(eval_set)
         multi_labels = aggregate_labels(labels)
+        queries = [label.query for label in multi_labels]
 
         # Run querying
         start_time = perf_counter()
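Note on [PATCH 2/7]: the benchmark now derives its queries from the aggregated MultiLabel objects rather than from the raw label list, so each distinct query is sent to the pipeline exactly once and stays aligned one-to-one with the MultiLabels used for evaluation. A minimal sketch of that aggregation — `Label`, `MultiLabel`, and `aggregate_labels` below are simplified illustrative stand-ins, not Haystack's actual implementations:

    from collections import defaultdict
    from dataclasses import dataclass
    from typing import List

    @dataclass
    class Label:
        query: str
        answer: str

    @dataclass
    class MultiLabel:
        query: str
        labels: List[Label]

    def aggregate_labels(labels: List[Label]) -> List[MultiLabel]:
        # Group labels that share a query into one MultiLabel per distinct query.
        grouped = defaultdict(list)
        for label in labels:
            grouped[label.query].append(label)
        return [MultiLabel(query=q, labels=ls) for q, ls in grouped.items()]

    labels = [Label("who?", "a"), Label("who?", "b"), Label("when?", "c")]
    multi_labels = aggregate_labels(labels)
    queries = [label.query for label in multi_labels]
    assert queries == ["who?", "when?"]  # duplicates collapsed, order preserved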
""" # Base constructor might need the client to be ready, create it first @@ -167,6 +170,7 @@ def __init__( skip_missing_embeddings=skip_missing_embeddings, synonyms=synonyms, synonym_type=synonym_type, + batch_size=batch_size, ) # Let the base class trap the right exception from the elasticpy client diff --git a/haystack/document_stores/opensearch.py b/haystack/document_stores/opensearch.py index a3cd45261e..5a0dfbd33d 100644 --- a/haystack/document_stores/opensearch.py +++ b/haystack/document_stores/opensearch.py @@ -74,6 +74,7 @@ def __init__( knn_engine: str = "nmslib", knn_parameters: Optional[Dict] = None, ivf_train_size: Optional[int] = None, + batch_size: int = 1000, ): """ Document Store using OpenSearch (https://opensearch.org/). It is compatible with the Amazon OpenSearch Service. @@ -165,6 +166,8 @@ def __init__( index type and knn parameters). If `0`, training doesn't happen automatically but needs to be triggered manually via the `train_index` method. Default: `None` + :param batch_size: Number of Documents to index at once / Number of queries to execute at once. If you face + memory issues, decrease the batch_size. """ # These parameters aren't used by Opensearch at the moment but could be in the future, see # https://github.com/opensearch-project/security/issues/1504. Let's not deprecate them for @@ -243,6 +246,7 @@ def __init__( skip_missing_embeddings=skip_missing_embeddings, synonyms=synonyms, synonym_type=synonym_type, + batch_size=batch_size, ) # Let the base class catch the right error from the Opensearch client @@ -529,6 +533,7 @@ def query_by_embedding_batch( return_embedding: Optional[bool] = None, headers: Optional[Dict[str, str]] = None, scale_score: bool = True, + batch_size: Optional[int] = None, ) -> List[List[Document]]: """ Find the documents that are most similar to the provided `query_embs` by using a vector similarity metric. @@ -605,17 +610,19 @@ def query_by_embedding_batch( Check out https://www.elastic.co/guide/en/elasticsearch/reference/current/http-clients.html for more information. :param scale_score: Whether to scale the similarity score to the unit interval (range of [0,1]). If true (default) similarity scores (e.g. cosine or dot_product) which naturally have a different value range will be scaled to a range of [0,1], where 1 means extremely relevant. - Otherwise raw similarity scores (e.g. cosine or dot_product) will be used. - :return: + Otherwise, raw similarity scores (e.g. cosine or dot_product) will be used. + :param batch_size: Number of query embeddings to process at once. If not specified, self.batch_size is used. 
""" if index is None: index = self.index + batch_size = batch_size or self.batch_size + if self.index_type in ["ivf", "ivf_pq"] and not self._ivf_model_exists(index=index): self._ivf_index_not_trained_error(index=index, headers=headers) return super().query_by_embedding_batch( - query_embs, filters, top_k, index, return_embedding, headers, scale_score + query_embs, filters, top_k, index, return_embedding, headers, scale_score, batch_size ) def query( diff --git a/haystack/document_stores/search_engine.py b/haystack/document_stores/search_engine.py index d4acf0507d..2bfa34d2d4 100644 --- a/haystack/document_stores/search_engine.py +++ b/haystack/document_stores/search_engine.py @@ -70,6 +70,7 @@ def __init__( skip_missing_embeddings: bool = True, synonyms: Optional[List] = None, synonym_type: str = "synonym", + batch_size: int = 1000, ): super().__init__() @@ -98,6 +99,7 @@ def __init__( self.skip_missing_embeddings: bool = skip_missing_embeddings self.duplicate_documents = duplicate_documents self.refresh_type = refresh_type + self.batch_size = batch_size if similarity in ["cosine", "dot_product", "l2"]: self.similarity: str = similarity else: @@ -367,7 +369,7 @@ def write_documents( self, documents: Union[List[dict], List[Document]], index: Optional[str] = None, - batch_size: int = 10_000, + batch_size: Optional[int] = None, duplicate_documents: Optional[str] = None, headers: Optional[Dict[str, str]] = None, ): @@ -390,6 +392,7 @@ def write_documents( to what you have set for self.content_field and self.name_field. :param index: search index where the documents should be indexed. If you don't specify it, self.index is used. :param batch_size: Number of documents that are passed to the bulk function at each round. + If not specified, self.batch_size is used. :param duplicate_documents: Handle duplicate documents based on parameter options. Parameter options: ( 'skip','overwrite','fail') skip: Ignore the duplicate documents @@ -407,6 +410,9 @@ def write_documents( if index is None: index = self.index + + batch_size = batch_size or self.batch_size + duplicate_documents = duplicate_documents or self.duplicate_documents assert ( duplicate_documents in self.duplicate_documents_options @@ -923,9 +929,10 @@ def query_batch( headers: Optional[Dict[str, str]] = None, all_terms_must_match: bool = False, scale_score: bool = True, + batch_size: Optional[int] = None, ) -> List[List[Document]]: """ - Scan through documents in DocumentStore and return a small number documents + Scan through documents in DocumentStore and return a small number of documents that are most relevant to the provided queries as defined by keyword matching algorithms like BM25. This method lets you find relevant documents for list of query strings (output: List of Lists of Documents). @@ -1005,17 +1012,19 @@ def query_batch( :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='} for basic authentication) :param all_terms_must_match: Whether all terms of the query must match the document. If true all query terms must be present in a document in order to be retrieved (i.e the AND operator is being used implicitly between query terms: "cozy fish restaurant" -> "cozy AND fish AND restaurant"). - Otherwise at least one query term must be present in a document in order to be retrieved (i.e the OR operator is being used implicitly between query terms: "cozy fish restaurant" -> "cozy OR fish OR restaurant"). 
From e485ef7cdfb7eea1fe1cfc6fca23159ce3c7926a Mon Sep 17 00:00:00 2001
From: bogdankostic
Date: Thu, 1 Jun 2023 17:27:28 +0200
Subject: [PATCH 5/7] Use self.batch_size in write_documents

---
 haystack/document_stores/opensearch.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/haystack/document_stores/opensearch.py b/haystack/document_stores/opensearch.py
index f1abf41670..cc882dc622 100644
--- a/haystack/document_stores/opensearch.py
+++ b/haystack/document_stores/opensearch.py
@@ -362,6 +362,8 @@ def write_documents(
         if index is None:
             index = self.index
 
+        batch_size = batch_size or self.batch_size
+
         if self.knn_engine == "faiss" and self.similarity == "cosine":
             field_map = self._create_document_field_map()
             documents = [Document.from_dict(d, field_map=field_map) if isinstance(d, dict) else d for d in documents]
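Note on [PATCH 5/7]: with the fallback in place, the resolved batch size drives how `write_documents` slices the document list before handing each slice to the bulk helper. The slicing behavior, sketched independently of Haystack's actual implementation — `chunks` is a hypothetical helper:

    from typing import Iterable, List

    def chunks(items: List[dict], batch_size: int) -> Iterable[List[dict]]:
        # Yield successive fixed-size slices; the last one may be smaller.
        for i in range(0, len(items), batch_size):
            yield items[i : i + batch_size]

    docs = [{"content": f"doc {i}"} for i in range(9)]
    assert [len(b) for b in chunks(docs, batch_size=2)] == [2, 2, 2, 2, 1]  # 5 bulk calls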
""" if index is None: index = self.index @@ -1276,6 +1297,8 @@ def query_by_embedding_batch( if headers is None: headers = {} + batch_size = batch_size or self.batch_size + if not self.embedding_field: raise DocumentStoreError("Please set a valid `embedding_field` for OpenSearchDocumentStore") @@ -1289,25 +1312,24 @@ def query_by_embedding_batch( filters = [filters] * len(query_embs) if filters is not None else [{}] * len(query_embs) body = [] - for query_emb, cur_filters in zip(query_embs, filters): + all_documents = [] + for query_emb, cur_filters in tqdm(zip(query_embs, filters)): cur_query_body = self._construct_dense_query_body( query_emb=query_emb, filters=cur_filters, top_k=top_k, return_embedding=return_embedding ) body.append(headers) body.append(cur_query_body) - logger.debug("Retriever query: %s", body) - responses = self.client.msearch(index=index, body=body) + if len(body) >= batch_size * 2: + logger.debug("Retriever query: %s", body) + cur_documents = self._execute_msearch(index=index, body=body, scale_score=scale_score) + all_documents.extend(cur_documents) + body = [] - all_documents = [] - cur_documents = [] - for response in responses["responses"]: - cur_result = response["hits"]["hits"] - cur_documents = [ - self._convert_es_hit_to_document(hit, adapt_score_for_embedding=True, scale_score=scale_score) - for hit in cur_result - ] - all_documents.append(cur_documents) + if len(body) > 0: + logger.debug("Retriever query: %s", body) + cur_documents = self._execute_msearch(index=index, body=body, scale_score=scale_score) + all_documents.extend(cur_documents) return all_documents @@ -1323,7 +1345,7 @@ def update_embeddings( index: Optional[str] = None, filters: Optional[FilterType] = None, update_existing_embeddings: bool = True, - batch_size: int = 10_000, + batch_size: Optional[int] = None, headers: Optional[Dict[str, str]] = None, ): """ @@ -1370,6 +1392,8 @@ def update_embeddings( if index is None: index = self.index + batch_size = batch_size or self.batch_size + if self.refresh_type == "false": self.client.indices.refresh(index=index, headers=headers) diff --git a/test/document_stores/test_search_engine.py b/test/document_stores/test_search_engine.py index e83adad247..a99d03c441 100644 --- a/test/document_stores/test_search_engine.py +++ b/test/document_stores/test_search_engine.py @@ -1,4 +1,6 @@ from unittest.mock import MagicMock + +import numpy as np import pytest from haystack.document_stores.search_engine import SearchEngineDocumentStore, prepare_hosts @@ -167,6 +169,18 @@ def test_get_all_labels_legacy_document_id(self, mocked_document_store, mocked_g labels = mocked_document_store.get_all_labels() assert labels[0].answer.document_ids == ["fc18c987a8312e72a47fb1524f230bb0"] + @pytest.mark.unit + def test_query_batch_req_for_each_batch(self, mocked_document_store): + mocked_document_store.batch_size = 2 + mocked_document_store.query_batch([self.query] * 3) + assert mocked_document_store.client.msearch.call_count == 2 + + @pytest.mark.unit + def test_query_by_embedding_batch_req_for_each_batch(self, mocked_document_store): + mocked_document_store.batch_size = 2 + mocked_document_store.query_by_embedding_batch([np.array([1, 2, 3])] * 3) + assert mocked_document_store.client.msearch.call_count == 2 + @pytest.mark.document_store class TestSearchEngineDocumentStore: From e5536aa173cf0fca9958b9f32e98d6892e8b411d Mon Sep 17 00:00:00 2001 From: bogdankostic Date: Thu, 1 Jun 2023 17:19:26 +0200 Subject: [PATCH 4/7] Fix mypy --- haystack/document_stores/opensearch.py | 2 +- 1 
From 0122240889ab0ad2c3ef9c847d5b8b9e38f62231 Mon Sep 17 00:00:00 2001
From: bogdankostic
Date: Thu, 1 Jun 2023 18:11:03 +0200
Subject: [PATCH 7/7] Add unit tests for write documents

---
 test/document_stores/test_elasticsearch.py | 9 ++++++++-
 test/document_stores/test_opensearch.py    | 7 +++++++
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/test/document_stores/test_elasticsearch.py b/test/document_stores/test_elasticsearch.py
index 2398d47581..27b2588dcf 100644
--- a/test/document_stores/test_elasticsearch.py
+++ b/test/document_stores/test_elasticsearch.py
@@ -1,6 +1,6 @@
 import logging
 import os
-from unittest.mock import MagicMock
+from unittest.mock import MagicMock, patch
 
 import numpy as np
 import pytest
@@ -344,3 +344,10 @@ def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_doc
         # assert the resulting body is not affected by the `excluded_meta_data` value
         _, kwargs = mocked_document_store.client.search.call_args
         assert kwargs["_source"] == {"excludes": ["embedding"]}
+
+    @pytest.mark.unit
+    def test_write_documents_req_for_each_batch(self, mocked_document_store, documents):
+        mocked_document_store.batch_size = 2
+        with patch("haystack.document_stores.elasticsearch.bulk") as mocked_bulk:
+            mocked_document_store.write_documents(documents)
+        assert mocked_bulk.call_count == 5
diff --git a/test/document_stores/test_opensearch.py b/test/document_stores/test_opensearch.py
index 47bc06c31e..628a7f26b0 100644
--- a/test/document_stores/test_opensearch.py
+++ b/test/document_stores/test_opensearch.py
@@ -1291,3 +1291,10 @@ def test_get_document_by_id_excluded_meta_data_has_no_influence(self, mocked_doc
         # assert the resulting body is not affected by the `excluded_meta_data` value
         _, kwargs = mocked_document_store.client.search.call_args
         assert kwargs["body"]["_source"] == {"excludes": ["embedding"]}
+
+    @pytest.mark.unit
+    def test_write_documents_req_for_each_batch(self, mocked_document_store, documents):
+        mocked_document_store.batch_size = 2
+        with patch("haystack.document_stores.opensearch.bulk") as mocked_bulk:
+            mocked_document_store.write_documents(documents)
+        assert mocked_bulk.call_count == 5
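Note on [PATCH 7/7]: the expected call counts follow from ceiling division — writing n documents with `batch_size = 2` issues ceil(n / 2) bulk calls, so `call_count == 5` implies the shared `documents` fixture holds 9 or 10 documents:

    import math

    assert math.ceil(9 / 2) == 5
    assert math.ceil(10 / 2) == 5

Patching `bulk` at `haystack.document_stores.elasticsearch.bulk` (and the opensearch equivalent) rather than at the helper's defining module matters: `mock.patch` must target the name where it is looked up, which is what lets the mock intercept the calls made inside `write_documents`.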