Skip to content

Commit

Permalink
Changed system update API and added maintenance view with system upda…
Browse files Browse the repository at this point in the history
…te functionality into GUI

**API changes**

- Refactored system_update and system_node_update API views to be asynchronous:
    - system_update creates a mgmt task, which runs the esdc-git-update command
    - system_node_update create an execute task (with implicit log callback), which runs the esdc-git-update command
- Added esdc-service-control script responsible for restarting all Danube Cloud services
- Added ``--force`` and ``--esdc-service-restart``  options into esdc-git-update
- Added locking into esdc-git-update
- Changed temp location for esdc-docs in ctl.sh gendoc command

**GUI changes**

A new GUI (URL) namespace was created - /system, which will provide views for the whole Danube Cloud management system available only to a SuperAdmin.

3 views have been added:

- /system/maintenance -> used for system/node updates
- /system/overview -> a dashboard-like view
- /system/settings -> system settings view
  • Loading branch information
dn0 committed Jan 4, 2018
1 parent 804ec8b commit 411bae3
Show file tree
Hide file tree
Showing 57 changed files with 1,573 additions and 440 deletions.
14 changes: 14 additions & 0 deletions api/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,17 @@ def __init__(self, user_id, dc_id=None, **kwargs):
task_id = task_id_from_string(user_id, dummy=True, dc_id=dc_id, tt=TT_DUMMY, tg=tg)
kwargs['direct'] = True
super(DirectEvent, self).__init__(task_id, **kwargs)


class BroadcastEvent(Event):
"""
Broadcast task event dispatched to socket.io monitor, which then sends the signal to all active users.
"""
def __init__(self, task_id=None, **kwargs):
if not task_id:
dc_id = cq.conf.ERIGONES_DEFAULT_DC # DefaultDc().id
system_user_id = cq.conf.ERIGONES_TASK_USER # 7
task_id = task_id_from_string(system_user_id, dummy=True, dc_id=dc_id, tt=TT_DUMMY, tg=TG_DC_UNBOUND)

kwargs['broadcast'] = True
super(BroadcastEvent, self).__init__(task_id, **kwargs)
12 changes: 12 additions & 0 deletions api/node/sysinfo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# noinspection PyProtectedMember
from api.vm.status.tasks import vm_status_all
from api.dns.record.api_views import RecordView
from api.system.node.events import NodeSystemRestarted
from vms.models import Node, DefaultDc, IPAddress
from vms.signals import node_created, node_json_changed, node_json_unchanged
from que.tasks import cq, get_task_logger
from que.utils import TASK_USER, owner_id_from_task_id
from que.mgmt import MgmtCallbackTask
from que.exceptions import TaskException

Expand Down Expand Up @@ -164,4 +166,14 @@ def node_sysinfo_cb(result, task_id, node_uuid=None):
except Exception as e:
logger.exception(e)

# Refresh cached version information + emit event informing about restarted erigonesd:fast
try:
del node.system_version

if owner_id_from_task_id(task_id) == TASK_USER: # internal user ID
NodeSystemRestarted(node, system_version=node.system_version).send()

except Exception as e:
logger.exception(e)

return result
113 changes: 73 additions & 40 deletions api/system/node/api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
from django.utils.six import text_type

from api.api_views import APIView
from api.exceptions import (NodeIsNotOperational, PreconditionRequired, TaskIsAlreadyRunning,
ObjectNotFound, GatewayTimeout)
# noinspection PyProtectedMember
from api.fields import get_boolean_value
from api.exceptions import NodeIsNotOperational, PreconditionRequired, ObjectNotFound, GatewayTimeout
from api.node.utils import get_node, get_nodes
from api.system.messages import LOG_SYSTEM_UPDATE
from api.system.node.serializers import NodeVersionSerializer
from api.system.node.events import NodeUpdateStarted, NodeUpdateFinished
from api.system.service.control import NodeServiceControl
from api.system.update.serializers import UpdateSerializer
from api.system.update.utils import process_update_reply
from api.task.response import SuccessTaskResponse, FailureTaskResponse
from que import TG_DC_UNBOUND, TT_DUMMY, Q_FAST
from que.lock import TaskLock
from que.utils import task_id_from_request, worker_command
from vms.models import DefaultDc
from api.task.response import SuccessTaskResponse, FailureTaskResponse, TaskResponse
from que import Q_FAST, TG_DC_UNBOUND
from que.utils import worker_command
from que.tasks import execute


logger = getLogger(__name__)
Expand All @@ -34,6 +32,9 @@ def __init__(self, request, hostname, data):

if hostname:
self.node = get_node(request, hostname)

if self.data and get_boolean_value(self.data.get('fresh', None)):
del self.node.system_version # Remove cached version information
else:
self.node = get_nodes(request)

Expand Down Expand Up @@ -70,28 +71,37 @@ def get(self):

class NodeUpdateView(APIView):
"""api.system.node.views.system_node_update"""
LOCK = 'system_node_update:%s'
dc_bound = False
_lock_key = 'system_update'

