Skip to content

Commit

Permalink
Merge pull request #906 from gh-ca/emc_vnx_block_20220529
Browse files Browse the repository at this point in the history
EMC vnx block adds collect performance interface
  • Loading branch information
joseph-v authored Jun 28, 2022
2 parents 0e3e740 + 0b5eaf1 commit 1422a00
Show file tree
Hide file tree
Showing 6 changed files with 822 additions and 10 deletions.
343 changes: 337 additions & 6 deletions delfin/drivers/dell_emc/vnx/vnx_block/component_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import csv
import os
import re
import time

import six
from oslo_log import log
Expand All @@ -21,6 +24,7 @@
from delfin import exception
from delfin.common import constants
from delfin.drivers.dell_emc.vnx.vnx_block import consts
from delfin.drivers.utils.tools import Tools

LOG = log.getLogger(__name__)

Expand Down Expand Up @@ -251,8 +255,10 @@ def list_disks(self, storage_id):
hot_spare = disk.get('hot_spare', '')
if hot_spare and hot_spare != 'N/A':
logical_type = constants.DiskLogicalType.HOTSPARE
disk_name = disk.get('disk_name')
disk_name = ' '.join(disk_name.strip().split())
disk_model = {
'name': disk.get('disk_name'),
'name': disk_name,
'storage_id': storage_id,
'native_disk_id': disk.get('disk_id'),
'serial_number': disk.get('serial_number'),
Expand All @@ -268,7 +274,7 @@ def list_disks(self, storage_id):
'logical_type': logical_type,
'health_score': None,
'native_disk_group_id': None,
'location': disk.get('disk_name')
'location': disk_name
}
disk_list.append(disk_model)
return disk_list
Expand Down Expand Up @@ -352,7 +358,7 @@ def get_ports(self, storage_id, io_configs, iscsi_port_map):
ipv4 = iscsi_port.get('ip_address')
ipv4_mask = iscsi_port.get('subnet_mask')
port_model = {
'name': name,
'name': location,
'storage_id': storage_id,
'native_port_id': name,
'location': location,
Expand Down Expand Up @@ -390,8 +396,6 @@ def get_bus_ports(self, storage_id, io_configs):
sps = bus_port.get('sps')
for sp in (sps or []):
sp_name = sp.replace('sp', '').upper()
name = '%s-%s' % (sp_name,
bus_port.get('bus_name'))
location = '%s %s,Port %s' % (
bus_port.get('i/o_module_slot'), sp_name,
bus_port.get('physical_port_id'))
Expand All @@ -409,7 +413,7 @@ def get_bus_ports(self, storage_id, io_configs):
state = bus_port_state_map.get(port_state_key,
'')
port_model = {
'name': name,
'name': location,
'storage_id': storage_id,
'native_port_id': native_port_id,
'location': location,
Expand Down Expand Up @@ -565,3 +569,330 @@ def list_storage_hosts(self, storage_id):
}
host_list.append(host_model)
return host_list

def collect_perf_metrics(self, storage_id, resource_metrics,
start_time, end_time):
metrics = []
archive_file_list = []
try:
LOG.info("Start collection, storage:%s, start time:%s, end time:%s"
% (storage_id, start_time, end_time))
archive_file_list = self._get__archive_file(start_time, end_time)
LOG.info("Get archive files: {}".format(archive_file_list))
if not archive_file_list:
LOG.warning("The required performance file was not found!")
return metrics
resources_map, resources_type_map = self._get_resources_map(
resource_metrics)
if not resources_map or not resources_type_map:
LOG.warning("Resource object not found!")
return metrics
performance_lines_map = self._filter_performance_data(
archive_file_list, resources_map, start_time, end_time)
if not performance_lines_map:
LOG.warning("The required performance data was not found!")
return metrics
metrics = self.create_metrics(storage_id, resource_metrics,
resources_map, resources_type_map,
performance_lines_map)
LOG.info("Collection complete, storage:%s, start time:%s, "
"end time:%s, length of metrics:%s "
% (storage_id, start_time, end_time, len(metrics)))
except exception.DelfinException as err:
err_msg = "Failed to collect metrics from VnxBlockStor: %s" % \
(six.text_type(err))
LOG.error(err_msg)
raise err
except Exception as err:
err_msg = "Failed to collect metrics from VnxBlockStor: %s" % \
(six.text_type(err))
LOG.error(err_msg)
raise exception.InvalidResults(err_msg)
finally:
self._remove_archive_file(archive_file_list)
return metrics

def create_metrics(self, storage_id, resource_metrics, resources_map,
resources_type_map, performance_lines_map):
metrics = []
for resource_obj, resource_type in resources_type_map.items():
if not resources_map.get(resource_obj) \
or not resource_type:
continue
if not performance_lines_map.get(resource_obj):
continue
labels = {
'storage_id': storage_id,
'resource_type': resource_type,
'resource_id': resources_map.get(resource_obj),
'type': 'RAW',
'unit': ''
}
metric_model_list = self._get_metric_model(
resource_metrics.get(resource_type), labels,
performance_lines_map.get(resource_obj),
consts.RESOURCES_TYPE_TO_METRIC_CAP.get(resource_type),
resource_type)
if metric_model_list:
metrics.extend(metric_model_list)
return metrics

def _get__archive_file(self, start_time, end_time):
archive_file_list = []
archives = self.navi_handler.get_archives()
tools = Tools()
for archive_info in (archives or []):
collection_timestamp = tools.time_str_to_timestamp(
archive_info.get('collection_time'), consts.TIME_PATTERN)
if collection_timestamp > start_time:
archive_file_list.append(archive_info.get('archive_name'))
if collection_timestamp > end_time:
break
return archive_file_list

def _get_metric_model(self, metric_list, labels, metric_values, obj_cap,
resources_type):
metric_model_list = []
tools = Tools()
for metric_name in (metric_list or []):
values = {}
obj_labels = copy.copy(labels)
obj_labels['unit'] = obj_cap.get(metric_name).get('unit')
for metric_value in metric_values:
metric_value_infos = metric_value
if not consts.METRIC_MAP.get(resources_type, {}).get(
metric_name):
continue
value = metric_value_infos[
consts.METRIC_MAP.get(resources_type).get(metric_name)]
if not value:
value = '0'
collection_timestamp = tools.time_str_to_timestamp(
metric_value_infos[1], consts.TIME_PATTERN)
collection_time_str = tools.timestamp_to_time_str(
collection_timestamp, consts.COLLECTION_TIME_PATTERN)
collection_timestamp = tools.time_str_to_timestamp(
collection_time_str, consts.COLLECTION_TIME_PATTERN)
if "iops" == obj_cap.get(metric_name).get('unit').lower():
value = int(float(value))
else:
value = float('%.6f' % (float(value)))
values[collection_timestamp] = value
if values:
metric_model = constants.metric_struct(name=metric_name,
labels=obj_labels,
values=values)
metric_model_list.append(metric_model)
return metric_model_list

def _get_resources_map(self, resource_metrics):
resources_map = {}
resources_type_map = {}
for resource_type_key in resource_metrics.keys():
sub_resources_map = {}
sub_resources_type_map = {}
if resource_type_key == constants.ResourceType.CONTROLLER:
sub_resources_map, sub_resources_type_map = \
self._get_controllers_map()
elif resource_type_key == constants.ResourceType.PORT:
sub_resources_map, sub_resources_type_map = \
self._get_ports_map()
elif resource_type_key == constants.ResourceType.DISK:
sub_resources_map, sub_resources_type_map = \
self._get_disks_map()
elif resource_type_key == constants.ResourceType.VOLUME:
sub_resources_map, sub_resources_type_map = \
self._get_volumes_map()
if sub_resources_map and sub_resources_type_map:
resources_map.update(sub_resources_map)
resources_type_map.update(sub_resources_type_map)
return resources_map, resources_type_map

def _get_controllers_map(self):
resources_map = {}
resources_type_map = {}
controllers = self.navi_handler.get_controllers()
for controller in (controllers or []):
resources_map[controller.get('sp_name')] = controller.get(
'signature_for_the_sp')
resources_type_map[controller.get('sp_name')] = \
constants.ResourceType.CONTROLLER
return resources_map, resources_type_map

def _get_ports_map(self):
resources_map = {}
resources_type_map = {}
ports = self.navi_handler.get_ports()
for port in (ports or []):
port_id = port.get('sp_port_id')
sp_name = port.get('sp_name').replace('SP ', '')
name = '%s-%s' % (sp_name, port_id)
port_id = 'Port %s [ %s ]' % (port_id, port.get('sp_uid'))
resources_map[port_id] = name
resources_type_map[port_id] = constants.ResourceType.PORT
return resources_map, resources_type_map

def _get_disks_map(self):
resources_map = {}
resources_type_map = {}
disks = self.navi_handler.get_disks()
for disk in (disks or []):
disk_name = disk.get('disk_name')
disk_name = ' '.join(disk_name.strip().split())
resources_map[disk_name] = disk.get('disk_id')
resources_type_map[disk_name] = constants.ResourceType.DISK
return resources_map, resources_type_map

def _get_volumes_map(self):
resources_map = {}
resources_type_map = {}
volumes = self.navi_handler.get_all_lun()
for volume in (volumes or []):
if not volume.get('name'):
continue
volume_name = '%s [%s]' % (
volume.get('name'), volume.get('logical_unit_number'))
resources_map[volume_name] = str(volume.get('logical_unit_number'))
resources_type_map[volume_name] = constants.ResourceType.VOLUME
return resources_map, resources_type_map

def _filter_performance_data(self, archive_file_list, resources_map,
start_time, end_time):
performance_lines_map = {}
try:
tools = Tools()
for archive_file in archive_file_list:
self.navi_handler.download_archives(archive_file)
archive_name_infos = archive_file.split('.')
file_path = '%s%s.csv' % (
self.navi_handler.get_local_file_path(),
archive_name_infos[0])
with open(file_path) as file:
f_csv = csv.reader(file)
next(f_csv)
for row in f_csv:
self._package_performance_data(row, resources_map,
start_time, end_time,
tools,
performance_lines_map)
except Exception as err:
err_msg = "Failed to filter performance data: %s" % \
(six.text_type(err))
LOG.error(err_msg)
raise exception.StorageBackendException(err_msg)
return performance_lines_map

def _package_performance_data(self, row, resources_map, start_time,
end_time, tools, performance_lines_map):
resource_obj_name = row[0]
resource_obj_name = self._package_resource_obj_name(resource_obj_name)
if resource_obj_name in resources_map:
obj_collection_timestamp = tools.time_str_to_timestamp(
row[1], consts.TIME_PATTERN)
if (start_time + consts.TIME_INTERVAL_FLUCTUATION) \
<= obj_collection_timestamp \
and obj_collection_timestamp \
<= (end_time + consts.TIME_INTERVAL_FLUCTUATION):
performance_lines_map.setdefault(resource_obj_name, []).append(
row)

def _package_resource_obj_name(self, source_name):
target_name = source_name
if 'Port ' in target_name:
return re.sub(r'(\[.*;)', '[', target_name)
elif '; ' in target_name:
return re.sub(r'(; .*])', ']', target_name)
return target_name

def _remove_archive_file(self, archive_file_list):
try:
for archive_file in archive_file_list:
nar_file_path = '%s%s' % (
self.navi_handler.get_local_file_path(), archive_file)
archive_name_infos = archive_file.split('.')
csv_file_path = '%s%s.csv' % (
self.navi_handler.get_local_file_path(),
archive_name_infos[0])
for file_path in [nar_file_path, csv_file_path]:
LOG.info("Delete file :{}".format(file_path))
if os.path.exists(file_path):
os.remove(file_path)
else:
err_msg = 'no such file:%s' % file_path
LOG.error(err_msg)
raise exception.StorageBackendException(err_msg)
except Exception as err:
err_msg = "Failed to remove archive file: %s" % \
(six.text_type(err))
LOG.error(err_msg)
raise exception.StorageBackendException(err_msg)

def get_latest_perf_timestamp(self, storage_id):
latest_time = 0
num = 0
tools = Tools()
while latest_time <= 0:
num += 1
latest_time, file_latest_time = self.check_latest_timestamp(
storage_id)
if num > consts.EXEC_MAX_NUM:
latest_time = file_latest_time
LOG.warning("Storage:{}, Exit after {} executions.".format(
storage_id, consts.EXEC_MAX_NUM))
break
if latest_time <= 0:
wait_time = tools.timestamp_to_time_str(
time.time() * units.k,
consts.ARCHIVE_FILE_NAME_TIME_PATTERN)
LOG.warning("Storage:{} No new file found, "
"wait for next execution:{}".format(storage_id,
wait_time))
time.sleep(consts.SLEEP_TIME_SECONDS)
return latest_time

def get_data_latest_timestamp(self, storage_id):
archive_file_list = []
try:
tools = Tools()
archive_name = self.navi_handler.create_archives(storage_id)
LOG.info("Create archive_name: {}".format(archive_name))
archive_file_list.append(archive_name)
archive_name_infos = archive_name.split('.')
file_path = '%s%s.csv' % (
self.navi_handler.get_local_file_path(), archive_name_infos[0])
resource_obj_name = ''
collection_time = ''
with open(file_path) as file:
f_csv = csv.reader(file)
next(f_csv)
for row in f_csv:
if not resource_obj_name or resource_obj_name == row[0]:
resource_obj_name = row[0]
collection_time = row[1]
else:
break
latest_time = tools.time_str_to_timestamp(collection_time,
consts.TIME_PATTERN)
except Exception as err:
err_msg = "Failed to get latest perf timestamp " \
"from VnxBlockStor: %s" % (six.text_type(err))
LOG.error(err_msg)
raise exception.InvalidResults(err_msg)
finally:
self._remove_archive_file(archive_file_list)
return latest_time

def check_latest_timestamp(self, storage_id):
latest_time = 0
file_latest_time = self.get_data_latest_timestamp(storage_id)
sys_time = self.navi_handler.get_sp_time()
LOG.info("Get sys_time=={},file_latest_time=={}".format(
sys_time, file_latest_time))
if sys_time > 0 and file_latest_time > 0:
LOG.info("(sys_time - file_latest_time)={}".format(
(sys_time - file_latest_time)))
if (sys_time - file_latest_time) < \
consts.CREATE_FILE_TIME_INTERVAL:
latest_time = file_latest_time
time.sleep(consts.CHECK_WAITE_TIME_SECONDS)
return latest_time, file_latest_time
Loading

0 comments on commit 1422a00

Please sign in to comment.