Skip to content

Commit

Permalink
add support for cdn nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
OriHoch committed Feb 25, 2024
1 parent 07836ce commit a612a7b
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
12 changes: 12 additions & 0 deletions cwm_worker_operator/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import sys
import importlib
import subprocess

import click


Expand Down Expand Up @@ -187,5 +189,15 @@ def start_minio_auth_server_devel():
uvicorn.run('cwm_worker_operator.minio_auth_plugin.app:app', host='0.0.0.0', port=5000, reload=True)


@main.command(short_help="Start consecutive daemon run once commands")
@click.argument('DAEMON_NAME', nargs=-1)
def multi_run_once(daemon_name):
for name in daemon_name:
print(f'Running {name} --run-once')
subprocess.check_call([
'cwm-worker-operator', name, 'start_daemon', '--once'
])


if __name__ == '__main__':
main()
46 changes: 32 additions & 14 deletions cwm_worker_operator/deployments_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,9 @@ def delete(self, namespace_name, deployment_type, delete_data=False, **kwargs):
minio_admin = get_minio_admin()
failed = False
for cmd, args, kwargs in [
('user_remove', [namespace_name], {}),
('policy_unset', [namespace_name], {'user': namespace_name}),
('policy_remove', [namespace_name], {}),
('user_remove', [namespace_name], {}),
]:
try:
getattr(minio_admin, cmd)(*args, **kwargs)
Expand Down Expand Up @@ -328,16 +328,21 @@ def iterate_cluster_nodes(self):
assert ret == 0, out
node = json.loads(out)
is_worker = False
is_cdn = False
for taint in node.get('spec', {}).get('taints', []):
if taint.get('key') == 'cwmc-role'and taint.get('value') == 'worker':
is_worker = True
if taint.get('key') == 'cwmc-role':
if taint.get('value') == 'worker':
is_worker = True
elif taint.get('value') == 'cdn':
is_cdn = True
break
unschedulable = bool(node.get('spec', {}).get('unschedulable'))
public_ip = node.get('status', {}).get('addresses', [{}])[0].get('address', '')
labels = node.get('metadata', {}).get('labels', {})
yield {
'name': node_name,
'is_worker': is_worker,
'is_cdn': is_cdn,
'unschedulable': unschedulable,
'public_ip': public_ip,
'cleaner_cordoned': labels.get(NODE_CLEANER_CORDON_LABEL) == 'yes'
Expand Down Expand Up @@ -376,12 +381,19 @@ def iterate_dns_healthchecks(self):
healthcheck_name = tag['Value']
break
healthcheck_ip = healthcheck.get('HealthCheckConfig', {}).get('IPAddress') or ''
if healthcheck_name and healthcheck_name.startswith(config.DNS_RECORDS_PREFIX + ":"):
yield {
'id': healthcheck_id,
'node_name': healthcheck_name.replace(config.DNS_RECORDS_PREFIX + ":", ""),
'ip': healthcheck_ip
}
if healthcheck_name:
if healthcheck_name.startswith(config.DNS_RECORDS_PREFIX + ":"):
yield {
'id': healthcheck_id,
'node_name': healthcheck_name.replace(config.DNS_RECORDS_PREFIX + ":", ""),
'ip': healthcheck_ip
}
elif healthcheck_name.startswith('minio-' + config.DNS_RECORDS_PREFIX + ":"):
yield {
'id': healthcheck_id,
'node_name': healthcheck_name.replace('minio-' + config.DNS_RECORDS_PREFIX + ":", ""),
'ip': healthcheck_ip
}
if res['IsTruncated']:
next_marker = res['NextMarker']
else:
Expand All @@ -398,7 +410,10 @@ def iterate_dns_records(self):
**({'StartRecordType': next_record_type} if next_record_type is not None else {}),
)
for record in res.get('ResourceRecordSets', []):
if record['Type'] == 'A' and record['Name'] == '{}.{}.'.format(config.DNS_RECORDS_PREFIX, config.AWS_ROUTE53_HOSTEDZONE_DOMAIN):
if record['Type'] == 'A' and (
record['Name'] == '{}.{}.'.format(config.DNS_RECORDS_PREFIX, config.AWS_ROUTE53_HOSTEDZONE_DOMAIN)
or record['Name'] == 'minio-{}.{}.'.format(config.DNS_RECORDS_PREFIX, config.AWS_ROUTE53_HOSTEDZONE_DOMAIN)
):
if record['SetIdentifier'].startswith(config.DNS_RECORDS_PREFIX+':'):
record_ip = ''
for value in record.get('ResourceRecords', []):
Expand All @@ -423,7 +438,7 @@ def set_dns_healthcheck(self, node_name, node_ip):
CallerReference=caller_reference,
HealthCheckConfig={
"IPAddress": node_ip,
"Port": 12808,
"Port": 80,
"Type": "HTTP",
"ResourcePath": "/healthz",
"RequestInterval": 30, # according to AWS docs, when using the recommended regions, it actually does a healthcheck every 2-3 seconds
Expand All @@ -440,17 +455,20 @@ def set_dns_healthcheck(self, node_name, node_ip):
)
return healthcheck_id

