Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed system update API and added maintenance view with system update functionality into GUI #338

Merged
merged 10 commits into from
Jan 10, 2018
14 changes: 14 additions & 0 deletions api/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,17 @@ def __init__(self, user_id, dc_id=None, **kwargs):
task_id = task_id_from_string(user_id, dummy=True, dc_id=dc_id, tt=TT_DUMMY, tg=tg)
kwargs['direct'] = True
super(DirectEvent, self).__init__(task_id, **kwargs)


class BroadcastEvent(Event):
"""
Broadcast task event dispatched to socket.io monitor, which then sends the signal to all active users.
"""
def __init__(self, task_id=None, **kwargs):
if not task_id:
dc_id = cq.conf.ERIGONES_DEFAULT_DC # DefaultDc().id
system_user_id = cq.conf.ERIGONES_TASK_USER # 7
task_id = task_id_from_string(system_user_id, dummy=True, dc_id=dc_id, tt=TT_DUMMY, tg=TG_DC_UNBOUND)

kwargs['broadcast'] = True
super(BroadcastEvent, self).__init__(task_id, **kwargs)
12 changes: 12 additions & 0 deletions api/node/sysinfo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# noinspection PyProtectedMember
from api.vm.status.tasks import vm_status_all
from api.dns.record.api_views import RecordView
from api.system.node.events import NodeSystemRestarted
from vms.models import Node, DefaultDc, IPAddress
from vms.signals import node_created, node_json_changed, node_json_unchanged
from que.tasks import cq, get_task_logger
from que.utils import TASK_USER, owner_id_from_task_id
from que.mgmt import MgmtCallbackTask
from que.exceptions import TaskException

Expand Down Expand Up @@ -164,4 +166,14 @@ def node_sysinfo_cb(result, task_id, node_uuid=None):
except Exception as e:
logger.exception(e)

# Refresh cached version information + emit event informing about restarted erigonesd:fast
try:
del node.system_version

if owner_id_from_task_id(task_id) == TASK_USER: # internal user ID
NodeSystemRestarted(node, system_version=node.system_version).send()

except Exception as e:
logger.exception(e)

return result
113 changes: 73 additions & 40 deletions api/system/node/api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,18 @@
from django.utils.six import text_type

from api.api_views import APIView
from api.exceptions import (NodeIsNotOperational, PreconditionRequired, TaskIsAlreadyRunning,
ObjectNotFound, GatewayTimeout)
# noinspection PyProtectedMember
from api.fields import get_boolean_value
from api.exceptions import NodeIsNotOperational, PreconditionRequired, ObjectNotFound, GatewayTimeout
from api.node.utils import get_node, get_nodes
from api.system.messages import LOG_SYSTEM_UPDATE
from api.system.node.serializers import NodeVersionSerializer
from api.system.node.events import NodeUpdateStarted, NodeUpdateFinished
from api.system.service.control import NodeServiceControl
from api.system.update.serializers import UpdateSerializer
from api.system.update.utils import process_update_reply
from api.task.response import SuccessTaskResponse, FailureTaskResponse
from que import TG_DC_UNBOUND, TT_DUMMY, Q_FAST
from que.lock import TaskLock
from que.utils import task_id_from_request, worker_command
from vms.models import DefaultDc
from api.task.response import SuccessTaskResponse, FailureTaskResponse, TaskResponse
from que import Q_FAST, TG_DC_UNBOUND
from que.utils import worker_command
from que.tasks import execute


logger = getLogger(__name__)
Expand All @@ -34,6 +32,9 @@ def __init__(self, request, hostname, data):

if hostname:
self.node = get_node(request, hostname)

if self.data and get_boolean_value(self.data.get('fresh', None)):
del self.node.system_version # Remove cached version information
else:
self.node = get_nodes(request)

Expand Down Expand Up @@ -70,28 +71,37 @@ def get(self):

class NodeUpdateView(APIView):
"""api.system.node.views.system_node_update"""
LOCK = 'system_node_update:%s'
dc_bound = False
_lock_key = 'system_update'

def __init__(self, request, hostname, data):
super(NodeUpdateView, self).__init__(request)
self.hostname = hostname
self.data = data
self.node = get_node(request, hostname)
self.task_id = task_id_from_request(self.request, dummy=True, tt=TT_DUMMY, tg=TG_DC_UNBOUND)

