diff --git a/delfin/drivers/fake_storage/__init__.py b/delfin/drivers/fake_storage/__init__.py
index e5165e3cb..716d19611 100644
--- a/delfin/drivers/fake_storage/__init__.py
+++ b/delfin/drivers/fake_storage/__init__.py
@@ -11,9 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import copy
 import random
-import datetime
 import decorator
 import math
 import six
@@ -55,11 +54,21 @@
 MIN_CONTROLLERS, MAX_CONTROLLERS = 1, 5
 PAGE_LIMIT = 500
 MIN_STORAGE, MAX_STORAGE = 1, 10
-MIN_PERF_VALUES, MAX_PERF_VALUES = 1, 4
 MIN_QUOTA, MAX_QUOTA = 1, 100
 MIN_FS, MAX_FS = 1, 10
 MIN_QTREE, MAX_QTREE = 1, 100
 MIN_SHARE, MAX_SHARE = 1, 100
+# Minimum sampling interval
+MINIMUM_SAMPLE_DURATION_IN_MS = 5 * 1000
+# count of instances for each resource type
+RESOURCE_COUNT_DICT = {
+    "storage": 1,
+    "storagePool": 10,
+    "volume": 1000,
+    "port": 10,
+    "controller": 4,
+    "disk": 10,
+}
 
 
 def get_range_val(range_str, t):
@@ -468,41 +477,58 @@ def _get_volume_range(self, start, end):
             volume_list.append(v)
         return volume_list
 
-    def _get_random_performance(self):
+    def _get_random_performance(self, metric_list, start_time, end_time):
         def get_random_timestamp_value():
             rtv = {}
-            for i in range(MIN_PERF_VALUES, MAX_PERF_VALUES):
-                timestamp = int(float(datetime.datetime.now().timestamp()
-                                      ) * 1000)
+            timestamp = start_time
+            while timestamp < end_time:
                 rtv[timestamp] = random.uniform(1, 100)
+                timestamp += MINIMUM_SAMPLE_DURATION_IN_MS
+
             return rtv
 
         # The sample performance_params after filling looks like,
         # performance_params = {timestamp1: value1, timestamp2: value2}
         performance_params = {}
-        for key in constants.DELFIN_ARRAY_METRICS:
+        for key in metric_list.keys():
             performance_params[key] = get_random_timestamp_value()
         return performance_params
 
+    @wait_random(MIN_WAIT, MAX_WAIT)
+    def get_resource_perf_metrics(self, storage_id, start_time, end_time,
+                                  resource_type, metric_list):
+        LOG.info("###########collecting metrics for resource %s: from"
+                 " storage %s" % (resource_type, self.storage_id))
+        resource_metrics = []
+        resource_count = RESOURCE_COUNT_DICT[resource_type]
+
+        for i in range(resource_count):
+            labels = {'storage_id': storage_id,
+                      'resource_type': resource_type,
+                      'resource_id': resource_type + str(i),
+                      'type': 'RAW'}
+            fake_metrics = self._get_random_performance(metric_list,
+                                                        start_time, end_time)
+            for key in metric_list.keys():
+                labels['unit'] = metric_list[key]['unit']
+                m = constants.metric_struct(name=key, labels=labels,
+                                            values=fake_metrics[key])
+                resource_metrics.append(copy.deepcopy(m))
+        return resource_metrics
+
     @wait_random(MIN_WAIT, MAX_WAIT)
     def collect_perf_metrics(self, context, storage_id, resource_metrics,
                              start_time, end_time):
         """Collects performance metric for the given interval"""
-        rd_array_count = random.randint(MIN_STORAGE, MAX_STORAGE)
-        LOG.debug("Fake_perf_metrics number for %s: %d" % (
-            storage_id, rd_array_count))
-        array_metrics = []
-        labels = {'storage_id': storage_id, 'resource_type': 'array'}
-        fake_metrics = self._get_random_performance()
-
-        for _ in range(rd_array_count):
-            for key in constants.DELFIN_ARRAY_METRICS:
-                m = constants.metric_struct(name=key, labels=labels,
-                                            values=fake_metrics[key])
-                array_metrics.append(m)
-
-        return array_metrics
+        merged_metrics = []
+        for key in resource_metrics.keys():
+            m = self.get_resource_perf_metrics(storage_id,
+                                               start_time,
+                                               end_time, key,
+                                               resource_metrics[key])
+            merged_metrics += m
+        return merged_metrics
 
     @staticmethod
     def get_capabilities(context):