def set_dns_record(self, node_name, node_ip, healthcheck_id):
def set_dns_record(self, node_name, node_ip, healthcheck_id, node_type):
client = boto3.client('route53')
prefix = ''
if node_type == 'worker':
prefix = 'minio-'
client.change_resource_record_sets(
HostedZoneId=config.AWS_ROUTE53_HOSTEDZONE_ID,
ChangeBatch={
"Comment": "cwm-worker-operator deployments_manager.set_dns_record({},{})".format(node_name, node_ip),
"Comment": "cwm-worker-operator deployments_manager.set_dns_record({}{},{})".format(prefix, node_name, node_ip),
"Changes": [
{
"Action": "CREATE",
"ResourceRecordSet": {
"Name": '{}.{}.'.format(config.DNS_RECORDS_PREFIX, config.AWS_ROUTE53_HOSTEDZONE_DOMAIN),
"Name": '{}{}.{}.'.format(prefix, config.DNS_RECORDS_PREFIX, config.AWS_ROUTE53_HOSTEDZONE_DOMAIN),
'Type': 'A',
'SetIdentifier': config.DNS_RECORDS_PREFIX + ':' + node_name,
'MultiValueAnswer': True,
Expand Down
20 changes: 12 additions & 8 deletions cwm_worker_operator/nodes_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,25 @@

def set_node_healthy_keys(domains_config, deployments_manager):
healthy_node_name_ips = {}
node_name_types = {}
# set healthy redis key of healthy nodes - so that ingress will report them as healthy
for node in deployments_manager.iterate_cluster_nodes():
if node['is_worker']:
node_name_types[node['name']] = 'worker'
elif node['is_cdn']:
node_name_types[node['name']] = 'cdn'
if node['name'] in node_name_types:
healthy_node_name_ips[node['name']] = node['public_ip']
domains_config.set_node_healthy(node['name'], True)
# del healthy redis key for nodes which have a key but are not in list of healthy nodes
for node_name in domains_config.iterate_healthy_node_names():
if node_name not in healthy_node_name_ips:
domains_config.set_node_healthy(node_name, False)
return healthy_node_name_ips
return healthy_node_name_ips, node_name_types


def update_dns_records(deployments_manager, healthy_node_name_ips):
logs.debug('update_dns_records', healthy_node_name_ips=healthy_node_name_ips, debug_verbosity=10)
def update_dns_records(deployments_manager, healthy_node_name_ips, node_name_types):
logs.debug('update_dns_records', healthy_node_name_ips=healthy_node_name_ips, node_name_types=node_name_types, debug_verbosity=10)
dns_healthchecks = {}
dns_records = {}
# collect node names of existing dns healthchecks / records
Expand All @@ -43,8 +48,7 @@ def update_dns_records(deployments_manager, healthy_node_name_ips):
for dns_healthcheck in deployments_manager.iterate_dns_healthchecks():
logs.debug('dns_healthcheck', dns_healthcheck=dns_healthcheck, debug_verbosity=10)
dns_healthchecks[dns_healthcheck['node_name']] = {'id': dns_healthcheck['id'], 'ip': dns_healthcheck['ip']}
if dns_healthcheck['node_name'] in healthy_node_name_ips and dns_healthcheck['ip'] != healthy_node_name_ips[
dns_healthcheck['node_name']]:
if dns_healthcheck['node_name'] in healthy_node_name_ips and dns_healthcheck['ip'] != healthy_node_name_ips[dns_healthcheck['node_name']]:
deployments_manager.delete_dns_healthcheck(dns_healthcheck['id'])
update_required_healthcheck_node_names.add(dns_healthcheck['node_name'])
for dns_record in deployments_manager.iterate_dns_records():
Expand All @@ -65,7 +69,7 @@ def update_dns_records(deployments_manager, healthy_node_name_ips):
else:
healthcheck_id = dns_healthchecks[node_name]['id']
if node_name not in dns_records or node_name in update_required_records_node_names:
deployments_manager.set_dns_record(node_name, node_ip, healthcheck_id)
deployments_manager.set_dns_record(node_name, node_ip, healthcheck_id, node_name_types[node_name])
return dns_healthchecks, dns_records


Expand All @@ -81,8 +85,8 @@ def delete_dns_records(deployments_manager, healthy_node_name_ips, dns_healthche


def run_single_iteration(domains_config, deployments_manager, **_):
healthy_node_name_ips = set_node_healthy_keys(domains_config, deployments_manager)
dns_healthchecks, dns_records = update_dns_records(deployments_manager, healthy_node_name_ips)
healthy_node_name_ips, node_name_types = set_node_healthy_keys(domains_config, deployments_manager)
dns_healthchecks, dns_records = update_dns_records(deployments_manager, healthy_node_name_ips, node_name_types)
delete_dns_records(deployments_manager, healthy_node_name_ips, dns_healthchecks, dns_records)


Expand Down
1 change: 1 addition & 0 deletions cwm_worker_operator/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def get_instances_updates(domains_config: DomainsConfig, cwm_api_manager: CwmApi
last_update = None
instances_updates = {}
for update in cwm_api_manager.get_cwm_updates(from_datetime):
logs.debug(f'get_instances_updates: update={update}', debug_verbosity=10)
if last_update is None or last_update < update['update_time']:
last_update = update['update_time']
if update['worker_id'] not in instances_updates:
Expand Down

0 comments on commit a612a7b

Please sign in to comment.