feat: add incremental backups and logic to the backend (#46)
* feat: add incremental backups and logic to the backend

* add fields to the frontend for incremental scheduling

* seems to be working
fuziontech authored Sep 13, 2023
1 parent 42b02db commit a99db4a
Showing 6 changed files with 180 additions and 20 deletions.
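Background for reviewers: ClickHouse implements incremental backups by pointing a new BACKUP statement at an earlier backup through the base_backup setting, so only parts written since the base get uploaded, and restores resolve the chain through that reference. A minimal sketch of the pair of statements this commit ends up issuing; the host, table, bucket, and credentials below are placeholders, not values from this repo:

from clickhouse_driver import Client

client = Client(host="localhost")  # placeholder connection

full = "https://my-bucket.s3.amazonaws.com/backups/2023-09-13T00:00:00"
incr = "https://my-bucket.s3.amazonaws.com/backups/2023-09-13T06:00:00"

# Full backup: every part of the table is written to S3.
client.execute(f"BACKUP TABLE default.events TO S3('{full}', 'KEY', 'SECRET') ASYNC")

# Incremental backup: only parts created since the base are written; the
# base_backup setting tells ClickHouse where the rest already lives.
client.execute(
    f"BACKUP TABLE default.events TO S3('{incr}', 'KEY', 'SECRET') ASYNC",
    settings={"base_backup": f"S3('{full}', 'KEY', 'SECRET')"},
)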
18 changes: 16 additions & 2 deletions frontend/src/pages/Backups/ScheduledBackups.tsx
@@ -12,6 +12,7 @@ interface ScheduleRow {
     last_run_time: string
     cluster: string
     schedule: string
+    incremental_schedule: string
     table: string
     database: string
     bucket: string
@@ -26,6 +27,7 @@ interface Backups {
 type FieldType = {
     cluster?: string
     schedule?: string
+    incremental_schedule?: string
     database?: string
     table?: string
     bucket?: string
@@ -130,6 +132,7 @@ export default function ScheduledBackups() {
         },
         { title: 'Cluster', dataIndex: 'cluster' },
         { title: 'Schedule', dataIndex: 'schedule' },
+        { title: 'Incremental Schedule', dataIndex: 'incremental_schedule' },
         { title: 'Last Run Time', dataIndex: 'last_run_time' },
         { title: 'Database', dataIndex: 'database' },
         { title: 'Table', dataIndex: 'table' },
@@ -207,6 +210,17 @@ export default function ScheduledBackups() {
                 <Input />
             </Form.Item>

+            <Form.Item<FieldType>
+                label="Incremental Schedule"
+                name="incremental_schedule"
+                initialValue="0 0 * * *"
+                rules={[
+                    { required: true, message: 'Please provide a cron schedule for the incremental backup' },
+                ]}
+            >
+                <Input />
+            </Form.Item>
+
             <Form.Item<FieldType>
                 label="Database"
                 name="database"
@@ -247,7 +261,7 @@ export default function ScheduledBackups() {
                 label="AWS Access Key ID"
                 name="aws_access_key_id"
                 initialValue="AKIAIOSFODNN7EXAMPLE"
-                rules={[{ required: false, message: 'AWS Access Key ID to use for access to the S3 bucket' }]}
+                rules={[{ required: true, message: 'AWS Access Key ID to use for access to the S3 bucket' }]}
             >
                 <Input />
             </Form.Item>
@@ -256,7 +270,7 @@ export default function ScheduledBackups() {
                 label="AWS Secret Access Key"
                 name="aws_secret_access_key"
                 initialValue="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
-                rules={[{ required: false, message: 'AWS Secret Access Key used to access S3 bucket' }]}
+                rules={[{ required: true, message: 'AWS Secret Access Key used to access S3 bucket' }]}
             >
                 <Input />
             </Form.Item>
2 changes: 2 additions & 0 deletions housewatch/api/backups.py
@@ -46,6 +46,8 @@ class Meta:
     def validate(self, data):
         if data.get("schedule") and not croniter.is_valid(data["schedule"]):
             raise serializers.ValidationError(f"Invalid cron expression: {data['schedule']}")
+        if data.get("incremental_schedule") and not croniter.is_valid(data["incremental_schedule"]):
+            raise serializers.ValidationError(f"Invalid cron expression: {data['incremental_schedule']}")
         return data


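The serializer relies on croniter.is_valid for both schedule fields; a quick sketch of what it accepts and rejects, with arbitrary example expressions:

from croniter import croniter

croniter.is_valid("0 0 * * *")   # True: daily at midnight, the frontend's default
croniter.is_valid("every day")   # False: rejected by the validate() above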
22 changes: 19 additions & 3 deletions housewatch/celery.py
@@ -32,10 +32,12 @@ def setup_periodic_tasks(sender: Celery, **kwargs):


 @app.task(track_started=True, ignore_result=False, max_retries=0)
-def run_backup(backup_id: str):
+def run_backup(backup_id: str, incremental: bool = False):
     from housewatch.clickhouse import backups

-    backups.run_backup(backup_id)
+    logger.info("Running backup", backup_id=backup_id, incremental=incremental)
+
+    backups.run_backup(backup_id, incremental=incremental)


 @app.task(track_started=True, ignore_result=False, max_retries=0)
@@ -52,11 +54,25 @@ def schedule_backups():
         nr = croniter(backup.schedule, lrt).get_next(datetime)
         if nr.tzinfo is None:
             nr = timezone.make_aware(nr)
-        logger.info("Checking backup", backup_id=backup.id, next_run=nr, now=now)

+        nir = None
+        if backup.incremental_schedule is not None:
+            lirt = backup.last_incremental_run_time
+            if lirt is None:
+                lirt = backup.created_at
+            nir = croniter(backup.incremental_schedule, lirt).get_next(datetime)
+            if nir.tzinfo is None:
+                nir = timezone.make_aware(nir)
+
+        logger.info("Checking backup", backup_id=backup.id, next_run=nr, next_incremental_run=nir, now=now)
         if nr < now:
             run_backup.delay(backup.id)
             backup.last_run_time = now
             backup.save()
+        elif backup.incremental_schedule is not None and nir < now:
+            run_backup.delay(backup.id, incremental=True)
+            backup.last_incremental_run_time = now
+            backup.save()

@app.task(track_started=True, ignore_result=False, max_retries=0)
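The due-check above leans on croniter's get_next: seeded with the last run time, it returns the next scheduled fire time, and a backup is due once that time has passed. A minimal sketch of the semantics with made-up times, assuming an hourly incremental schedule:

from datetime import datetime, timezone
from croniter import croniter

last_incremental_run = datetime(2023, 9, 13, 0, 0, tzinfo=timezone.utc)
now = datetime(2023, 9, 13, 1, 30, tzinfo=timezone.utc)

# Next fire time strictly after the last run: 01:00 UTC.
next_run = croniter("0 * * * *", last_incremental_run).get_next(datetime)

# 01:00 is before 01:30, so schedule_backups() would enqueue
# run_backup.delay(backup.id, incremental=True).
assert next_run < now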
113 changes: 98 additions & 15 deletions housewatch/clickhouse/backups.py
@@ -1,15 +1,65 @@
 import structlog
 from collections import defaultdict
 from datetime import datetime
-from housewatch.clickhouse.client import run_query, run_query_on_shards
+from typing import Dict, Optional
+from uuid import uuid4
+from housewatch.clickhouse.client import run_query
 from housewatch.models.backup import ScheduledBackup, ScheduledBackupRun
 from housewatch.clickhouse.clusters import get_node_per_shard

 from django.conf import settings
 from django.utils import timezone

+from clickhouse_driver import Client
+
 logger = structlog.get_logger(__name__)


+def execute_backup_on_shards(
+    query: str,
+    params: Dict[str, str | int] = {},
+    query_settings: Dict[str, str | int] = {},
+    query_id: Optional[str] = None,
+    substitute_params: bool = True,
+    cluster: Optional[str] = None,
+    aws_key: Optional[str] = None,
+    aws_secret: Optional[str] = None,
+    base_backup: Optional[str] = None,
+):
+    """
+    Execute a backup on each shard in a cluster.
+    Very similar to run_query_on_shards, but with backup-specific handling,
+    notably the per-shard base_backup setting.
+    """
+    nodes = get_node_per_shard(cluster)
+    responses = []
+    for shard, node in nodes:
+        params["shard"] = shard
+        if base_backup:
+            query_settings["base_backup"] = f"S3('{base_backup}/{shard}', '{aws_key}', '{aws_secret}')"
+        final_query = query % (params or {}) if substitute_params else query
+        client = Client(
+            host=node["host_address"],
+            database=settings.CLICKHOUSE_DATABASE,
+            user=settings.CLICKHOUSE_USER,
+            secure=settings.CLICKHOUSE_SECURE,
+            ca_certs=settings.CLICKHOUSE_CA,
+            verify=settings.CLICKHOUSE_VERIFY,
+            settings={"max_result_rows": "2000"},
+            send_receive_timeout=30,
+            password=settings.CLICKHOUSE_PASSWORD,
+        )
+        result = client.execute(final_query, settings=query_settings, with_column_types=True, query_id=query_id)
+        response = []
+        for res in result[0]:
+            item = {}
+            for index, key in enumerate(result[1]):
+                item[key[0]] = res[index]
+            response.append(item)
+        responses.append((shard, response))
+    return responses


 def get_backups(cluster=None):
     if cluster:
         QUERY = """SELECT id, name, status, error, start_time, end_time, num_files, formatReadableSize(total_size) total_size, num_entries, uncompressed_size, compressed_size, files_read, bytes_read FROM clusterAllReplicas(%(cluster)s, system.backups) ORDER BY start_time DESC"""
@@ -28,15 +78,16 @@ def get_backup(backup, cluster=None):
     return run_query(QUERY, {"uuid": backup}, use_cache=False)


-def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None):
+def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None):
     if aws_key is None or aws_secret is None:
         aws_key = settings.AWS_ACCESS_KEY_ID
         aws_secret = settings.AWS_SECRET_ACCESS_KEY
+    query_settings = {}
     if cluster:
         QUERY = """BACKUP TABLE %(database)s.%(table)s
        TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
        ASYNC"""
-        return run_query_on_shards(
+        return execute_backup_on_shards(
             QUERY,
             {
                 "database": database,
@@ -46,11 +97,17 @@ def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None):
                 "aws_key": aws_key,
                 "aws_secret": aws_secret,
             },
+            query_settings=query_settings,
+            cluster=cluster,
+            aws_key=aws_key,
+            aws_secret=aws_secret,
+            base_backup=base_backup,
         )
     QUERY = """BACKUP TABLE %(database)s.%(table)s
    TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
    ASYNC"""
+    if base_backup:
+        query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
     return run_query(
         QUERY,
         {
@@ -61,19 +118,22 @@ def create_table_backup(database, table, bucket, path, cluster=None, aws_key=None, aws_secret=None):
             "aws_key": aws_key,
             "aws_secret": aws_secret,
         },
+        query_settings=query_settings,
         use_cache=False,
     )


-def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None):
+def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None, base_backup=None):
     if aws_key is None or aws_secret is None:
         aws_key = settings.AWS_ACCESS_KEY_ID
         aws_secret = settings.AWS_SECRET_ACCESS_KEY
+    query_settings = {}
     if cluster:
         QUERY = """BACKUP DATABASE %(database)s
        TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s/%(shard)s', '%(aws_key)s', '%(aws_secret)s')
        ASYNC"""
-        return run_query_on_shards(
+
+        return execute_backup_on_shards(
             QUERY,
             {
                 "database": database,
@@ -82,11 +142,17 @@ def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None):
                 "aws_key": aws_key,
                 "aws_secret": aws_secret,
             },
+            query_settings=query_settings,
+            cluster=cluster,
+            aws_key=aws_key,
+            aws_secret=aws_secret,
+            base_backup=base_backup,
         )
     QUERY = """BACKUP DATABASE %(database)s
    TO S3('https://%(bucket)s.s3.amazonaws.com/%(path)s', '%(aws_key)s', '%(aws_secret)s')
    ASYNC"""
+    if base_backup:
+        query_settings["base_backup"] = f"S3('{base_backup}', '{aws_key}', '{aws_secret}')"
     return run_query(
         QUERY,
         {
@@ -96,39 +162,56 @@ def create_database_backup(database, bucket, path, cluster=None, aws_key=None, aws_secret=None):
             "aws_key": aws_key,
             "aws_secret": aws_secret,
         },
+        query_settings=query_settings,
         use_cache=False,
     )


-def run_backup(backup_id):
+def run_backup(backup_id, incremental=False):
     backup = ScheduledBackup.objects.get(id=backup_id)
     now = timezone.now()
     path = backup.path + "/" + now.isoformat()
+    base_backup = None
+    S3_LOCATION = f"https://{backup.bucket}.s3.amazonaws.com/{path}"
+    if incremental:
+        if not backup.last_run:
+            logger.info("Cannot run incremental backup without a base backup")
+            return
+        base_backup = backup.last_base_backup
     if backup.is_database_backup():
-        uuid = create_database_backup(
+        create_database_backup(
             backup.database,
             backup.bucket,
             path,
             backup.cluster,
             backup.aws_access_key_id,
             backup.aws_secret_access_key,
-        )[0]["id"]
+            base_backup=base_backup,
+        )
     elif backup.is_table_backup():
-        uuid = create_table_backup(
+        create_table_backup(
             backup.database,
             backup.table,
             backup.bucket,
             path,
             backup.cluster,
             backup.aws_access_key_id,
             backup.aws_secret_access_key,
-        )[0]["id"]
-    br = ScheduledBackupRun.objects.create(scheduled_backup=backup, id=uuid, started_at=now)
+            base_backup=base_backup,
+        )
+    uuid = str(uuid4())
+    br = ScheduledBackupRun.objects.create(
+        scheduled_backup=backup, id=uuid, started_at=now, is_incremental=incremental, base_backup=base_backup
+    )
     br.save()
-    backup.last_run = br
-    backup.last_run_time = now
+    if incremental:
+        backup.last_incremental_run = br
+        backup.last_incremental_run_time = now
+    else:
+        backup.last_run = br
+        backup.last_run_time = now
+        backup.last_base_backup = S3_LOCATION
     backup.save()
-    return uuid
+    return


 def restore_backup(backup):
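Net effect of run_backup: every run, full or incremental, writes to a fresh timestamp-suffixed path; a full run records its own location in last_base_backup, and the next incremental run passes that location back in as base_backup. A sketch of the resulting chain with hypothetical bucket and path values (on a cluster, both locations additionally get a "/<shard>" suffix):

from datetime import datetime, timezone

bucket = "my-backups"      # hypothetical backup.bucket
base_path = "prod/events"  # hypothetical backup.path

def s3_location(run_time: datetime) -> str:
    # Mirrors run_backup: path = backup.path + "/" + now.isoformat()
    return f"https://{bucket}.s3.amazonaws.com/{base_path}/{run_time.isoformat()}"

full = s3_location(datetime(2023, 9, 13, 0, 0, tzinfo=timezone.utc))  # stored as last_base_backup
incr = s3_location(datetime(2023, 9, 13, 6, 0, tzinfo=timezone.utc))  # written with base_backup=full
print(full)  # https://my-backups.s3.amazonaws.com/prod/events/2023-09-13T00:00:00+00:00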
38 changes: 38 additions & 0 deletions housewatch/migrations/0010_… (new file)
@@ -0,0 +1,38 @@
+# Generated by Django 4.1.1 on 2023-09-12 02:19
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('housewatch', '0009_scheduledbackup_cluster_alter_scheduledbackup_id'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='scheduledbackup',
+            name='incremental_schedule',
+            field=models.CharField(max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name='scheduledbackup',
+            name='last_base_backup',
+            field=models.CharField(max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name='scheduledbackup',
+            name='last_incremental_run_time',
+            field=models.DateTimeField(null=True),
+        ),
+        migrations.AddField(
+            model_name='scheduledbackuprun',
+            name='base_backup',
+            field=models.CharField(max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name='scheduledbackuprun',
+            name='is_incremental',
+            field=models.BooleanField(default=False),
+        ),
+    ]
7 changes: 7 additions & 0 deletions housewatch/models/backup.py
@@ -10,10 +10,13 @@ class ScheduledBackup(models.Model):
     created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
     enabled: models.BooleanField = models.BooleanField(default=False)
     last_run_time: models.DateTimeField = models.DateTimeField(null=True)
+    last_incremental_run_time: models.DateTimeField = models.DateTimeField(null=True)
+    last_base_backup: models.CharField = models.CharField(max_length=255, null=True)
     last_run: models.ForeignKey = models.ForeignKey("ScheduledBackupRun", on_delete=models.SET_NULL, null=True)

     # This will be a CRON expression for the job
     schedule: models.CharField = models.CharField(max_length=255)
+    incremental_schedule: models.CharField = models.CharField(max_length=255, null=True)
     table: models.CharField = models.CharField(max_length=255, null=True)
     database: models.CharField = models.CharField(max_length=255)
     cluster: models.CharField = models.CharField(max_length=255, null=True)
@@ -51,12 +54,16 @@ def is_table_backup(self):
     def save(self, *args, **kwargs):
         if not croniter.is_valid(self.schedule):
             raise ValueError("Invalid CRON expression")
+        if self.incremental_schedule and not croniter.is_valid(self.incremental_schedule):
+            raise ValueError("Invalid CRON expression")
         super().save(*args, **kwargs)


 class ScheduledBackupRun(models.Model):
     id: models.UUIDField = models.UUIDField(primary_key=True)
     created_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
+    base_backup: models.CharField = models.CharField(max_length=255, null=True)
+    is_incremental: models.BooleanField = models.BooleanField(default=False)
     scheduled_backup: models.ForeignKey = models.ForeignKey(ScheduledBackup, on_delete=models.CASCADE)
     started_at: models.DateTimeField = models.DateTimeField(auto_now_add=True)
     finished_at: models.DateTimeField = models.DateTimeField(null=True)
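With the new fields in place, a backup carrying both cadences might be created like this; the values are illustrative, fields not shown in this diff (such as path) follow the rest of the model, and save() enforces both cron expressions:

from housewatch.models.backup import ScheduledBackup

backup = ScheduledBackup(
    database="default",
    table="events",                    # table backup; omit for a database backup
    bucket="my-backups",
    path="prod/events",
    schedule="0 0 * * 0",              # full backup weekly, Sunday at midnight
    incremental_schedule="0 0 * * *",  # incremental backup daily at midnight
    enabled=True,
)
backup.save()  # raises ValueError if either cron expression is invalid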