@@ -521,7 +547,7 @@ def get_capabilities(context):
                         "description": "Average time taken for an IO "
                                        "operation in ms"
                     },
-                    "requests": {
+                    "iops": {
                         "unit": "IOPS",
                         "description": "Input/output operations per second"
                     },
@@ -535,14 +561,218 @@
                         "description": "Represents how much data write is "
                                        "successfully transferred in MB/s"
                     },
-                    "readRequests": {
+                    "readIops": {
                         "unit": "IOPS",
                         "description": "Read requests per second"
                     },
-                    "writeRequests": {
+                    "writeIops": {
                         "unit": "IOPS",
                         "description": "Write requests per second"
                     },
-                }
+                },
+                "storagePool": {
+                    "throughput": {
+                        "unit": "MB/s",
+                        "description": "Total data transferred per second "
+                    },
+                    "responseTime": {
+                        "unit": "ms",
+                        "description": "Average time taken for an IO "
+                                       "operation"
+                    },
+                    "iops": {
+                        "unit": "IOPS",
+                        "description": "Read and write operations per second"
+                    },
+                    "readThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total read data transferred per"
+                                       " second"
+                    },
+                    "writeThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total write data transferred per"
+                                       " second "
+                    },
+                    "readIops": {
+                        "unit": "IOPS",
+                        "description": "Read operations per second"
+                    },
+                    "writeIops": {
+                        "unit": "IOPS",
+                        "description": "Write operations per second"
+                    },
+
+                },
+                "volume": {
+                    "throughput": {
+                        "unit": "MB/s",
+                        "description": "Total data transferred per second "
+                    },
+                    "responseTime": {
+                        "unit": "ms",
+                        "description": "Average time taken for an IO "
+                                       "operation"
+                    },
+                    "iops": {
+                        "unit": "IOPS",
+                        "description": "Read and write operations per"
+                                       " second"
+                    },
+                    "readThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total read data transferred per "
+                                       "second "
+                    },
+                    "writeThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total write data transferred per"
+                                       " second "
+                    },
+                    "readIops": {
+                        "unit": "IOPS",
+                        "description": "Read operations per second"
+                    },
+                    "writeIops": {
+                        "unit": "IOPS",
+                        "description": "Write operations per second"
+                    },
+                    "cacheHitRatio": {
+                        "unit": "%",
+                        "description": "Percentage of io that are cache "
+                                       "hits"
+                    },
+                    "readCacheHitRatio": {
+                        "unit": "%",
+                        "description": "Percentage of read ops that are cache"
+                                       " hits"
+                    },
+                    "writeCacheHitRatio": {
+                        "unit": "%",
+                        "description": "Percentage of write ops that are cache"
+                                       " hits"
+                    },
+                    "ioSize": {
+                        "unit": "KB",
+                        "description": "The average size of IO requests in KB"
+                    },
+                    "readIoSize": {
+                        "unit": "KB",
+                        "description": "The average size of read IO requests "
+                                       "in KB."
+                    },
+                    "writeIoSize": {
+                        "unit": "KB",
+                        "description": "The average size of write IO requests"
+                                       " in KB."
+                    },
+                },
+                "controller": {
+                    "throughput": {
+                        "unit": "MB/s",
+                        "description": "Total data transferred per second "
+                    },
+                    "responseTime": {
+                        "unit": "ms",
+                        "description": "Average time taken for an IO "
+                                       "operation"
+                    },
+                    "iops": {
+                        "unit": "IOPS",
+                        "description": "Read and write operations per "
+                                       "second"
+                    },
+                    "readThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total read data transferred per "
+                                       "second "
+                    },
+                    "writeThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total write data transferred per "
+                                       "second "
+                    },
+                    "readIops": {
+                        "unit": "IOPS",
+                        "description": "Read operations per second"
+                    },
+                    "writeIops": {
+                        "unit": "IOPS",
+                        "description": "Write operations per second"
+                    },
+
+                },
+                "port": {
+                    "throughput": {
+                        "unit": "MB/s",
+                        "description": "Total data transferred per second "
+                    },
+                    "responseTime": {
+                        "unit": "ms",
+                        "description": "Average time taken for an IO "
+                                       "operation"
+                    },
+                    "iops": {
+                        "unit": "IOPS",
+                        "description": "Read and write operations per "
+                                       "second"
+                    },
+                    "readThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total read data transferred per "
+                                       "second "
+                    },
+                    "writeThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total write data transferred per "
+                                       "second "
+                    },
+                    "readIops": {
+                        "unit": "IOPS",
+                        "description": "Read operations per second"
+                    },
+                    "writeIops": {
+                        "unit": "IOPS",
+                        "description": "Write operations per second"
+                    },
+
+                },
+                "disk": {
+                    "throughput": {
+                        "unit": "MB/s",
+                        "description": "Total data transferred per second "
+                    },
+                    "responseTime": {
+                        "unit": "ms",
+                        "description": "Average time taken for an IO "
+                                       "operation"
+                    },
+                    "iops": {
+                        "unit": "IOPS",
+                        "description": "Read and write operations per"
+                                       " second"
+                    },
+                    "readThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total read data transferred per"
+                                       " second "
+                    },
+                    "writeThroughput": {
+                        "unit": "MB/s",
+                        "description": "Total write data transferred per"
+                                       " second "
+                    },
+                    "readIops": {
+                        "unit": "IOPS",
+                        "description": "Read operations per second"
+                    },
+                    "writeIops": {
+                        "unit": "IOPS",
+                        "description": "Write operations per second"
+                    },
+
+                },
+            }
+        }
diff --git a/delfin/exporter/prometheus/exporter_server.py b/delfin/exporter/prometheus/exporter_server.py
index ecbd9a95d..99cb4db4f 100644
--- a/delfin/exporter/prometheus/exporter_server.py
+++ b/delfin/exporter/prometheus/exporter_server.py
@@ -11,24 +11,28 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import glob
+import os
 from flask import Flask
 from oslo_config import cfg
 import sys
