Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for enabling and disabling async replication in schema #1105

Merged
merged 13 commits into from
Jun 27, 2024
Merged
8 changes: 5 additions & 3 deletions integration/test_collection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def test_collection_config_empty(collection_factory: CollectionFactory) -> None:
assert config.multi_tenancy_config.enabled is False

assert config.replication_config.factor == 1
assert config.replication_config.async_enabled is False

assert isinstance(config.vector_index_config, _VectorIndexConfigHNSW)
assert config.vector_index_config.cleanup_interval_seconds == 300
Expand Down Expand Up @@ -189,6 +190,7 @@ def test_collection_config_defaults(collection_factory: CollectionFactory) -> No
assert config.multi_tenancy_config.enabled is True

assert config.replication_config.factor == 1
assert config.replication_config.async_enabled is False

assert isinstance(config.vector_index_config, _VectorIndexConfigHNSW)
assert config.vector_index_config.cleanup_interval_seconds == 300
Expand Down Expand Up @@ -241,7 +243,7 @@ def test_collection_config_full(collection_factory: CollectionFactory) -> None:
multi_tenancy_config=Configure.multi_tenancy(
enabled=True, auto_tenant_activation=True, auto_tenant_creation=True
),
# replication_config=Configure.replication(factor=2), # currently not updateable in RAFT
# replication_config=Configure.replication(factor=2, async_enabled=True), # currently not updateable in RAFT
vector_index_config=Configure.VectorIndex.hnsw(
cleanup_interval_seconds=10,
distance_metric=VectorDistances.DOT,
Expand Down Expand Up @@ -373,7 +375,7 @@ def test_collection_config_update(collection_factory: CollectionFactory) -> None
stopwords_preset=StopwordsPreset.EN,
stopwords_removals=["the"],
),
# replication_config=Reconfigure.replication(factor=2), # currently not updateable in RAFT
# replication_config=Reconfigure.replication(factor=2, async_enabled=True), # currently not updateable in RAFT
vectorizer_config=Reconfigure.VectorIndex.hnsw(
vector_cache_max_objects=2000000,
quantizer=Reconfigure.VectorIndex.Quantizer.pq(
Expand Down Expand Up @@ -711,7 +713,7 @@ def test_config_export_and_recreate_from_dict(collection_factory: CollectionFact
Property(name="age", data_type=DataType.INT),
],
multi_tenancy_config=Configure.multi_tenancy(enabled=True),
replication_config=Configure.replication(factor=1),
replication_config=Configure.replication(factor=1, async_enabled=False),
vector_index_config=Configure.VectorIndex.hnsw(
quantizer=Configure.VectorIndex.Quantizer.pq(centroids=256)
),
Expand Down
2 changes: 1 addition & 1 deletion integration/test_collection_multi_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_consistency_on_multinode(
properties=[
Property(name="name", data_type=DataType.TEXT),
],
replication_config=Configure.replication(factor=2),
replication_config=Configure.replication(factor=2, async_enabled=False),
ports=(8087, 50058),
).with_consistency_level(level)

Expand Down
2 changes: 1 addition & 1 deletion integration_v3/test_graphql.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def client(request):
if opts.get("cluster"):
port = 8087
for _, c in enumerate(schema["classes"]):
c["replicationConfig"] = {"factor": 2}
c["replicationConfig"] = {"factor": 2, "asyncEnabled": False}

client = weaviate.Client(f"http://localhost:{port}")
client.schema.delete_all()
Expand Down
20 changes: 11 additions & 9 deletions integration_v3/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ def client():

tsmith023 marked this conversation as resolved.
Show resolved Hide resolved

@pytest.mark.parametrize("replicationFactor", [None, 1])
def test_create_class_with_implicit_and_explicit_replication_factor(
client: weaviate.Client, replicationFactor: Optional[int]
@pytest.mark.parametrize("asyncEnabled", [None, False])
def test_create_class_with_implicit_and_explicit_replication_config(
client: weaviate.Client, replicationFactor: Optional[int], asyncEnabled: Optional[bool]
):
single_class = {
"class": "Barbecue",
Expand All @@ -29,18 +30,19 @@ def test_create_class_with_implicit_and_explicit_replication_factor(
},
],
}
if replicationFactor is None:
expected_factor = 1
else:
expected_factor = replicationFactor
single_class["replicationConfig"] = {
"factor": replicationFactor,
}

expected_factor = 1 if replicationFactor == None else replicationFactor
expected_async_enabled = False if asyncEnabled == None else asyncEnabled
single_class["replicationConfig"] = {
"factor": replicationFactor,
"asyncEnabled": asyncEnabled,
}

client.schema.create_class(single_class)
created_class = client.schema.get("Barbecue")
assert created_class["class"] == "Barbecue"
assert created_class["replicationConfig"]["factor"] == expected_factor
assert created_class["replicationConfig"].get("asyncEnabled", False) == expected_async_enabled

client.schema.delete_class("Barbecue")

Expand Down
4 changes: 2 additions & 2 deletions mock_tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def test_missing_multi_tenancy_config(
),
properties=[],
references=[],
replication_config=ReplicationConfig(factor=0),
replication_config=ReplicationConfig(factor=0, async_enabled=False),
vector_index_config=vic,
vector_index_type=VectorIndexType.FLAT,
vectorizer=Vectorizers.NONE,
Expand Down Expand Up @@ -242,7 +242,7 @@ def test_return_from_bind_module(
"invertedIndexConfig": ii_config,
"multiTenancyConfig": config.multi_tenancy()._to_dict(),
"vectorizer": "multi2vec-bind",
"replicationConfig": {"factor": 2},
"replicationConfig": {"factor": 2, "asyncEnabled": False},
"moduleConfig": {"multi2vec-bind": {}},
}
weaviate_auth_mock.expect_request("/v1/schema/TestBindCollection").respond_with_json(
Expand Down
19 changes: 15 additions & 4 deletions weaviate/collections/classes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,12 @@ class _ShardingConfigCreate(_ConfigCreateModel):

class _ReplicationConfigCreate(_ConfigCreateModel):
factor: Optional[int]
asyncEnabled: Optional[bool]


class _ReplicationConfigUpdate(_ConfigUpdateModel):
factor: Optional[int]
asyncEnabled: Optional[bool]


class _BM25ConfigCreate(_ConfigCreateModel):
Expand Down Expand Up @@ -1053,6 +1055,7 @@ def to_dict(self) -> Dict[str, Any]:
@dataclass
class _ReplicationConfig(_ConfigBase):
factor: int
async_enabled: bool


ReplicationConfig = _ReplicationConfig
Expand Down Expand Up @@ -1775,14 +1778,18 @@ def multi_tenancy(
)

@staticmethod
def replication(factor: Optional[int] = None) -> _ReplicationConfigCreate:
def replication(
factor: Optional[int] = None, async_enabled: Optional[bool] = None
) -> _ReplicationConfigCreate:
"""Create a `ReplicationConfigCreate` object to be used when defining the replication configuration of Weaviate.

Arguments:
`factor`
The replication factor.
`async_enabled`
Enabled async replication.
"""
return _ReplicationConfigCreate(factor=factor)
return _ReplicationConfigCreate(factor=factor, asyncEnabled=async_enabled)

@staticmethod
def sharding(
Expand Down Expand Up @@ -1977,16 +1984,20 @@ def inverted_index(
)

@staticmethod
def replication(factor: Optional[int] = None) -> _ReplicationConfigUpdate:
def replication(
factor: Optional[int] = None, async_enabled: Optional[bool] = None
tsmith023 marked this conversation as resolved.
Show resolved Hide resolved
) -> _ReplicationConfigUpdate:
"""Create a `ReplicationConfigUpdate` object.

Use this method when defining the `replication_config` argument in `collection.update()`.

Arguments:
`factor`
The replication factor.
`async_enabled`
Enable async replication.
"""
return _ReplicationConfigUpdate(factor=factor)
return _ReplicationConfigUpdate(factor=factor, asyncEnabled=async_enabled)

@staticmethod
def multi_tenancy(
Expand Down
5 changes: 4 additions & 1 deletion weaviate/collections/classes/config_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,10 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig:
),
properties=_properties_from_config(schema) if schema.get("properties") is not None else [],
references=_references_from_config(schema) if schema.get("properties") is not None else [],
replication_config=_ReplicationConfig(factor=schema["replicationConfig"]["factor"]),
replication_config=_ReplicationConfig(
factor=schema["replicationConfig"]["factor"],
async_enabled=schema["replicationConfig"]["asyncEnabled"],
tsmith023 marked this conversation as resolved.
Show resolved Hide resolved
),
reranker_config=__get_rerank_config(schema),
sharding_config=(
None
Expand Down
6 changes: 4 additions & 2 deletions weaviate/schema/crud_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,8 @@ def get(self, class_name: Optional[str] = None) -> dict:
"vectorIndexType": "hnsw",
"vectorizer": "text2vec-contextionary",
"replicationConfig": {
"factor": 1
"factor": 1,
"asyncEnabled": false
tsmith023 marked this conversation as resolved.
Show resolved Hide resolved
}
}
]
Expand Down Expand Up @@ -545,7 +546,8 @@ def get(self, class_name: Optional[str] = None) -> dict:
"vectorIndexType": "hnsw",
"vectorizer": "text2vec-contextionary",
"replicationConfig": {
"factor": 1
"factor": 1,
"asyncEnabled": false
}
}

Expand Down
Loading