def __init__(self, request, hostname, data):
super(NodeUpdateView, self).__init__(request)
self.hostname = hostname
self.data = data
self.node = get_node(request, hostname)
self.task_id = task_id_from_request(self.request, dummy=True, tt=TT_DUMMY, tg=TG_DC_UNBOUND)

def _update(self, version, key=None, cert=None):
def _update_v2(self, version, key=None, cert=None):
from api.system.update.utils import process_update_reply

node = self.node
worker = node.worker(Q_FAST)
logger.debug('Running node "%s" system update to version: "%s"', node, version)
logger.info('Running oldstyle (v2.x) node "%s" system update to version: "%s"', node, version)
reply = worker_command('system_update', worker, version=version, key=key, cert=cert, timeout=600)

if reply is None:
raise GatewayTimeout('Node worker is not responding')

response_class, result = process_update_reply(reply, node, version)
response = response_class(self.request, result, task_id=self.task_id, obj=node, msg=LOG_SYSTEM_UPDATE,
detail_dict=result, dc_bound=False)
result, error = process_update_reply(reply, node, version)

if error:
response_class = FailureTaskResponse
else:
response_class = SuccessTaskResponse

detail_dict = result.copy()
detail_dict['version'] = version
response = response_class(self.request, result, obj=node, msg=LOG_SYSTEM_UPDATE, dc_bound=False,
detail_dict=detail_dict)

if response.status_code == 200:
# Restart all erigonesd workers
Expand All @@ -102,49 +112,72 @@ def _update(self, version, key=None, cert=None):

return response

@classmethod
def get_task_lock(cls):
# Also used in socket.io namespace
return TaskLock(cls._lock_key, desc='System task')

def put(self):
assert self.request.dc.id == DefaultDc().id
assert self.request.dc.is_default()

ser = UpdateSerializer(self.request, data=self.data)

if not ser.is_valid():
return FailureTaskResponse(self.request, ser.errors, task_id=self.task_id, dc_bound=False)
return FailureTaskResponse(self.request, ser.errors, dc_bound=False)

node = self.node
version = ser.object['version']

version = ser.data['version']
key = ser.data.get('key')
cert = ser.data.get('cert')
del node.system_version # Request latest version in next command
node_version = node.system_version

if not (isinstance(node_version, text_type) and node_version):
raise NodeIsNotOperational('Node version information could not be retrieved')

if version == ('v' + node.system_version):
node_version = node_version.split(':')[-1] # remove edition prefix

if version == ('v' + node_version) and not ser.data.get('force'):
raise PreconditionRequired('Node is already up-to-date')

if node.status != node.OFFLINE:
raise NodeIsNotOperational('Unable to perform update on node that is not in maintenance state!')
raise NodeIsNotOperational('Unable to perform update on node that is not in maintenance state')

lock = self.get_task_lock()
if node_version.startswith('2.'):
# Old-style (pre 3.0) update mechanism
return self._update_v2(version, key=key, cert=cert)

if not lock.acquire(self.task_id, timeout=7200, save_reverse=False):
raise TaskIsAlreadyRunning
# Upload key and cert and get command array
worker = node.worker(Q_FAST)
update_cmd = worker_command('system_update_command', worker, version=version, key=key, cert=cert,
force=ser.data.get('force'), timeout=10)

try:
# Emit event into socket.io
NodeUpdateStarted(self.task_id, request=self.request).send()
if update_cmd is None:
raise GatewayTimeout('Node worker is not responding')

return self._update(version, key=ser.object.get('key'), cert=ser.object.get('cert'))
finally:
lock.delete(fail_silently=True, delete_reverse=False)
# Delete cached node version information (will be cached again during next node.system_version call)
del node.system_version
# Emit event into socket.io
NodeUpdateFinished(self.task_id, request=self.request).send()
if not isinstance(update_cmd, list):
raise PreconditionRequired('Node update command could be retrieved')

msg = LOG_SYSTEM_UPDATE
_apiview_ = {
'view': 'system_node_update',
'method': self.request.method,
'hostname': node.hostname,
'version': version,
}
meta = {
'apiview': _apiview_,
'msg': msg,
'node_uuid': node.uuid,
'output': {'returncode': 'returncode', 'stdout': 'message'},
'check_returncode': True,
}
lock = self.LOCK % node.hostname
cmd = '%s 2>&1' % ' '.join(update_cmd)

tid, err = execute(self.request, node.owner.id, cmd, meta=meta, lock=lock, queue=node.fast_queue,
tg=TG_DC_UNBOUND)

if err:
return FailureTaskResponse(self.request, err, dc_bound=False)
else:
return TaskResponse(self.request, tid, msg=msg, obj=node, api_view=_apiview_, data=self.data,
dc_bound=False, detail_dict=ser.detail_dict(force_full=True))


class NodeLogsView(APIView):
Expand Down
21 changes: 11 additions & 10 deletions api/system/node/events.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from api.system.update.events import BaseUpdateEvent
from api.event import Event
from que import TT_DUMMY, TG_DC_UNBOUND
from que.utils import DEFAULT_DC, task_id_from_string


class NodeUpdateStarted(BaseUpdateEvent):
class NodeSystemRestarted(Event):
"""
Called from the NodeUpdateView.
Called from node_sysinfo_cb after erigonesd:fast is restarted on a compute node.
"""
_name_ = 'node_update_started'
_name_ = 'node_system_restarted'