+from oslo_log import log
+
+LOG = log.getLogger(__name__)
 
 app = Flask(__name__)
 
 grp = cfg.OptGroup('PROMETHEUS_EXPORTER')
-
+METRICS_CACHE_DIR = '/var/lib/delfin/metrics'
 prometheus_opts = [
     cfg.StrOpt('metric_server_ip', default='0.0.0.0',
                help='The exporter server host ip'),
     cfg.IntOpt('metric_server_port', default=8195,
                help='The exporter server port'),
-    cfg.StrOpt('metrics_cache_file', default='/var/lib/delfin/delfin_exporter'
-                                             '.txt',
-               help='The temp cache file used for persisting metrics'),
+    cfg.StrOpt('metrics_dir', default=METRICS_CACHE_DIR,
+
+               help='The temp directory to keep incoming metrics'),
 ]
 cfg.CONF.register_opts(prometheus_opts, group=grp)
 cfg.CONF(sys.argv[1:])
@@ -36,9 +40,22 @@
 
 @app.route("/metrics", methods=['GET'])
 def getfile():
-    with open(cfg.CONF.PROMETHEUS_EXPORTER.metrics_cache_file, "r+") as f:
-        data = f.read()
-        f.truncate(0)
+    try:
+        if not os.path.exists(cfg.CONF.PROMETHEUS_EXPORTER.metrics_dir):
+            LOG.error('No metrics cache folder exists')
+            return ''
+        os.chdir(cfg.CONF.PROMETHEUS_EXPORTER.metrics_dir)
+    except OSError as e:
+        LOG.error('Error opening metrics folder')
+        raise Exception(e)
+    data = ''
+    for file in glob.glob("*.prom"):
+        file_name = cfg.CONF.PROMETHEUS_EXPORTER.metrics_dir + '/' + file
+        with open(file_name, "r") as f:
+            data += f.read()
+        # Remove a metric file after reading it
+        os.remove(file_name)
+
     return data
diff --git a/delfin/exporter/prometheus/prometheus.py b/delfin/exporter/prometheus/prometheus.py
index fce5d518b..95ca7406a 100644
--- a/delfin/exporter/prometheus/prometheus.py
+++ b/delfin/exporter/prometheus/prometheus.py
@@ -11,20 +11,30 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import datetime
+import os
+import pytz
+import six
 from oslo_config import cfg
