Skip to content

Commit

Permalink
Refactoring of vPoller Worker
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Mar 27, 2014
1 parent 1130a5a commit fd0704d
Showing 1 changed file with 67 additions and 82 deletions.
149 changes: 67 additions & 82 deletions src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class VPollerWorker(Daemon):
Prepares all vSphere Agents for polling from the vSphere hosts.
This is the main vPoller worker, which contains the vSphere Agents
This is the main vPoller Worker, which runs the vSphere Agents
Extends:
Daemon class
Expand All @@ -53,15 +53,15 @@ class VPollerWorker(Daemon):
run() method
"""
def run(self, config_file):
def run(self, config):
"""
The main worker method.
Args:
config_file (str): Configuration file for the VPollerWorker
config (str): Path to the confuguration file for vPoller Worker
"""
logging.debug('Preparing vPoller Worker for starting up')
logging.debug('Preparing vPoller Worker for start up')

# Note the time we start up
self.running_since = asctime()
Expand All @@ -70,7 +70,7 @@ def run(self, config_file):
self.time_to_die = False

# Load the configuration file of the vPoller Worker
self.load_worker_config(config_file)
self.load_worker_config(config)

# Create the worker sockets
self.create_worker_sockets()
Expand All @@ -93,7 +93,7 @@ def run(self, config_file):
# Frame 1: [ N ][...] <- Identity of connection
# Frame 2: [ 0 ][] <- Empty delimiter frame
# Frame 3: [ N ][...] <- Data frame
logging.debug('Received new message on worker socket')
logging.debug('Received message on the worker socket')

_id = self.worker_socket.recv()
_empty = self.worker_socket.recv()
Expand All @@ -108,47 +108,44 @@ def run(self, config_file):

# Management socket
if socks.get(self.mgmt_socket) == zmq.POLLIN:
logging.debug('Received new message on mgmt socket')

logging.debug('Received message on the management socket')
msg = self.mgmt_socket.recv_json()

result = self.process_mgmt_message(msg)

self.mgmt_socket.send_json(result)

# Shutdown time has arrived, let's clean up a bit
logging.debug('Shutdown time, vPoller Worker is going down...')
logging.debug('Shutdown time, vPoller Worker is going down')
self.close_worker_sockets()
self.shutdown_vsphere_agents()
# self.zcontext.term()
self.stop()

def load_worker_config(self, config_file):
def load_worker_config(self, config):
"""
Loads the vPoller Worker configuration file
Args:
config_file (str): Config file of the Worker
config (str): Path to the config file of vPoller Worker
Raises:
VPollerException
"""
logging.debug('Loading vPoller Worker config file %s', config_file)
logging.debug('Loading vPoller Worker config file %s', config)

if not os.path.exists(config_file):
logging.error("Configuration file does not exists: %s", config_file)
raise VPollerException, "Configuration file does not exists: %s" % config_file
if not os.path.exists(config):
logging.error('Configuration file does not exists: %s', config)
raise VPollerException, 'Configuration file does not exists: %s' % config

config = ConfigParser.ConfigParser()
config.read(config_file)
parser = ConfigParser.ConfigParser()
parser.read(config_file)

try:
self.proxy_endpoint = config.get('Default', 'proxy')
self.mgmt_endpoint = config.get('Default', 'mgmt')
self.vsphere_hosts_dir = config.get('Default', 'vsphere_hosts_dir')
self.proxy_endpoint = config.get('worker', 'proxy')
self.mgmt_endpoint = config.get('worker', 'mgmt')
self.vsphere_hosts_dir = config.get('worker', 'vsphere_hosts_dir')
except ConfigParser.NoOptionError as e:
logging.error("Configuration issues detected in %s: %s" , config_file, e)
logging.error('Configuration issues detected in %s: %s' , config, e)
raise

def create_worker_sockets(self):
Expand All @@ -168,29 +165,15 @@ def create_worker_sockets(self):

self.zcontext = zmq.Context()

# A management socket used to control the vPoller Worker daemon
self.mgmt_socket = self.zcontext.socket(zmq.REP)
self.mgmt_socket.bind(self.mgmt_endpoint)

logging.debug('Binding mgmt socket to: %s', self.mgmt_endpoint)
try:
self.mgmt_socket.bind(self.mgmt_endpoint)
except zmq.ZMQError as e:
logging.error("Cannot bind management socket: %s", e)
raise VPollerException, "Cannot bind management socket: %s" % e

# Create a DEALER socket for processing client messages
logging.info('Connecting to the vPoller Proxy server')
logging.debug('Connecting to vPoller Proxy endpoint: %s', self.proxy_endpoint)
self.worker_socket = self.zcontext.socket(zmq.DEALER)

try:
self.worker_socket.connect(self.proxy_endpoint)
except zmq.ZMQError as e:
logging.error("Cannot connect worker to vPoller Proxy: %s", e)
raise VPollerException, "Cannot connect worker to vPoller Proxy: %s" % e
self.worker_socket.connect(self.proxy_endpoint)

# Create a poll set for our sockets
logging.debug('Creating poll set for our sockets')
logging.debug('Creating poll set for vPoller Worker sockets')
self.zpoller = zmq.Poller()
self.zpoller.register(self.mgmt_socket, zmq.POLLIN)
self.zpoller.register(self.worker_socket, zmq.POLLIN)
Expand All @@ -201,52 +184,51 @@ def close_worker_sockets(self):
"""
logging.debug('Closing vPoller Worker sockets')

