Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make fixes to client proxy definitions #921

Merged
merged 11 commits into from
Feb 29, 2024
46 changes: 46 additions & 0 deletions ci/docker-compose-proxy.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
# Compose stack used by CI to exercise the client's proxy support:
# a Weaviate node, its contextionary vectorizer, and an Envoy proxy.
version: '3.4'
services:
  # Weaviate node; Envoy's upstream cluster in ci/proxy/envoy.yaml targets
  # this service name.
  weaviate-proxy:
    command:
      - --host
      - 0.0.0.0
      - --port
      - '8080'
      - --scheme
      - http
      - --write-timeout=600s
    ports:
      # Quoted so the mapping is always read as a string.
      - '8075:8080'
    image: semitechnologies/weaviate:${WEAVIATE_VERSION}
    restart: 'on-failure:0'
    environment:
      CONTEXTIONARY_URL: 'contextionary:9999'
      QUERY_DEFAULTS_LIMIT: '25'
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'text2vec-contextionary'
      ENABLE_MODULES: 'text2vec-contextionary'
      BACKUP_FILESYSTEM_PATH: '/tmp/backups'
      CLUSTER_GOSSIP_BIND_PORT: '7100'
      CLUSTER_DATA_BIND_PORT: '7101'
      CLUSTER_HOSTNAME: 'node1'
      AUTOSCHEMA_ENABLED: 'false'
      DISABLE_TELEMETRY: 'true'
  # Vectorizer module backing text2vec-contextionary.
  contextionary:
    environment:
      # Numeric-looking values are quoted so they reach the container
      # verbatim as strings.
      OCCURRENCE_WEIGHT_LINEAR_FACTOR: '0.75'
      EXTENSIONS_STORAGE_MODE: 'weaviate'
      EXTENSIONS_STORAGE_ORIGIN: 'http://weaviate-proxy:8080'
      NEIGHBOR_OCCURRENCE_IGNORE_PERCENTILE: '5'
      ENABLE_COMPOUND_SPLITTING: 'false'
    image: semitechnologies/contextionary:en0.16.0-v1.2.0
    ports:
      - '9999:9999'
  # Envoy proxy; port 10000 is the listener defined in ci/proxy/envoy.yaml.
  proxy:
    image: envoyproxy/envoy:v1.29-latest
    ports:
      - '10000:10000'
    volumes:
      # Mounts ci/proxy (which holds envoy.yaml) into the container.
      - ./proxy:/etc/envoy
...
67 changes: 67 additions & 0 deletions ci/proxy/envoy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# This proxy configuration is one that allows the use of a gRPC proxy between Weaviate and the client.
# It is used in the CI/CD pipeline to test the gRPC proxy functionality of the client.
# It follows the method as described here: https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/http/upgrades#connect-support
# where special attention should be paid to the fact that we are using a terminating HTTP/2 connection,
# as in this example: https://github.com/envoyproxy/envoy/blob/8e93d16d433d3364c2b000dc9067ffc400e8f0d6/configs/terminate_http2_connect.yaml,
# because Weaviate itself is not capable of handling CONNECT requests. So Envoy instead upgrades these to POSTs and sends them on to Weaviate.
# Admin interface, bound to loopback only.
admin:
  address:
    socket_address:
      protocol: TCP
      address: 127.0.0.1
      port_value: 9902
static_resources:
  listeners:
    # Listener that clients point their gRPC proxy setting at.
    - name: proxy
      address:
        socket_address:
          protocol: TCP
          address: 0.0.0.0
          port_value: 10000
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                stat_prefix: ingress_http
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: local_service
                      domains:
                        - "*"
                      routes:
                        # Only CONNECT requests are routed; they all go to
                        # the single Weaviate cluster below.
                        - match:
                            connect_matcher: {}
                          route:
                            cluster: weaviate-proxy
                            upgrade_configs:
                              - upgrade_type: CONNECT
                                # Empty config, as in the terminating-CONNECT
                                # example referenced in the file header.
                                connect_config: {}
                http_filters:
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
                # Allow CONNECT over HTTP/2 on the downstream connection.
                http2_protocol_options:
                  allow_connect: true
                upgrade_configs:
                  - upgrade_type: CONNECT
  clusters:
    - name: weaviate-proxy
      type: STRICT_DNS
      connect_timeout: 5s
      # Upstream protocol is explicitly configured as HTTP/2.
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {}
      load_assignment:
        cluster_name: weaviate-proxy
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      # Compose service name of the Weaviate node.
                      address: weaviate-proxy
                      port_value: 50051