+from oslo_log import log
+from tzlocal import get_localzone
 
-grp = cfg.OptGroup('PROMETHEUS_EXPORTER')
+LOG = log.getLogger(__name__)
 
+grp = cfg.OptGroup('PROMETHEUS_EXPORTER')
+METRICS_CACHE_DIR = '/var/lib/delfin/metrics'
 prometheus_opts = [
-    cfg.StrOpt('metrics_cache_file', default='/var/lib/delfin/delfin_exporter'
-                                             '.txt',
-               help='The temp cache file used for persisting metrics'),
+    cfg.StrOpt('metrics_dir', default=METRICS_CACHE_DIR,
+
+               help='The temp directory to keep incoming metrics'),
+    cfg.StrOpt('timezone',
+               default='local',
+               help='time zone of prometheus server '
+               ),
 ]
 cfg.CONF.register_opts(prometheus_opts, group=grp)
-
 """"
 The metrics received from driver is should be in this format
 storage_metrics = [Metric(name='response_time',
@@ -41,26 +51,63 @@
                            values={1600998817585: 20.264160223426305})]
 """
 
-unit_of_metric = {'response_time': 'ms', 'throughput': 'IOPS',
-                  'read_throughput': 'IOPS', 'write_throughput': 'IOPS',
-                  'bandwidth': 'MBps', 'read_bandwidth': 'MBps',
-                  'write_bandwidth': 'MBps'
-                  }
-
 
 class PrometheusExporter(object):
 