self.zpoller.unregister(self.mgmt_socket)
self.zpoller.unregister(self.worker_socket)

self.mgmt_socket.close()
self.worker_socket.close()

def spawn_vsphere_agents(self):
"""
Prepares the vSphere Agent objects used by the Worker
Prepares the vSphere Agents used by the vPoller Worker
"""
logging.debug('Spawning vSphere Agents')

# Get the configuration files for our vSphere hosts
confFiles = self.get_vsphere_configs(self.vsphere_hosts_dir)
conf_files = self.get_vsphere_configs(self.vsphere_hosts_dir)

self.agents = dict()

# Load the config for every vSphere Agent
for eachConf in confFiles:
logging.debug('Spawning vSphere Agent from config file: %s', eachConf)
agent = VSphereAgent(eachConf, ignore_locks=True, lockdir="/var/run/vpoller", keep_alive=True)
self.agents[agent.hostname] = agent
# Load the config file for every vSphere Agent
for conf in conf_files:
logging.debug('Spawning vSphere Agent from config file: %s', conf)
agent = VSphereAgent(conf)
self.agents[agent.host] = agent

def start_vsphere_agents(self):
"""
Connects all VPoller Agents to their respective vSphere hosts
Connects all vSphere Agents to their respective VMware vSphere hosts
"""
logging.debug('Starting vSphere Agents')

for eachAgent in self.agents:
try:
self.agents[eachAgent].connect()
except Exception as e:
logging.error('Cannot connect to %s: %s', eachAgent, e)
for agent in self.agents:
self.agents[agent].connect()

def shutdown_vsphere_agents(self):
"""
Disconnects all VPoller Agents from their respective vSphere hosts
Disconnects all VPoller Agents from their respective VMware vSphere hosts
"""
logging.debug('Shutting down vSphere Agents')

for eachAgent in self.agents:
logging.debug('Shutting down vSphere Agent: %s', eachAgent.hostname)
self.agents[eachAgent].disconnect()
for agent in self.agents:
logging.debug('Shutting down vSphere Agent: %s', agent.host)
self.agents[agent].disconnect()

