From ae8096fce60e218c346f40fae35313980ba7fab9 Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Wed, 4 Sep 2024 23:39:01 +0200 Subject: [PATCH] feat: add cloud storage via rclone squashme: use the amalthea session cache --- bases/renku_data_services/data_api/app.py | 1 + .../api/classes/cloud_storage/__init__.py | 8 +- .../notebooks/api/classes/k8s_client.py | 7 +- .../notebooks/api/schemas/cloud_storage.py | 121 +++++++++++------- .../notebooks/blueprints.py | 73 +++++++++-- .../notebooks/cr_amalthea_session.py | 2 +- .../renku_data_services/notebooks/crs.py | 2 + components/renku_data_services/storage/db.py | 6 +- poetry.lock | 6 +- projects/renku_data_service/poetry.lock | 6 +- 10 files changed, 160 insertions(+), 72 deletions(-) diff --git a/bases/renku_data_services/data_api/app.py b/bases/renku_data_services/data_api/app.py index aa3cfd116..a51de6989 100644 --- a/bases/renku_data_services/data_api/app.py +++ b/bases/renku_data_services/data_api/app.py @@ -142,6 +142,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: nb_config=config.nb_config, internal_gitlab_authenticator=config.gitlab_authenticator, git_repo=config.git_repositories_repo, + rp_repo=config.rp_repo, ) notebooks_new = NotebooksNewBP( name="notebooks", diff --git a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py index 015653284..a66b2728d 100644 --- a/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py +++ b/components/renku_data_services/notebooks/api/classes/cloud_storage/__init__.py @@ -6,17 +6,15 @@ class ICloudStorageRequest(Protocol): """The abstract class for cloud storage.""" - exists: bool mount_folder: str - source_folder: str - bucket: str + source_path: str def get_manifest_patch( self, base_name: str, namespace: str, - labels: dict[str, str] = {}, - annotations: dict[str, str] = {}, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """The patches applied to a jupyter server to insert the storage in the session.""" ... diff --git a/components/renku_data_services/notebooks/api/classes/k8s_client.py b/components/renku_data_services/notebooks/api/classes/k8s_client.py index 822e301e8..b99839657 100644 --- a/components/renku_data_services/notebooks/api/classes/k8s_client.py +++ b/components/renku_data_services/notebooks/api/classes/k8s_client.py @@ -351,10 +351,13 @@ def __init__(self, url: str, server_type: type[_SessionType]): self.url = url self.client = httpx.AsyncClient() self.server_type: type[_SessionType] = server_type + self.url_path_name = "servers" + if server_type == AmaltheaSessionV1Alpha1: + self.url_path_name = "sessions" async def list_servers(self, safe_username: str) -> list[_SessionType]: """List the jupyter servers.""" - url = urljoin(self.url, f"/users/{safe_username}/servers") + url = urljoin(self.url, f"/users/{safe_username}/{self.url_path_name}") try: res = await self.client.get(url, timeout=10) except httpx.RequestError as err: @@ -372,7 +375,7 @@ async def list_servers(self, safe_username: str) -> list[_SessionType]: async def get_server(self, name: str) -> _SessionType | None: """Get a specific jupyter server.""" - url = urljoin(self.url, f"/servers/{name}") + url = urljoin(self.url, f"/{self.url_path_name}/{name}") try: res = await self.client.get(url, timeout=10) except httpx.RequestError as err: diff --git a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py index 05b141c3c..5b848f8fa 100644 --- a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py +++ b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py @@ -2,15 +2,18 @@ from configparser import ConfigParser from io import StringIO -from pathlib import Path -from typing import Any, Optional, Self +from pathlib import PurePosixPath +from typing import Any, Final, Optional, Self +from kubernetes import client from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema from renku_data_services.base_models import APIUser from renku_data_services.notebooks.api.classes.cloud_storage import ICloudStorageRequest from renku_data_services.notebooks.config import _NotebooksConfig +_sanitize_for_serialization = client.ApiClient().sanitize_for_serialization + class RCloneStorageRequest(Schema): """Request for RClone based storage.""" @@ -36,6 +39,8 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None: class RCloneStorage(ICloudStorageRequest): """RClone based storage.""" + pvc_secret_annotation_name: Final[str] = "csi-rclone.dev/secretName" + def __init__( self, source_path: str, @@ -60,7 +65,7 @@ async def storage_from_schema( user: APIUser, internal_gitlab_user: APIUser, project_id: int, - work_dir: Path, + work_dir: PurePosixPath, config: _NotebooksConfig, ) -> Self: """Create storage object from request.""" @@ -92,8 +97,73 @@ async def storage_from_schema( await config.storage_validator.validate_storage_configuration(configuration, source_path) return cls(source_path, configuration, readonly, mount_folder, name, config) + def pvc( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1PersistentVolumeClaim: + """The PVC for mounting cloud storage.""" + return client.V1PersistentVolumeClaim( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations={self.pvc_secret_annotation_name: base_name} | (annotations or {}), + labels={"name": base_name} | (labels or {}), + ), + spec=client.V1PersistentVolumeClaimSpec( + access_modes=["ReadOnlyMany" if self.readonly else "ReadWriteMany"], + resources=client.V1VolumeResourceRequirements(requests={"storage": "10Gi"}), + storage_class_name=self.config.cloud_storage.storage_class, + ), + ) + + def volume_mount(self, base_name: str) -> client.V1VolumeMount: + """The volume mount for cloud storage.""" + return client.V1VolumeMount( + mount_path=self.mount_folder, + name=base_name, + read_only=self.readonly, + ) + + def volume(self, base_name: str) -> client.V1Volume: + """The volume entry for the statefulset specification.""" + return client.V1Volume( + name=base_name, + persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource( + claim_name=base_name, read_only=self.readonly + ), + ) + + def secret( + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, + ) -> client.V1Secret: + """The secret containing the configuration for the rclone csi driver.""" + return client.V1Secret( + metadata=client.V1ObjectMeta( + name=base_name, + namespace=namespace, + annotations=annotations, + labels={"name": base_name} | (labels or {}), + ), + string_data={ + "remote": self.name or base_name, + "remotePath": self.source_path, + "configData": self.config_string(self.name or base_name), + }, + ) + def get_manifest_patch( - self, base_name: str, namespace: str, labels: dict = {}, annotations: dict = {} + self, + base_name: str, + namespace: str, + labels: dict[str, str] | None = None, + annotations: dict[str, str] | None = None, ) -> list[dict[str, Any]]: """Get server manifest patch.""" patches = [] @@ -104,57 +174,22 @@ def get_manifest_patch( { "op": "add", "path": f"/{base_name}-pv", - "value": { - "apiVersion": "v1", - "kind": "PersistentVolumeClaim", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "spec": { - "accessModes": ["ReadOnlyMany" if self.readonly else "ReadWriteMany"], - "resources": {"requests": {"storage": "10Gi"}}, - "storageClassName": self.config.cloud_storage.storage_class, - }, - }, + "value": _sanitize_for_serialization(self.pvc(base_name, namespace, labels, annotations)), }, { "op": "add", "path": f"/{base_name}-secret", - "value": { - "apiVersion": "v1", - "kind": "Secret", - "metadata": { - "name": base_name, - "labels": {"name": base_name}, - }, - "type": "Opaque", - "stringData": { - "remote": self.name or base_name, - "remotePath": self.source_path, - "configData": self.config_string(self.name or base_name), - }, - }, + "value": _sanitize_for_serialization(self.secret(base_name, namespace, labels, annotations)), }, { "op": "add", "path": "/statefulset/spec/template/spec/containers/0/volumeMounts/-", - "value": { - "mountPath": self.mount_folder, - "name": base_name, - "readOnly": self.readonly, - }, + "value": _sanitize_for_serialization(self.volume_mount(base_name)), }, { "op": "add", "path": "/statefulset/spec/template/spec/volumes/-", - "value": { - "name": base_name, - "persistentVolumeClaim": { - "claimName": base_name, - "readOnly": self.readonly, - }, - }, + "value": _sanitize_for_serialization(self.volume(base_name)), }, ], } diff --git a/components/renku_data_services/notebooks/blueprints.py b/components/renku_data_services/notebooks/blueprints.py index 79059e6cf..5d7c3e83d 100644 --- a/components/renku_data_services/notebooks/blueprints.py +++ b/components/renku_data_services/notebooks/blueprints.py @@ -7,7 +7,7 @@ from dataclasses import dataclass from datetime import UTC, datetime from math import floor -from pathlib import Path +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse @@ -55,6 +55,7 @@ Authentication, AuthenticationType, Culling, + DataSource, ExtraContainer, ExtraVolume, ExtraVolumeMount, @@ -64,7 +65,8 @@ Resources, SecretAsVolume, SecretAsVolumeItem, - SecretRef, + SecretRefKey, + SecretRefWhole, Session, SessionEnvItem, State, @@ -83,6 +85,7 @@ from renku_data_services.project.db import ProjectRepository from renku_data_services.repositories.db import GitRepositoriesRepository from renku_data_services.session.db import SessionRepository +from renku_data_services.storage.db import StorageV2Repository @dataclass(kw_only=True) @@ -93,6 +96,7 @@ class NotebooksBP(CustomBlueprint): nb_config: _NotebooksConfig git_repo: GitRepositoriesRepository internal_gitlab_authenticator: base_models.Authenticator + rp_repo: ResourcePoolRepository def version(self) -> BlueprintFactoryResponse: """Return notebook services version.""" @@ -409,7 +413,7 @@ async def launch_notebook_helper( if lfs_auto_fetch is not None: parsed_server_options.lfs_auto_fetch = lfs_auto_fetch - image_work_dir = image_repo.image_workdir(parsed_image) or Path("/") + image_work_dir = image_repo.image_workdir(parsed_image) or PurePosixPath("/") mount_path = image_work_dir / "work" server_work_dir = mount_path / gl_project_path @@ -424,7 +428,7 @@ async def launch_notebook_helper( cstorage.model_dump(), user=user, project_id=gl_project_id, - work_dir=server_work_dir.absolute(), + work_dir=server_work_dir, config=nb_config, internal_gitlab_user=internal_gitlab_user, ) @@ -768,6 +772,7 @@ class NotebooksNewBP(CustomBlueprint): project_repo: ProjectRepository session_repo: SessionRepository rp_repo: ResourcePoolRepository + storage_repo: StorageV2Repository def start(self) -> BlueprintFactoryResponse: """Start a session with the new operator.""" @@ -798,7 +803,7 @@ async def _handler( parsed_server_options = await self.nb_config.crc_validator.validate_class_storage( user, resource_class_id, body.disk_storage ) - work_dir = Path("/home/jovyan/work") + work_dir = environment.working_directory user_secrets: K8sUserSecrets | None = None # if body.user_secrets: # user_secrets = K8sUserSecrets( @@ -806,9 +811,41 @@ async def _handler( # user_secret_ids=body.user_secrets.user_secret_ids, # mount_path=body.user_secrets.mount_path, # ) - cloud_storage: list[RCloneStorage] = [] + cloud_storages_db = await self.storage_repo.get_storage( + user=user, project_id=project.id, include_secrets=True + ) + cloud_storage: dict[str, RCloneStorage] = { + str(s.storage_id): RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration.model_dump(mode="python"), + readonly=s.readonly, + config=self.nb_config, + name=s.name, + ) + for s in cloud_storages_db + } + cloud_storage_request: dict[str, RCloneStorage] = { + s.storage_id: RCloneStorage( + source_path=s.source_path, + mount_folder=(work_dir / s.target_path).as_posix(), + configuration=s.configuration, + readonly=s.readonly, + config=self.nb_config, + name=None, + ) + for s in body.cloudstorage or [] + if s.storage_id is not None + } + # NOTE: Check the cloud storage in the request body and if any match + # then overwrite the projects cloud storages + # NOTE: Cloud storages in the session launch request body that are not form the DB are ignored + for csr_id, csr in cloud_storage_request.items(): + if csr_id in cloud_storage: + cloud_storage[csr_id] = csr # repositories = [Repository(i.url, branch=i.branch, commit_sha=i.commit_sha) for i in body.repositories] repositories = [Repository(url=i) for i in project.repositories] + secrets_to_create: list[V1Secret] = [] server = Renku2UserServer( user=user, image=image, @@ -818,7 +855,7 @@ async def _handler( server_options=parsed_server_options, environment_variables={}, user_secrets=user_secrets, - cloudstorage=cloud_storage, + cloudstorage=[i for i in cloud_storage.values()], k8s_client=self.nb_config.k8s_v2_client, workspace_mount_path=work_dir, work_dir=work_dir, @@ -828,6 +865,14 @@ async def _handler( is_image_private=False, internal_gitlab_user=internal_gitlab_user, ) + # Generate the cloud starge secrets + data_sources: list[DataSource] = [] + for ics, cs in enumerate(cloud_storage.values()): + secret_name = f"{server_name}-ds-{ics}" + secrets_to_create.append(cs.secret(secret_name, server.k8s_client.preferred_namespace)) + data_sources.append( + DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True)) + ) cert_init, cert_vols = init_containers.certificates_container(self.nb_config) session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))] extra_volumes = [ExtraVolume.model_validate(self.nb_config.k8s_v2_client.sanitize(i)) for i in cert_vols] @@ -861,7 +906,6 @@ async def _handler( metadata=Metadata(name=server_name, annotations=annotations), spec=AmaltheaSessionSpec( codeRepositories=[], - dataSources=[], hibernated=False, session=Session( image=image, @@ -908,13 +952,14 @@ async def _handler( type=AuthenticationType.oauth2proxy if isinstance(user, AuthenticatedAPIUser) else AuthenticationType.token, - secretRef=SecretRef(name=server_name, key="auth", adopt=True), + secretRef=SecretRefKey(name=server_name, key="auth", adopt=True), extraVolumeMounts=[ ExtraVolumeMount(name="renku-authorized-emails", mountPath="/authorized_emails") ] if isinstance(user, AuthenticatedAPIUser) else [], ), + dataSources=data_sources, ), ) parsed_proxy_url = urlparse(urljoin(server.server_url + "/", "oauth2")) @@ -945,12 +990,14 @@ async def _handler( "verbose": True, } ) - secret = V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data) - secret = await self.nb_config.k8s_v2_client.create_secret(secret) + secrets_to_create.append(V1Secret(metadata=V1ObjectMeta(name=server_name), string_data=secret_data)) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.create_secret(s) try: manifest = await self.nb_config.k8s_v2_client.create_server(manifest, user.id) except Exception: - await self.nb_config.k8s_v2_client.delete_secret(secret.metadata.name) + for s in secrets_to_create: + await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name) raise errors.ProgrammingError(message="Could not start the amalthea session") return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201) @@ -1067,6 +1114,6 @@ async def _handler( query: apispec.SessionsSessionIdLogsGetParametersQuery, ) -> HTTPResponse: logs = await self.nb_config.k8s_v2_client.get_server_logs(session_id, user.id, query.max_lines) - return json(apispec.SessionLogsResponse.model_validate(logs).model_dump_json(exclude_none=True)) + return json(apispec.SessionLogsResponse.model_validate(logs).model_dump(exclude_none=True)) return "/sessions//logs", ["GET"], _handler diff --git a/components/renku_data_services/notebooks/cr_amalthea_session.py b/components/renku_data_services/notebooks/cr_amalthea_session.py index 16aa355e0..a4c2e3fd9 100644 --- a/components/renku_data_services/notebooks/cr_amalthea_session.py +++ b/components/renku_data_services/notebooks/cr_amalthea_session.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: -# timestamp: 2024-09-04T22:45:28+00:00 +# timestamp: 2024-09-04T21:22:45+00:00 from __future__ import annotations diff --git a/components/renku_data_services/notebooks/crs.py b/components/renku_data_services/notebooks/crs.py index 206a32f96..76ea08dc3 100644 --- a/components/renku_data_services/notebooks/crs.py +++ b/components/renku_data_services/notebooks/crs.py @@ -32,6 +32,8 @@ from renku_data_services.notebooks.cr_amalthea_session import Model as _ASModel from renku_data_services.notebooks.cr_amalthea_session import Resources3 as Resources from renku_data_services.notebooks.cr_amalthea_session import Secret1 as SecretAsVolume +from renku_data_services.notebooks.cr_amalthea_session import SecretRef as SecretRefKey +from renku_data_services.notebooks.cr_amalthea_session import SecretRef1 as SecretRefWhole from renku_data_services.notebooks.cr_amalthea_session import Spec as AmaltheaSessionSpec from renku_data_services.notebooks.cr_amalthea_session import Type as AuthenticationType from renku_data_services.notebooks.cr_amalthea_session import Type1 as CodeRepositoryType diff --git a/components/renku_data_services/storage/db.py b/components/renku_data_services/storage/db.py index 583331d5f..7a59ca7b0 100644 --- a/components/renku_data_services/storage/db.py +++ b/components/renku_data_services/storage/db.py @@ -67,7 +67,7 @@ async def get_storage( stmt = select(schemas.CloudStorageORM) if project_id is not None: - stmt = stmt.where(schemas.CloudStorageORM.project_id == project_id) + stmt = stmt.where(schemas.CloudStorageORM.project_id == str(project_id)) if id is not None: stmt = stmt.where(schemas.CloudStorageORM.storage_id == id) if name is not None: @@ -102,7 +102,9 @@ async def get_storage_by_id(self, storage_id: ULID, user: base_models.APIUser) - return storages[0] - async def insert_storage(self, storage: models.CloudStorage, user: base_models.APIUser) -> models.SavedCloudStorage: + async def insert_storage( + self, storage: models.UnsavedCloudStorage, user: base_models.APIUser + ) -> models.SavedCloudStorage: """Insert a new cloud storage entry.""" if not await self.filter_projects_by_access_level(user, [storage.project_id], authz_models.Role.OWNER): raise errors.ForbiddenError(message="User does not have access to this project") diff --git a/poetry.lock b/poetry.lock index a4fd56ed5..e68011d43 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3714,13 +3714,13 @@ pbr = ">=2.0.0,<2.1.0 || >2.1.0" [[package]] name = "tenacity" -version = "8.5.0" +version = "9.0.0" description = "Retry code until it succeeds" optional = false python-versions = ">=3.8" files = [ - {file = "tenacity-8.5.0-py3-none-any.whl", hash = "sha256:b594c2a5945830c267ce6b79a166228323ed52718f30302c1359836112346687"}, - {file = "tenacity-8.5.0.tar.gz", hash = "sha256:8bc6c0c8a09b31e6cad13c47afbed1a567518250a9a171418582ed8d9c20ca78"}, + {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, + {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, ] [package.extras] diff --git a/projects/renku_data_service/poetry.lock b/projects/renku_data_service/poetry.lock index 994b0d5bf..9961a448d 100644 --- a/projects/renku_data_service/poetry.lock +++ b/projects/renku_data_service/poetry.lock @@ -2751,13 +2751,13 @@ sqlcipher = ["sqlcipher3_binary"] [[package]] name = "tenacity" -version = "8.5.0" +version = "9.0.0" description = "Retry code until it succeeds" optional = false python-versions = ">=3.8" files = [ - {file = "tenacity-8.5.0-py3-none-any.whl", hash = "sha256:b594c2a5945830c267ce6b79a166228323ed52718f30302c1359836112346687"}, - {file = "tenacity-8.5.0.tar.gz", hash = "sha256:8bc6c0c8a09b31e6cad13c47afbed1a567518250a9a171418582ed8d9c20ca78"}, + {file = "tenacity-9.0.0-py3-none-any.whl", hash = "sha256:93de0c98785b27fcf659856aa9f54bfbd399e29969b0621bc7f762bd441b4539"}, + {file = "tenacity-9.0.0.tar.gz", hash = "sha256:807f37ca97d62aa361264d497b0e31e92b8027044942bfa756160d908320d73b"}, ] [package.extras]