-
-
Notifications
You must be signed in to change notification settings - Fork 62
/
abstract.py
108 lines (83 loc) · 2.76 KB
/
abstract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import enum
import pendulum
from typing import Dict, List, NamedTuple, Any
from ..context import Context
import pykube.objects
@enum.unique
class SnapshotStatus(enum.Enum):
PENDING = 'snapshot.pending'
COMPLETE = 'snapshot.complete'
# It's up to a backend to decide how a disk should be identified.
# However, it does need to be something that is hashable, ideally
# a tuple.
DiskIdentifier = Any
class Snapshot(NamedTuple):
"""
Identifies an existing snapshot.
"""
name: str
created_at: pendulum.Pendulum
# A disk id that is known to Kubernetes.
disk: DiskIdentifier
# Snapshot creation is a multi-step process. This is an arbitrary value that
# a Cloud provider backend can return to refer to the snapshot within the
# cloud as it's being created. This is distinct from :class:`Snapshot`, which
# represents a completed snapshot.
NewSnapshotIdentifier = Any
def get_disk_identifier(volume: pykube.objects.PersistentVolume) -> DiskIdentifier:
"""Return a DiskIdentifier from a PersistentVolume."""
raise NotImplementedError()
def supports_volume(volume: pykube.objects.PersistentVolume):
"""Return either the given persistent volume is supported by
the backend."""
raise NotImplementedError()
def validate_disk_identifier(disk_id: Dict) -> DiskIdentifier:
"""Should take the user-specified dictionary, and convert it to
it's own, local `DiskIdentifier`. If the disk_id is not valid,
it should raise a `ValueError` with a suitable error message.
"""
raise NotImplementedError()
def load_snapshots(ctx: Context, label_filters: Dict[str, str]) -> List[Snapshot]:
"""
Return the existing snapshots. Important!! This function must filter
the list of returned snapshots by ``label_filters``. This is because
usually cloud providers make filtering part of their API.
"""
raise NotImplementedError()
def create_snapshot(
ctx: Context,
disk: DiskIdentifier,
snapshot_name: str,
snapshot_description: str
) -> NewSnapshotIdentifier:
"""
Create a snapshot for the given disk.
This operation is expected to be asynchronous, so the value you return
will identify the snapshot for the next call.
"""
raise NotImplementedError()
def get_snapshot_status(
ctx: Context,
snapshot_identifier: NewSnapshotIdentifier
) -> SnapshotStatus:
"""
Should return the current status of the snapshot.
"""
raise NotImplementedError()
def set_snapshot_labels(
ctx: Context,
snapshot_identifier: NewSnapshotIdentifier,
labels: Dict
):
"""
Set labels on the snapshot.
"""
raise NotImplementedError()
def delete_snapshot(
ctx: Context,
snapshot: Snapshot
):
"""
Delete the snapshot given.
"""
raise NotImplementedError()