def _update(self, version, key=None, cert=None):
def _update_v2(self, version, key=None, cert=None):
from api.system.update.utils import process_update_reply

node = self.node
worker = node.worker(Q_FAST)
logger.debug('Running node "%s" system update to version: "%s"', node, version)
logger.info('Running oldstyle (v2.x) node "%s" system update to version: "%s"', node, version)
reply = worker_command('system_update', worker, version=version, key=key, cert=cert, timeout=600)

if reply is None:
raise GatewayTimeout('Node worker is not responding')

response_class, result = process_update_reply(reply, node, version)
response = response_class(self.request, result, task_id=self.task_id, obj=node, msg=LOG_SYSTEM_UPDATE,
detail_dict=result, dc_bound=False)
result, error = process_update_reply(reply, node, version)

if error:
response_class = FailureTaskResponse
else:
response_class = SuccessTaskResponse

detail_dict = result.copy()
detail_dict['version'] = version
response = response_class(self.request, result, obj=node, msg=LOG_SYSTEM_UPDATE, dc_bound=False,
detail_dict=detail_dict)

if response.status_code == 200:
# Restart all erigonesd workers
Expand All @@ -102,49 +112,72 @@ def _update(self, version, key=None, cert=None):

return response

@classmethod
def get_task_lock(cls):
# Also used in socket.io namespace
return TaskLock(cls._lock_key, desc='System task')

def put(self):
assert self.request.dc.id == DefaultDc().id
assert self.request.dc.is_default()

ser = UpdateSerializer(self.request, data=self.data)

if not ser.is_valid():
return FailureTaskResponse(self.request, ser.errors, task_id=self.task_id, dc_bound=False)
return FailureTaskResponse(self.request, ser.errors, dc_bound=False)

node = self.node
version = ser.object['version']

version = ser.data['version']
key = ser.data.get('key')
cert = ser.data.get('cert')
del node.system_version # Request latest version in next command
node_version = node.system_version

if not (isinstance(node_version, text_type) and node_version):
raise NodeIsNotOperational('Node version information could not be retrieved')

if version == ('v' + node.system_version):
node_version = node_version.split(':')[-1] # remove edition prefix

if version == ('v' + node_version) and not ser.data.get('force'):
raise PreconditionRequired('Node is already up-to-date')

if node.status != node.OFFLINE:
raise NodeIsNotOperational('Unable to perform update on node that is not in maintenance state!')
raise NodeIsNotOperational('Unable to perform update on node that is not in maintenance state')

lock = self.get_task_lock()
if node_version.startswith('2.'):
# Old-style (pre 3.0) update mechanism
return self._update_v2(version, key=key, cert=cert)

if not lock.acquire(self.task_id, timeout=7200, save_reverse=False):
raise TaskIsAlreadyRunning
# Upload key and cert and get command array
worker = node.worker(Q_FAST)
update_cmd = worker_command('system_update_command', worker, version=version, key=key, cert=cert,
force=ser.data.get('force'), timeout=10)

try:
# Emit event into socket.io
NodeUpdateStarted(self.task_id, request=self.request).send()
if update_cmd is None:
raise GatewayTimeout('Node worker is not responding')

return self._update(version, key=ser.object.get('key'), cert=ser.object.get('cert'))
finally:
lock.delete(fail_silently=True, delete_reverse=False)
# Delete cached node version information (will be cached again during next node.system_version call)
del node.system_version
# Emit event into socket.io
NodeUpdateFinished(self.task_id, request=self.request).send()
if not isinstance(update_cmd, list):
raise PreconditionRequired('Node update command could be retrieved')

msg = LOG_SYSTEM_UPDATE
_apiview_ = {
'view': 'system_node_update',
'method': self.request.method,
'hostname': node.hostname,
'version': version,
}
meta = {
'apiview': _apiview_,
'msg': msg,
'node_uuid': node.uuid,
'output': {'returncode': 'returncode', 'stdout': 'message'},
'check_returncode': True,
}
lock = self.LOCK % node.hostname
cmd = '%s 2>&1' % ' '.join(update_cmd)