+    def __init__(self):
+        self.timestamp_offset_ms = 0
+        self.metrics_dir = cfg.CONF.PROMETHEUS_EXPORTER.metrics_dir
+
+    def check_metrics_dir_exists(self, directory):
+        try:
+            if not os.path.exists(directory):
+                os.makedirs(directory)
+            return True
+        except Exception as e:
+            msg = six.text_type(e)
+            LOG.error("Error while creating metrics directory. Reason: %s",
Reason: %s", + msg) + return False + + def set_timestamp_offset_from_utc_ms(self): + """Set timestamp offset from utc required for all metrics""" + try: + timez = get_localzone() + if cfg.CONF.PROMETHEUS_EXPORTER.timezone != 'local': + timez = pytz.timezone(cfg.CONF.PROMETHEUS_EXPORTER.timezone) + timez.utcoffset(datetime.datetime.now()) + return int(timez.utcoffset( + datetime.datetime.now()).total_seconds() * 1000) + except Exception: + LOG.error('Error while setting timestamp' + ' offset for prometheus exporter') + # return no offset in case of an error + return 0 + # Print metrics in Prometheus format. - def _write_to_prometheus_format(self, f, metric, labels, values): - f.write("# HELP storage_%s storage metric for %s\n" % (metric, metric)) - f.write("# TYPE storage_%s gauge\n" % metric) + def _write_to_prometheus_format(self, f, metric, + labels, prom_labels, values): + f.write("# HELP %s metric for resource %s and instance %s\n" + % (metric, labels.get('resource_type'), + labels.get('resource_id'))) + f.write("# TYPE %s gauge\n" % metric) for timestamp, value in values.items(): - f.write("storage_%s{%s} %f %d\n" % (metric, labels, - value, timestamp)) + timestamp += self.timestamp_offset_ms + f.write("%s{%s} %f %d\n" % (metric, prom_labels, + value, timestamp)) def push_to_prometheus(self, storage_metrics): - with open(cfg.CONF.PROMETHEUS_EXPORTER.metrics_cache_file, "a+") as f: + self.timestamp_offset_ms = self.set_timestamp_offset_from_utc_ms() + if not self.check_metrics_dir_exists(self.metrics_dir): + return + time_stamp = str(datetime.datetime.now().timestamp()) + temp_file_name = os.path.join(self.metrics_dir, + time_stamp + ".prom.temp") + actual_file_name = os.path.join(self.metrics_dir, + time_stamp + ".prom") + # make a temp file with current timestamp + with open(temp_file_name, "w") as f: for metric in storage_metrics: name = metric.name labels = metric.labels @@ -69,14 +116,28 @@ def push_to_prometheus(self, storage_metrics): storage_name = labels.get('name') storage_sn = labels.get('serial_number') resource_type = labels.get('resource_type') - unit = unit_of_metric.get(name) + resource_id = labels.get('resource_id') + unit = labels.get('unit') + m_type = labels.get('type', 'RAW') value_type = labels.get('value_type', 'gauge') - storage_labels = ( - "storage_id=\"%s\",storage_name=\"%s\",storage_sn=\"%s\"," - "resource_type=\"%s\", " - "type=\"%s\",unit=\"%s\",value_type=\"%s\"" % + prom_labels = ( + "storage_id=\"%s\"," + "storage_name=\"%s\"," + "storage_sn=\"%s\"," + "resource_type=\"%s\"," + "resource_id=\"%s\"," + "type=\"%s\"," + "unit=\"%s\"," + "value_type=\"%s\"" % (storage_id, storage_name, storage_sn, resource_type, - 'RAW', unit, value_type)) - - self._write_to_prometheus_format(f, name, storage_labels, + resource_id, + m_type, unit, value_type)) + name = labels.get('resource_type') + '_' + name + self._write_to_prometheus_format(f, name, labels, prom_labels, values) + # this is done so that the exporter server never see an incomplete file + try: + f.close() + os.renames(temp_file_name, actual_file_name) + except Exception: + LOG.error('Error while renaming the temporary metric file') diff --git a/delfin/tests/unit/drivers/test_api.py b/delfin/tests/unit/drivers/test_api.py index ec1ada643..f591e8cbd 100644 --- a/delfin/tests/unit/drivers/test_api.py +++ b/delfin/tests/unit/drivers/test_api.py @@ -20,7 +20,7 @@ from delfin import context from delfin import exception -from delfin.common import config # noqa +from delfin.common import config, constants # noqa 
 from delfin.drivers.api import API
 from delfin.drivers.fake_storage import FakeStorageDriver
 
@@ -74,7 +74,6 @@ def test_init(self):
     @mock.patch('delfin.db.storage_get_all')
     def test_discover_storage(self, mock_storage, mock_access_info,
                               mock_storage_create):
-
         # Case: Positive scenario for fake driver discovery
         storage = copy.deepcopy(STORAGE)
         storage['id'] = '12345'
@@ -350,3 +349,16 @@ def test_get_capabilities(self, driver_manager):
 
         self.assertTrue('resource_metrics' in capabilities)
         driver_manager.assert_called_once()
+
+    @mock.patch('delfin.drivers.manager.DriverManager.get_driver')
+    def test_collect_perf_metrics(self, driver_manager):
+        driver_manager.return_value = FakeStorageDriver()
+        storage_id = '12345'
+        capabilities = API().get_capabilities(context, storage_id)
+
+        metrics = API().collect_perf_metrics(context, storage_id,
+                                             capabilities['resource_metrics'],
+                                             1622808000000, 1622808000001)
+        self.assertTrue('resource_metrics' in capabilities)
+        self.assertTrue(isinstance(metrics[0], constants.metric_struct))
+        self.assertEqual(driver_manager.call_count, 2)
diff --git a/delfin/tests/unit/exporter/prometheus/__init__.py b/delfin/tests/unit/exporter/prometheus/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/delfin/tests/unit/exporter/prometheus/test_prometheus.py b/delfin/tests/unit/exporter/prometheus/test_prometheus.py
new file mode 100644
index 000000000..176fdf3f2
--- /dev/null
+++ b/delfin/tests/unit/exporter/prometheus/test_prometheus.py
@@ -0,0 +1,35 @@
+# Copyright 2021 The SODA Authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import glob
+import os
+from unittest import TestCase
+
+from delfin.exporter.prometheus import prometheus
+from delfin.common.constants import metric_struct
+
+fake_metrics = [metric_struct(name='throughput',
+                              labels={'storage_id': '12345',
+                                      'resource_type': 'storage',
+                                      'resource_id': 'storage0',
+                                      'type': 'RAW', 'unit': 'MB/s'},
+                              values={1622808000000: 61.9388895680357})]
+
+
+class TestPrometheusExporter(TestCase):
+
+    def test_push_to_prometheus(self):
+        prometheus_obj = prometheus.PrometheusExporter()
+        prometheus_obj.metrics_dir = os.getcwd()
+        prometheus_obj.push_to_prometheus(fake_metrics)
+        self.assertTrue(glob.glob(prometheus_obj.metrics_dir + '/' + '*.prom'))