def get_vsphere_configs(self, config_dir):
"""
Expand All @@ -268,20 +250,20 @@ def get_vsphere_configs(self, config_dir):
logging.debug('Getting vSphere configuration files from: %s', config_dir)

if not os.path.exists(config_dir) or not os.path.isdir(config_dir):
logging.error("%s does not exists or is not a directory", config_dir)
raise VPollerException, "%s does not exists or is not a directory" % config_dir
logging.error('%s does not exists or is not a directory', config_dir)
raise VPollerException, '%s does not exists or is not a directory' % config_dir

# Get all *.conf files for the hosts
path = os.path.join(config_dir, "*.conf")
confFiles = glob.glob(path)
path = os.path.join(config_dir, '*.conf')
conf_files = glob.glob(path)

if not confFiles:
logging.error("No vSphere config files found in %s", config_dir)
raise VPollerException, "No vSphere config files found in %s" % config_dir
if not conf_files:
logging.error('No vSphere config files found in %s', config_dir)
raise VPollerException, 'No vSphere config files found in %s' % config_dir

logging.debug('Discovered vSphere configuration files: %s', confFiles)
logging.debug('Discovered vSphere configuration files: %s', conf_files)

return confFiles
return conf_files

def process_client_message(self, msg):
"""
Expand Down Expand Up @@ -315,15 +297,15 @@ def process_client_message(self, msg):
logging.debug('Processing client message: %s', msg)

# We require to have at least the 'method' and vSphere 'hostname'
if not all(k in msg for k in ("method", "hostname")):
return { "success": -1, "msg": "Missing message properties (e.g. method/hostname)" }
if not all(k in msg for k in ('method', 'hostname')):
return { 'success': -1, 'msg': 'Missing message properties (e.g. method/hostname)' }

vsphere_host = msg["hostname"]
vsphere_host = msg['hostname']

if not self.agents.get(vsphere_host):
return { "success": -1, "msg": "Unknown vSphere Agent requested" }
return { 'success': -1, 'msg': 'Unknown vSphere Agent requested' }

# The methods we support and process
# The vPoller Worker methods we support and process
methods = {
'host.get': self.agents[vsphere_host].get_host_property,
'host.discover': self.agents[vsphere_host].discover_hosts,
Expand All @@ -341,9 +323,10 @@ def process_client_message(self, msg):
'cluster.discover': self.agents[vsphere_host].discover_clusters,
}

result = methods[msg['method']](msg) if methods.get(msg['method']) else { "success": -1, "msg": "Unknown method received" }
if msg['method'] not in methods:
return { 'success': -1, 'msg': 'Uknown method received' }

return result
return methods[msg['method']](msg)

def process_mgmt_message(self, msg):
"""
Expand All @@ -365,20 +348,21 @@ def process_mgmt_message(self, msg):
msg (dict): The client message for processing
"""
logging.debug('Processing mgmt message: %s', msg)
logging.debug('Processing management message: %s', msg)

if not "method" in msg:
return { "success": -1, "msg": "Missing method name" }
if 'method' not in msg:
return { 'success': -1, 'msg': 'Missing method name' }

# The management methods we support and process
# The vPoller Worker management methods we support and process
methods = {
'worker.status': self.get_worker_status,
'worker.shutdown': self.worker_shutdown,
}

result = methods[msg['method']](msg) if methods.get(msg['method']) else { "success": -1, "msg": "Uknown command received" }

return result
if msg['method'] not in methods:
return { 'success': -1, 'msg': 'Unknown method received' }

return methods[msg['method']](msg)

def get_worker_status(self, msg):
"""
Expand All @@ -405,8 +389,8 @@ def get_worker_status(self, msg):
'vsphere_agents': self.agents.keys(),
'running_since': self.running_since,
'uname': ' '.join(os.uname()),
}
}
}

logging.debug('Returning result to client: %s', result)

Expand All @@ -424,4 +408,5 @@ def worker_shutdown(self, msg):

self.time_to_die = True

return { "success": 0, "msg": "vPoller Worker is shutting down" }
return { 'success': 0, 'msg': 'vPoller Worker is shutting down' }

0 comments on commit fd0704d

Please sign in to comment.