1 change: 1 addition & 0 deletions ci/start_weaviate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ nohup docker-compose -f ci/docker-compose-wcs.yml up -d
nohup docker-compose -f ci/docker-compose-generative.yml up -d
nohup docker-compose -f ci/docker-compose-cluster.yml up -d
nohup docker-compose -f ci/docker-compose-rerank.yml up -d
nohup docker-compose -f ci/docker-compose-proxy.yml up -d
1 change: 1 addition & 0 deletions ci/stop_weaviate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ docker-compose -f ci/docker-compose-wcs.yml down --remove-orphans
docker-compose -f ci/docker-compose-generative.yml down --remove-orphans
docker-compose -f ci/docker-compose-cluster.yml down --remove-orphans
docker-compose -f ci/docker-compose-rerank.yml down --remove-orphans
docker-compose -f ci/docker-compose-proxy.yml down --remove-orphans
18 changes: 18 additions & 0 deletions integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,3 +539,21 @@ def test_client_is_ready() -> None:
assert weaviate.connect_to_wcs(
cluster_url=WCS_URL, auth_credentials=WCS_CREDS, skip_init_checks=True
).is_ready()


def test_local_proxies() -> None:
    """Round-trip one object through a local Weaviate using explicit HTTP and gRPC proxy URLs."""
    proxy_settings = wvc.init.Proxies(
        http="http://localhost:8075",
        grpc="http://localhost:10000",
    )
    extra_config = wvc.init.AdditionalConfig(proxies=proxy_settings)

    with weaviate.connect_to_local(additional_config=extra_config) as client:
        # Drop any collection left behind by an earlier run before creating it.
        client.collections.delete("TestLocalProxies")

        name_property = wvc.config.Property(name="name", data_type=wvc.config.DataType.TEXT)
        collection = client.collections.create(
            "TestLocalProxies",
            properties=[name_property],
        )

        collection.data.insert({"name": "Test"})

        # The fetch must return the object that was just inserted.
        fetched = collection.query.fetch_objects().objects
        assert fetched[0].properties["name"] == "Test"
7 changes: 4 additions & 3 deletions test/connection/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def test_get_proxies(self, os_mock):
"""

error_msg = lambda dt: (
"If 'proxies' is not None, it must be of type dict or str. " f"Given type: {dt}."
"If 'proxies' is not None, it must be of type dict, str, or wvc.init.Proxies. "
f"Given type: {dt}."
)
with self.assertRaises(TypeError) as error:
proxies = _get_proxies([], False)
Expand All @@ -79,15 +80,15 @@ def test_get_proxies(self, os_mock):
self.assertEqual(proxies, {"test": True})

proxies = _get_proxies("test", True)
self.assertEqual(proxies, {"http": "test", "https": "test"})
self.assertEqual(proxies, {"http": "test", "https": "test", "grpc": "test"})

os_mock.environ.get.return_value = None
proxies = _get_proxies(None, True)
self.assertEqual(proxies, {})

os_mock.environ.get.return_value = "test"
proxies = _get_proxies(None, True)
self.assertEqual(proxies, {"http": "test", "https": "test"})
self.assertEqual(proxies, {"http": "test", "https": "test", "grpc": "test"})

def test__get_valid_timeout_config(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions weaviate/classes/init.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from weaviate.auth import Auth
from weaviate.config import AdditionalConfig, Timeout
from weaviate.config import AdditionalConfig, Proxies, Timeout

__all__ = ["Auth", "AdditionalConfig", "Timeout"]
__all__ = ["Auth", "AdditionalConfig", "Proxies", "Timeout"]
22 changes: 21 additions & 1 deletion weaviate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,34 @@ def __post_init__(self) -> None:


class Timeout(BaseModel):
"""Timeouts for the different operations in the client."""

# Timeout for query operations; must be >= 0 (unit presumably seconds — TODO confirm).
query: int = Field(default=30, ge=0)
# Timeout for insert operations; must be >= 0.
insert: int = Field(default=90, ge=0)
# How long the client initialisation checks wait before failing; must be >= 0.
init: int = Field(default=1, ge=0)


class Proxies(BaseModel):
"""Proxy configurations for sending requests to Weaviate through a proxy."""

# Each field is an optional proxy URL; `_get_proxies` dumps this model to a dict and omits fields left as None.
# Proxy URL stored under the "http" key.
http: Optional[str] = Field(default=None)
# Proxy URL stored under the "https" key.
https: Optional[str] = Field(default=None)
# Proxy URL stored under the "grpc" key; `_grpc_channel` passes it as the `grpc.http_proxy` channel option.
grpc: Optional[str] = Field(default=None)


class AdditionalConfig(BaseModel):
"""Use this class to specify the connection and proxy settings for your client when connecting to Weaviate.

When specifying the timeout, you can either provide a tuple with the query and insert timeouts, or a `Timeout` object.
The `Timeout` object gives you the additional option to configure the `init` timeout, which controls how long the client
initialisation checks will wait before throwing. This is useful when you have a slow network connection.