class NodeUpdateFinished(BaseUpdateEvent):
"""
Called from the NodeUpdateView.
"""
_name_ = 'node_update_finished'
def __init__(self, node, **kwargs):
# Create such a task_id that info is send to SuperAdmins and node owner
task_id = task_id_from_string(node.owner.id, dummy=True, dc_id=DEFAULT_DC, tt=TT_DUMMY, tg=TG_DC_UNBOUND)
kwargs['node_hostname'] = node.hostname
super(NodeSystemRestarted, self).__init__(task_id, **kwargs)
16 changes: 9 additions & 7 deletions api/system/node/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def system_node_version(request, hostname, data=None):
* |async-no|
:arg hostname: **required** - Node hostname
:type hostname: string
:arg.data.fresh: Refresh cached node version information (default: false)
:type data.fresh: boolean
:status 200: SUCCESS
:status 403: Forbidden
:status 404: Node not found
Expand Down Expand Up @@ -111,11 +113,11 @@ def system_node_service_status(request, hostname, name, data=None):
@request_data_defaultdc(permissions=(IsSuperAdmin,))
def system_node_update(request, hostname, data=None):
"""
Install (:http:put:`PUT </system/node/(hostname)/update>`) Danube Cloud update on a compute node.
Update (:http:put:`PUT </system/node/(hostname)/update>`) Danube Cloud on a compute node.
.. http:put:: /system/node/(hostname)/update
.. note:: The compute node software will be updated to the \
.. note:: The compute node software should be updated to the \
same :http:get:`version </system/version>` as installed on the main Danube Cloud management VM. \
Use this API call after successful :http:put:`system update </system/update>`.
Expand All @@ -124,11 +126,13 @@ def system_node_update(request, hostname, data=None):
:Permissions:
* |SuperAdmin|
:Asynchronous?:
* |async-no|
* |async-yes|
:arg hostname: **required** - Node hostname
:type hostname: string
:arg data.version: **required** - git tag (e.g. ``v2.6.5``) or git commit to which the system should be updated
:type data.version: string
:arg data.force: Whether to perform the update operation even though the software is already at selected version
:type data.force: boolean
:arg data.key: X509 private key file used for authentication against EE git server. \
Please note that file MUST contain standard x509 file BEGIN/END header/footer. \
If not present, cached key file "update.key" will be used.
Expand All @@ -138,14 +142,12 @@ def system_node_update(request, hostname, data=None):
If not present, cached cert file "update.crt" will be used.
:type data.cert: string
:status 200: SUCCESS
:status 201: PENDING
:status 400: FAILURE
:status 403: Forbidden
:status 404: Node not found
:status 417: Node update file is not available
:status 423: Node is not in maintenance state / Node version information could not be retrieved / \
Task is already running
:status 423: Node is not in maintenance state / Node version information could not be retrieved
:status 428: Node is already up-to-date
:status 504: Node worker is not responding
"""
return NodeUpdateView(request, hostname, data).put()

Expand Down
61 changes: 0 additions & 61 deletions api/system/service/control.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
from collections import namedtuple, OrderedDict
from subprocess import Popen, PIPE, STDOUT
from threading import Thread
from logging import getLogger
from time import sleep
from django.conf import settings

from api.exceptions import GatewayTimeout
from api.request import Request as APIRequest
from api.system.service.events import SystemReloaded
from que.utils import worker_command

logger = getLogger(__name__)
Expand Down Expand Up @@ -175,59 +170,3 @@ def _service_cmd(self, service, action):
cmd = '/usr/sbin/svcadm {action} {service}'

return self._action_cmd(service, action, cmd)


class SystemReloadThread(Thread):
"""
Reload all app services in background.
It is important to restart/reload the calling service as last one.
Used by system update and eslic.
"""
daemon = True

def __init__(self, delay=3, task_id=None, request=None, reason=''):
self.delay = delay
self.task_id = task_id
self.request = request
self.reason = reason
self.last_service = self._get_last_service(self.request)
self.sctrl = ServiceControl()
super(SystemReloadThread, self).__init__(name='system-reload-thread')

@staticmethod
def _get_last_service(request):
if request:
if isinstance(request, APIRequest):
return 'app-api'
else:
return 'app-gui'
return 'erigonesd:mgmt'

def reload_service(self, name):
# Issue esdc-ce#20
self.sctrl.restart(name)

def reload_all(self):
last = None

for name in self.sctrl.app_services:
if name == self.last_service:
last = name
else:
self.reload_service(name)

if last:
self.reload_service(last)

def run(self):
logger.info('Initializing system reload')
sleep(self.delay)

if settings.DEBUG:
logger.info('Skipping system reload in DEBUG mode')
else:
self.reload_all()
logger.info('System reloaded')

if self.task_id:
SystemReloaded(self.task_id, request=self.request, reason=self.reason).send()
17 changes: 0 additions & 17 deletions api/system/service/events.py

This file was deleted.

Loading

0 comments on commit 411bae3

Please sign in to comment.