tid, err = execute(self.request, node.owner.id, cmd, meta=meta, lock=lock, queue=node.fast_queue,
tg=TG_DC_UNBOUND)

if err:
return FailureTaskResponse(self.request, err, dc_bound=False)
else:
return TaskResponse(self.request, tid, msg=msg, obj=node, api_view=_apiview_, data=self.data,
dc_bound=False, detail_dict=ser.detail_dict(force_full=True))


class NodeLogsView(APIView):
Expand Down
21 changes: 11 additions & 10 deletions api/system/node/events.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from api.system.update.events import BaseUpdateEvent
from api.event import Event
from que import TT_DUMMY, TG_DC_UNBOUND
from que.utils import DEFAULT_DC, task_id_from_string


class NodeUpdateStarted(BaseUpdateEvent):
class NodeSystemRestarted(Event):
"""
Called from the NodeUpdateView.
Called from node_sysinfo_cb after erigonesd:fast is restarted on a compute node.
"""
_name_ = 'node_update_started'
_name_ = 'node_system_restarted'


class NodeUpdateFinished(BaseUpdateEvent):
"""
Called from the NodeUpdateView.
"""
_name_ = 'node_update_finished'
def __init__(self, node, **kwargs):
# Create such a task_id that info is send to SuperAdmins and node owner
task_id = task_id_from_string(node.owner.id, dummy=True, dc_id=DEFAULT_DC, tt=TT_DUMMY, tg=TG_DC_UNBOUND)
kwargs['node_hostname'] = node.hostname
super(NodeSystemRestarted, self).__init__(task_id, **kwargs)
16 changes: 9 additions & 7 deletions api/system/node/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ def system_node_version(request, hostname, data=None):
* |async-no|
:arg hostname: **required** - Node hostname
:type hostname: string
:arg.data.fresh: Refresh cached node version information (default: false)
:type data.fresh: boolean
:status 200: SUCCESS
:status 403: Forbidden
:status 404: Node not found
Expand Down Expand Up @@ -111,11 +113,11 @@ def system_node_service_status(request, hostname, name, data=None):
@request_data_defaultdc(permissions=(IsSuperAdmin,))
def system_node_update(request, hostname, data=None):
"""
Install (:http:put:`PUT </system/node/(hostname)/update>`) Danube Cloud update on a compute node.
Update (:http:put:`PUT </system/node/(hostname)/update>`) Danube Cloud on a compute node.

.. http:put:: /system/node/(hostname)/update

.. note:: The compute node software will be updated to the \
.. note:: The compute node software should be updated to the \
same :http:get:`version </system/version>` as installed on the main Danube Cloud management VM. \
Use this API call after successful :http:put:`system update </system/update>`.

Expand All @@ -124,11 +126,13 @@ def system_node_update(request, hostname, data=None):
:Permissions:
* |SuperAdmin|
:Asynchronous?:
* |async-no|
* |async-yes|
:arg hostname: **required** - Node hostname
:type hostname: string
:arg data.version: **required** - git tag (e.g. ``v2.6.5``) or git commit to which the system should be updated
:type data.version: string
:arg data.force: Whether to perform the update operation even though the software is already at selected version
:type data.force: boolean
:arg data.key: X509 private key file used for authentication against EE git server. \
Please note that file MUST contain standard x509 file BEGIN/END header/footer. \
If not present, cached key file "update.key" will be used.
Expand All @@ -138,14 +142,12 @@ def system_node_update(request, hostname, data=None):
If not present, cached cert file "update.crt" will be used.
:type data.cert: string
:status 200: SUCCESS
:status 201: PENDING
:status 400: FAILURE
:status 403: Forbidden
:status 404: Node not found
:status 417: Node update file is not available
:status 423: Node is not in maintenance state / Node version information could not be retrieved / \
Task is already running
:status 423: Node is not in maintenance state / Node version information could not be retrieved
:status 428: Node is already up-to-date
:status 504: Node worker is not responding
"""
return NodeUpdateView(request, hostname, data).put()

Expand Down
2 changes: 1 addition & 1 deletion api/system/service/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class SystemReloadThread(Thread):
"""
Reload all app services in background.
It is important to restart/reload the calling service as last one.
Used by system update and eslic.
Used by eslic.
"""
daemon = True

Expand Down
Loading