Skip to content

Commit

Permalink
feat: add cloud storage via rclone
Browse files Browse the repository at this point in the history
squashme: use the amalthea session cache
  • Loading branch information
olevski committed Sep 23, 2024
1 parent 72c23b8 commit ae8096f
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 72 deletions.
1 change: 1 addition & 0 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
nb_config=config.nb_config,
internal_gitlab_authenticator=config.gitlab_authenticator,
git_repo=config.git_repositories_repo,
rp_repo=config.rp_repo,
)
notebooks_new = NotebooksNewBP(
name="notebooks",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@
class ICloudStorageRequest(Protocol):
"""The abstract class for cloud storage."""

exists: bool
mount_folder: str
source_folder: str
bucket: str
source_path: str

def get_manifest_patch(
self,
base_name: str,
namespace: str,
labels: dict[str, str] = {},
annotations: dict[str, str] = {},
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
"""The patches applied to a jupyter server to insert the storage in the session."""
...
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,13 @@ def __init__(self, url: str, server_type: type[_SessionType]):
self.url = url
self.client = httpx.AsyncClient()
self.server_type: type[_SessionType] = server_type
self.url_path_name = "servers"
if server_type == AmaltheaSessionV1Alpha1:
self.url_path_name = "sessions"

async def list_servers(self, safe_username: str) -> list[_SessionType]:
"""List the jupyter servers."""
url = urljoin(self.url, f"/users/{safe_username}/servers")
url = urljoin(self.url, f"/users/{safe_username}/{self.url_path_name}")
try:
res = await self.client.get(url, timeout=10)
except httpx.RequestError as err:
Expand All @@ -372,7 +375,7 @@ async def list_servers(self, safe_username: str) -> list[_SessionType]:

async def get_server(self, name: str) -> _SessionType | None:
"""Get a specific jupyter server."""
url = urljoin(self.url, f"/servers/{name}")
url = urljoin(self.url, f"/{self.url_path_name}/{name}")
try:
res = await self.client.get(url, timeout=10)
except httpx.RequestError as err:
Expand Down
121 changes: 78 additions & 43 deletions components/renku_data_services/notebooks/api/schemas/cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

from configparser import ConfigParser
from io import StringIO
from pathlib import Path
from typing import Any, Optional, Self
from pathlib import PurePosixPath
from typing import Any, Final, Optional, Self

from kubernetes import client
from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema

from renku_data_services.base_models import APIUser
from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest
from renku_data_services.notebooks.config import _NotebooksConfig

_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization


class RCloneStorageRequest(Schema):
"""Request for RClone based storage."""
Expand All @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None:
class RCloneStorage(ICloudStorageRequest):
"""RClone based storage."""

pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName"

def __init__(
self,
source_path: str,
Expand All @@ -60,7 +65,7 @@ async def storage_from_schema(
user: APIUser,
internal_gitlab_user: APIUser,
project_id: int,
work_dir: Path,
work_dir: PurePosixPath,
config: _NotebooksConfig,
) -> Self:
"""Create storage object from request."""
Expand Down Expand Up @@ -92,8 +97,73 @@ async def storage_from_schema(
await config.storage_validator.validate_storage_configuration(configuration, source_path)
return cls(source_path, configuration, readonly, mount_folder, name, config)

def pvc(
self,
base_name: str,
namespace: str,
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> client.V1PersistentVolumeClaim:
"""The PVC for mounting cloud storage."""
return client.V1PersistentVolumeClaim(
metadata=client.V1ObjectMeta(
name=base_name,
namespace=namespace,
annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}),
labels={"name": base_name} | (labels or {}),
),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}),
storage_class_name=self.config.cloud_storage.storage_class,
),
)

def volume_mount(self, base_name: str) -> client.V1VolumeMount:
"""The volume mount for cloud storage."""
return client.V1VolumeMount(
mount_path=self.mount_folder,
name=base_name,
read_only=self.readonly,
)

def volume(self, base_name: str) -> client.V1Volume:
"""The volume entry for the statefulset specification."""
return client.V1Volume(
name=base_name,
persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
claim_name=base_name, read_only=self.readonly
),
)

def secret(
self,
base_name: str,
namespace: str,
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> client.V1Secret:
"""The secret containing the configuration for the rclone csi driver."""
return client.V1Secret(
metadata=client.V1ObjectMeta(
name=base_name,
namespace=namespace,
annotations=annotations,
labels={"name": base_name} | (labels or {}),
),
string_data={
"remote": self.name or base_name,
"remotePath": self.source_path,
"configData": self.config_string(self.name or base_name),
},
)

def get_manifest_patch(
self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {}
self,
base_name: str,
namespace: str,
labels: dict[str, str] | None = None,
annotations: dict[str, str] | None = None,
) -> list[dict[str, Any]]:
"""Get server manifest patch."""
patches = []
Expand All @@ -104,57 +174,22 @@ def get_manifest_patch(
{
"op": "add",
"path": f"/{base_name}-pv",
"value": {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": base_name,
"labels": {"name": base_name},
},
"spec": {
"accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"],
"resources": {"requests": {"storage": "10Gi"}},
"storageClassName": self.config.cloud_storage.storage_class,
},
},
"value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)),
},
{
"op": "add",
"path": f"/{base_name}-secret",
"value": {
"apiVersion": "v1",
"kind": "Secret",
"metadata": {
"name": base_name,
"labels": {"name": base_name},
},
"type": "Opaque",
"stringData": {
"remote": self.name or base_name,
"remotePath": self.source_path,
"configData": self.config_string(self.name or base_name),
},
},
"value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)),
},
{
"op": "add",
"path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-",
"value": {
"mountPath": self.mount_folder,
"name": base_name,
"readOnly": self.readonly,
},
"value": _sanitize_for_serialization(self.volume_mount(base_name)),
},
{
"op": "add",
"path": "/statefulset/spec/template/spec/volumes/-",
"value": {
"name": base_name,
"persistentVolumeClaim": {
"claimName": base_name,
"readOnly": self.readonly,
},
},
"value": _sanitize_for_serialization(self.volume(base_name)),
},
],
}
Expand Down
Loading

0 comments on commit ae8096f

Please sign in to comment.