When specifying the proxies, be aware that supplying a URL (`str`) will populate all of the `http`, `https`, and `grpc` proxies.
In order for this to be possible, you must have a proxy that is capable of handling simultaneous HTTP/1.1 and HTTP/2 traffic.
"""

connection: ConnectionConfig = Field(default_factory=ConnectionConfig)
proxies: Union[dict, str, None] = Field(default=None)
proxies: Union[str, Proxies, None] = Field(default=None)
timeout_: Union[Tuple[int, int], Timeout] = Field(default_factory=Timeout, alias="timeout")
trust_env: bool = Field(default=False)

Expand Down
33 changes: 23 additions & 10 deletions weaviate/connect/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import time
from abc import ABC, abstractmethod
from typing import Literal, Tuple, TypeVar, Union, cast, overload
from typing import Dict, Literal, Tuple, TypeVar, Union, cast, overload
from urllib.parse import urlparse

import grpc # type: ignore
Expand All @@ -11,13 +11,14 @@

from pydantic import BaseModel, field_validator, model_validator

from weaviate.config import Proxies
from weaviate.types import NUMBER


JSONPayload = Union[dict, list]
TIMEOUT_TYPE_RETURN = Tuple[NUMBER, NUMBER]
MAX_GRPC_MESSAGE_LENGTH = 104858000 # ~100 MiB (104,858,000 bytes), needs to be synchronized with GRPC server
GRPC_OPTIONS = [
GRPC_DEFAULT_OPTIONS = [
("grpc.max_send_message_length", MAX_GRPC_MESSAGE_LENGTH),
("grpc.max_receive_message_length", MAX_GRPC_MESSAGE_LENGTH),
]
Expand Down Expand Up @@ -109,29 +110,35 @@ def _grpc_target(self) -> str:
return f"{self.grpc.host}:{self.grpc.port}"

# Typing overloads: a literal False for `async_channel` yields a blocking Channel.
@overload
def _grpc_channel(self, async_channel: Literal[False]) -> Channel:
def _grpc_channel(self, async_channel: Literal[False], proxies: Dict[str, str]) -> Channel:
...

# A literal True for `async_channel` yields an asyncio AsyncChannel.
@overload
def _grpc_channel(self, async_channel: Literal[True]) -> AsyncChannel:
def _grpc_channel(self, async_channel: Literal[True], proxies: Dict[str, str]) -> AsyncChannel:
...

# Build the gRPC channel to `self._grpc_target`, optionally routed through the proxy given under proxies["grpc"].
def _grpc_channel(self, async_channel: bool) -> Union[Channel, AsyncChannel]:
def _grpc_channel(
self, async_channel: bool, proxies: Dict[str, str]
) -> Union[Channel, AsyncChannel]:
# Pick the module whose channel constructors are used: grpc.aio for async, grpc otherwise.
if async_channel:
import_path = grpc.aio
else:
import_path = grpc

# When a "grpc" proxy is configured, pass it as the `grpc.http_proxy` channel option.
# A new list is built so the module-level default options are not mutated.
if (p := proxies.get("grpc")) is not None:
options: list = [*GRPC_DEFAULT_OPTIONS, ("grpc.http_proxy", p)]
else:
options = GRPC_DEFAULT_OPTIONS
# Secure targets get credentials from ssl_channel_credentials(); otherwise an insecure channel is opened.
if self.grpc.secure:
return import_path.secure_channel(
target=self._grpc_target,
credentials=ssl_channel_credentials(),
options=GRPC_OPTIONS,
options=options,
)
else:
return import_path.insecure_channel(
target=self._grpc_target,
options=GRPC_OPTIONS,
options=options,
)

@property
Expand All @@ -153,7 +160,7 @@ def get_proxies(self) -> dict:
raise NotImplementedError


def _get_proxies(proxies: Union[dict, str, None], trust_env: bool) -> dict:
def _get_proxies(proxies: Union[dict, str, Proxies, None], trust_env: bool) -> dict:
"""
Get proxies as dict, compatible with 'requests' library.
NOTE: 'proxies' has priority over 'trust_env', i.e. if 'proxies' is NOT None, 'trust_env'
Expand Down Expand Up @@ -182,11 +189,14 @@ def _get_proxies(proxies: Union[dict, str, None], trust_env: bool) -> dict:
return {
"http": proxies,
"https": proxies,
"grpc": proxies,
}
if isinstance(proxies, dict):
return proxies
if isinstance(proxies, Proxies):
return proxies.model_dump(exclude_none=True)
raise TypeError(
"If 'proxies' is not None, it must be of type dict or str. "
"If 'proxies' is not None, it must be of type dict, str, or wvc.init.Proxies. "
f"Given type: {type(proxies)}."
)

Expand All @@ -195,15 +205,18 @@ def _get_proxies(proxies: Union[dict, str, None], trust_env: bool) -> dict:

http_proxy = (os.environ.get("HTTP_PROXY"), os.environ.get("http_proxy"))
https_proxy = (os.environ.get("HTTPS_PROXY"), os.environ.get("https_proxy"))
grpc_proxy = (os.environ.get("GRPC_PROXY"), os.environ.get("grpc_proxy"))

if not any(http_proxy + https_proxy):
if not any(http_proxy + https_proxy + grpc_proxy):
return {}

proxies = {}
if any(http_proxy):
proxies["http"] = http_proxy[0] if http_proxy[0] else http_proxy[1]
if any(https_proxy):
proxies["https"] = https_proxy[0] if https_proxy[0] else https_proxy[1]
if any(grpc_proxy):
proxies["grpc"] = grpc_proxy[0] if grpc_proxy[0] else grpc_proxy[1]

return proxies

Expand Down
Loading
Loading