diff --git a/ci/docker-compose-azure.yml b/ci/docker-compose-azure.yml index cb631c0d7..0e7528aa6 100644 --- a/ci/docker-compose-azure.yml +++ b/ci/docker-compose-azure.yml @@ -10,7 +10,7 @@ services: - --scheme - http - --write-timeout=600s - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - 8081:8081 restart: on-failure:0 diff --git a/ci/docker-compose-cluster.yml b/ci/docker-compose-cluster.yml index 09b27edbc..66ab84f38 100644 --- a/ci/docker-compose-cluster.yml +++ b/ci/docker-compose-cluster.yml @@ -2,7 +2,7 @@ version: '3.4' services: weaviate-node-1: - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b restart: on-failure:0 ports: - "8087:8080" diff --git a/ci/docker-compose-okta-cc.yml b/ci/docker-compose-okta-cc.yml index b8561e38e..c83ad0758 100644 --- a/ci/docker-compose-okta-cc.yml +++ b/ci/docker-compose-okta-cc.yml @@ -10,7 +10,7 @@ services: - --scheme - http - --write-timeout=600s - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - 8082:8082 restart: on-failure:0 diff --git a/ci/docker-compose-okta-users.yml b/ci/docker-compose-okta-users.yml index 6fec877d9..619d7839f 100644 --- a/ci/docker-compose-okta-users.yml +++ b/ci/docker-compose-okta-users.yml @@ -10,7 +10,7 @@ services: - --scheme - http - --write-timeout=600s - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - 8083:8083 restart: on-failure:0 diff --git a/ci/docker-compose-openai.yml b/ci/docker-compose-openai.yml index 31bc2f202..67fce6325 100644 --- a/ci/docker-compose-openai.yml +++ b/ci/docker-compose-openai.yml @@ -9,7 +9,7 @@ services: - '8086' 
- --scheme - http - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - 8086:8086 restart: on-failure:0 diff --git a/ci/docker-compose-wcs.yml b/ci/docker-compose-wcs.yml index f2e102235..85dc4464a 100644 --- a/ci/docker-compose-wcs.yml +++ b/ci/docker-compose-wcs.yml @@ -10,7 +10,7 @@ services: - --scheme - http - --write-timeout=600s - image: semitechnologies/weaviate:1.20.0-prealpha-628c8ff + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - 8085:8085 restart: on-failure:0 diff --git a/ci/docker-compose.yml b/ci/docker-compose.yml index 04412c69b..db45adc5d 100644 --- a/ci/docker-compose.yml +++ b/ci/docker-compose.yml @@ -10,7 +10,7 @@ services: - --scheme - http - --write-timeout=600s - image: semitechnologies/weaviate:1.20.0-prealpha-7702266 + image: semitechnologies/weaviate:preview-add-batch-queue-congestion-info-to-node-status-c14301b ports: - "8080:8080" - "50051:50051" diff --git a/integration/test_cluster.py b/integration/test_cluster.py index 8055dae2d..a9b77e4e7 100644 --- a/integration/test_cluster.py +++ b/integration/test_cluster.py @@ -4,8 +4,8 @@ import weaviate -GIT_HASH = "7702266" -SERVER_VERSION = "1.20.0-prealpha" +GIT_HASH = "c14301b" +SERVER_VERSION = "1.20.5" NODE_NAME = "node1" NUM_OBJECT = 10 diff --git a/integration/test_stress.py b/integration/test_stress.py index 43a812aee..e8f813e40 100644 --- a/integration/test_stress.py +++ b/integration/test_stress.py @@ -1,10 +1,10 @@ import datetime import random -import uuid from dataclasses import dataclass, field from typing import List, Dict, Optional, Any import pytest +import uuid import weaviate diff --git a/test/batch/test_crud_batch.py b/test/batch/test_crud_batch.py index 4ef97b45a..1e23f7d82 100644 --- a/test/batch/test_crud_batch.py +++ b/test/batch/test_crud_batch.py @@ -1006,335 +1006,6 @@ def alternating_errors(): 
batch.consistency_level = 1 check_startswith_error_message(self, error, "1 is not a valid ConsistencyLevel") - @patch("weaviate.batch.crud_batch.Batch._auto_create") - def test_configure_call(self, mock_auto_create): - """ - Test the `configure` method, which is the same as `__call__`. - """ - - batch = Batch(mock_connection_func()) - self.check_instance(batch) - # - with self.assertRaises(ValueError) as error: - batch.configure(consistency_level=1) - check_startswith_error_message(self, error, "1 is not a valid ConsistencyLevel") - ####################################################################### - # batching_type: None -> 'fixed' - return_batch = batch.configure( - batch_size=100, - creation_time=20.76, - timeout_retries=2, - consistency_level=ConsistencyLevel.ALL, - ) - self.assertEqual(batch, return_batch) - self.check_instance( - batch, - batch_size=100, - creation_time=20.76, - timeout_retries=2, - batching_type="fixed", - consistency_level="ALL", - ) - mock_auto_create.assert_called() - mock_auto_create.reset_mock() - - ####################################################################### - # batching_type: 'fixed' -> 'dynamic' - return_batch = batch.configure( - batch_size=200, - creation_time=2.5, - timeout_retries=0, - dynamic=True, - ) - self.assertEqual(batch, return_batch) - self.check_instance( - batch, - batch_size=200, - creation_time=2.5, - timeout_retries=0, - batching_type="dynamic", - recom_num_obj=200, - recom_num_ref=200, - ) - mock_auto_create.assert_called() - mock_auto_create.reset_mock() - - ####################################################################### - # batching_type: 'dynamic' -> None - return_batch = batch.configure( - batch_size=None, - creation_time=12.5, - timeout_retries=10, - dynamic=True, - consistency_level="QUORUM", - ) - self.assertEqual(batch, return_batch) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not 
change if not None - recom_num_obj=200, # does not change if not None - consistency_level=ConsistencyLevel.QUORUM, - ) - mock_auto_create.assert_not_called() - - ####################################################################### - # test errors - ####################################################################### - - ####################################################################### - # creation_time - - type_error = f"'creation_time' must be of type {Real}." - value_error = "'creation_time' must be positive, i.e. greater that zero (>0)." - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=None, - creation_time=True, - timeout_retries=10, - dynamic=True, - ) - - check_error_message(self, error, type_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=None, - creation_time="12.5", - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, type_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(ValueError) as error: - batch.configure( - batch_size=None, - creation_time=0.0, - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, value_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(ValueError) as error: - 
batch.configure( - batch_size=None, - creation_time=-1, - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, value_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - ####################################################################### - # timeout_retries - value_error = "'timeout_retries' must be positive, i.e. greater or equal that zero (>=0)." - type_error = f"'timeout_retries' must be of type {int}." - - ####################################################################### - ## test wrong value - with self.assertRaises(ValueError) as error: - batch.configure( - batch_size=None, - creation_time=12.5, - timeout_retries=-1, - dynamic=True, - ) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - check_error_message(self, error, value_error) - - ####################################################################### - ## test wrong type - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=None, - creation_time=12.5, - timeout_retries=True, - dynamic=True, - ) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - check_error_message(self, error, type_error) - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=None, - creation_time=12.5, - timeout_retries="12", - dynamic=True, - ) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change 
if not None - recom_num_obj=200, # does not change if not None - ) - check_error_message(self, error, type_error) - - ####################################################################### - # dynamic - type_error = "'dynamic' must be of type bool." - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=100, - creation_time=12.5, - timeout_retries=10, - dynamic=0, - ) - check_error_message(self, error, type_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - ####################################################################### - # dynamic - type_error = f"'batch_size' must be of type {int}." - value_error = "'batch_size' must be positive, i.e. greater that zero (>0)." - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=False, - creation_time=12.5, - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, type_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(TypeError) as error: - batch.configure( - batch_size=10.6, - creation_time=12.5, - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, type_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(ValueError) as error: - batch.configure( - batch_size=0, - creation_time=12.5, - timeout_retries=10, - dynamic=True, - ) - 
check_error_message(self, error, value_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - - with self.assertRaises(ValueError) as error: - batch.configure( - batch_size=-10, - creation_time=12.5, - timeout_retries=10, - dynamic=True, - ) - check_error_message(self, error, value_error) - self.check_instance( - batch, - batch_size=None, - creation_time=12.5, - timeout_retries=10, - batching_type=None, - recom_num_ref=200, # does not change if not None - recom_num_obj=200, # does not change if not None - ) - mock_auto_create.assert_not_called() - def test_pop_methods(self): """ Test the `pop_object` and the `pop_reference`. diff --git a/weaviate/batch/crud_batch.py b/weaviate/batch/crud_batch.py index e1a38a0bd..9b8e84ea4 100644 --- a/weaviate/batch/crud_batch.py +++ b/weaviate/batch/crud_batch.py @@ -14,11 +14,13 @@ from requests import ReadTimeout, Response from requests.exceptions import ConnectionError as RequestsConnectionError +from requests.exceptions import HTTPError as RequestsHTTPError from weaviate.connect import Connection from weaviate.data.replication import ConsistencyLevel from weaviate.types import UUID from .requests import BatchRequest, ObjectsBatchRequest, ReferenceBatchRequest, BatchResponse +from ..cluster import Cluster from ..error_msgs import ( BATCH_REF_DEPRECATION_NEW_V14_CLS_NS_W, BATCH_REF_DEPRECATION_OLD_V14_CLS_NS_W, @@ -246,6 +248,8 @@ def __init__(self, connection: Connection): """ # set all protected attributes + self._shutdown_background_event: Optional[threading.Event] = None + self._new_dynamic_batching = True self._connection = connection self._objects_batch = ObjectsBatchRequest() self._reference_batch = ReferenceBatchRequest() @@ -416,7 +420,7 @@ def __call__( self._connection_error_retries = 
connection_error_retries self._weaviate_error_retry = weaviate_error_retries # set Batch to manual import - if batch_size is None: + if batch_size is None and not dynamic: self._batch_size = None self._batching_type = None return self @@ -425,23 +429,76 @@ def __call__( _check_positive_num(num_workers, "num_workers", int) _check_bool(dynamic, "dynamic") - self._batch_size = batch_size - if dynamic is False: # set Batch to auto-commit with fixed batch_size - self._batching_type = "fixed" - else: # else set to 'dynamic' - self._batching_type = "dynamic" - self._recommended_num_objects = batch_size - self._recommended_num_references = batch_size - if self._num_workers != num_workers: self.flush() self.shutdown() self._num_workers = num_workers self.start() + self._batch_size = batch_size + + if dynamic is False: # set Batch to auto-commit with fixed batch_size + self._batching_type = "fixed" + else: # else set to 'dynamic' + self._batching_type = "dynamic" + self._recommended_num_objects = 50 if batch_size is None else batch_size + self._recommended_num_references = 50 if batch_size is None else batch_size + if self._shutdown_background_event is None: + self._update_recommended_batch_size() + self._auto_create() return self + def _update_recommended_batch_size(self): + """Create a background thread that periodically checks how congested the batch queue is.""" + self._shutdown_background_event = threading.Event() + + def periodic_check(): + cluster = Cluster(self._connection) + while not self._shutdown_background_event.is_set(): + try: + status = cluster.get_nodes_status() + if "batchStats" not in status[0] or "ratePerSecond" not in status[0]["batchStats"]: # no batch stats reported: fall back to throughput-based sizing + self._new_dynamic_batching = False + return + rate = status[0]["batchStats"]["ratePerSecond"] + rate_per_worker = rate / self._num_workers + batch_length = status[0]["batchStats"]["queueLength"] + + if batch_length == 0: # scale up if queue is empty + self._recommended_num_objects = self._recommended_num_objects + min( + 
self._recommended_num_objects * 2, 25 + ) + else: + ratio = batch_length / rate + if ( + 2.1 > ratio > 1.9 + ): # ideal, send exactly as many objects as weaviate can process + self._recommended_num_objects = rate_per_worker + elif ratio <= 1.9: # we can send more + self._recommended_num_objects = min( + self._recommended_num_objects * 1.5, rate_per_worker * 2 / ratio + ) + elif ratio < 10: # too high, scale down + self._recommended_num_objects = rate_per_worker * 2 / ratio + else: # way too high, stop sending new batches + self._recommended_num_objects = 0 + + refresh_time = 2 + except (RequestsHTTPError, ReadTimeout): + refresh_time = 0.1 + + time.sleep(refresh_time) + self._recommended_num_objects = 10 # in case some batch needs to be sent afterwards + self._shutdown_background_event = None + + demon = threading.Thread( + target=periodic_check, + daemon=True, + name="batchSizeRefresh", + ) + demon.start() + def add_data_object( self, data_object: dict, @@ -1081,7 +1138,11 @@ def _send_batch_requests(self, force_wait: bool) -> None: if timeout_occurred and self._recommended_num_objects is not None: self._recommended_num_objects = max(self._recommended_num_objects // 2, 1) - elif len(self._objects_throughput_frame) != 0 and self._recommended_num_objects is not None: + elif ( + len(self._objects_throughput_frame) != 0 + and self._recommended_num_objects is not None + and not self._new_dynamic_batching + ): obj_per_second = ( sum(self._objects_throughput_frame) / len(self._objects_throughput_frame) * 0.75 ) @@ -1089,6 +1150,7 @@ def _send_batch_requests(self, force_wait: bool) -> None: round(obj_per_second * self._creation_time), self._recommended_num_objects + 250, ) + # Create references after all the objects have been created reference_future_pool = [] for reference_batch in self._reference_batch_queue: @@ -1111,7 +1173,7 @@ def _send_batch_requests(self, force_wait: bool) -> None: else: timeout_occurred = True - if timeout_occurred and 
self._recommended_num_objects is not None: + if timeout_occurred and self._recommended_num_references is not None: self._recommended_num_references = max(self._recommended_num_references // 2, 1) elif ( len(self._references_throughput_frame) != 0 @@ -1147,6 +1209,9 @@ def _auto_create(self) -> None: self.num_objects() >= self._recommended_num_objects or self.num_references() >= self._recommended_num_references ): + while self._recommended_num_objects == 0: + time.sleep(1) # block if weaviate is overloaded + self._send_batch_requests(force_wait=False) return # just in case @@ -1529,6 +1594,12 @@ def start(self) -> "Batch": if self._executor is None or self._executor.is_shutdown(): self._executor = BatchExecutor(max_workers=self._num_workers) + + if self._batching_type == "dynamic" and ( + self._shutdown_background_event is None or self._shutdown_background_event.is_set() + ): + self._update_recommended_batch_size() + return self def shutdown(self) -> None: @@ -1538,6 +1609,9 @@ def shutdown(self) -> None: if not (self._executor is None or self._executor.is_shutdown()): self._executor.shutdown() + if self._shutdown_background_event is not None: + self._shutdown_background_event.set() + def __enter__(self) -> "Batch": return self.start()