Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix about alarm mangement and refactor some classes and constants. #912

Merged
merged 2 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion delfin/alert_manager/alert_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def process_alert_info(self, alert):
ctxt, storage, alert_model)
alert_util.fill_storage_attributes(alert_model, storage)
except exception.IncompleteTrapInformation as e:
LOG.warn(e)
LOG.warning(e)
threading.Thread(target=self.sync_storage_alert,
args=(ctxt, alert['storage_id'])).start()
except exception.AlertSourceNotFound:
Expand Down
37 changes: 30 additions & 7 deletions delfin/alert_manager/snmp_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import copy

import six
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils
from pyasn1.type.univ import OctetString
from pysnmp.entity.rfc3413.oneliner import cmdgen

Expand All @@ -39,8 +41,21 @@ def __init__(self):
def validate(self, ctxt, alert_source):
engine_id = alert_source.get('engine_id')
try:
alert_source = self.validate_connectivity(alert_source)

hosts = alert_source['host'].split(',')
temp_alert_source = copy.deepcopy(alert_source)
# Sets a value to raise a SNMPConnectionFailed when multiple
# alarm sources fail to be verified
connection_times = 0
for host in hosts:
temp_alert_source['host'] = host
try:
connection_times += 1
alert_source = \
self.validate_connectivity(ctxt, temp_alert_source)
break
except Exception as e:
if connection_times == len(hosts):
raise e
# If protocol is snmpv3, the snmp_validator will update
# engine id if engine id is empty. Therefore, engine id
# should be saved in database.
Expand All @@ -61,7 +76,7 @@ def validate(self, ctxt, alert_source):
LOG.error("Failed to check snmp config. Reason: %s", msg)

@staticmethod
def validate_connectivity(alert_source):
def validate_connectivity(ctxt, alert_source):
# Fill optional parameters with default values if not set in input
if not alert_source.get('port'):
alert_source['port'] = constants.DEFAULT_SNMP_CONNECT_PORT
Expand All @@ -78,6 +93,12 @@ def validate_connectivity(alert_source):
if CONF.snmp_validation_enabled is False:
return alert_source

storage_id = alert_source.get('storage_id')
access_info = db.access_info_get(ctxt, storage_id)
access_info = dict(access_info)
if access_info.get('model') not in constants.SNMP_SUPPORTED_MODELS:
return alert_source

cmd_gen = cmdgen.CommandGenerator()

version = alert_source.get('version')
Expand All @@ -103,10 +124,12 @@ def validate_connectivity(alert_source):
)
auth_key = None
if alert_source['auth_key']:
auth_key = cryptor.decode(alert_source['auth_key'])
auth_key = encodeutils.to_utf8(
cryptor.decode(alert_source['auth_key']))
privacy_key = None
if alert_source['privacy_key']:
privacy_key = cryptor.decode(alert_source['privacy_key'])
privacy_key = encodeutils.to_utf8(
cryptor.decode(alert_source['privacy_key']))
auth_protocol = None
privacy_protocol = None
if alert_source['auth_protocol']:
Expand Down Expand Up @@ -135,8 +158,8 @@ def validate_connectivity(alert_source):
alert_source['engine_id'] = binascii.hexlify(
engine_id.asOctets()).decode()
else:
community_string = cryptor.decode(
alert_source['community_string'])
community_string = encodeutils.to_utf8(
cryptor.decode(alert_source['community_string']))
error_indication, __, __, __ = cmd_gen.getCmd(
cmdgen.CommunityData(
community_string,
Expand Down
125 changes: 84 additions & 41 deletions delfin/alert_manager/trap_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import six
from oslo_log import log
from oslo_service import periodic_task
from oslo_utils import encodeutils
from pysnmp.carrier.asyncore.dgram import udp
from pysnmp.entity import engine, config
from pysnmp.entity.rfc3413 import ntfrcv
from pysnmp.proto.api import v2c
from pysnmp.smi import builder, view
from retrying import retry

from delfin import context, cryptor
from delfin import db
Expand Down Expand Up @@ -61,41 +63,53 @@ def sync_snmp_config(self, ctxt, snmp_config_to_del=None,
self._add_snmp_config(ctxt, snmp_config_to_add)

def _add_snmp_config(self, ctxt, new_config):
LOG.info("Start to add snmp trap config.")
storage_id = new_config.get("storage_id")
version_int = self._get_snmp_version_int(ctxt,
new_config.get("version"))
if version_int == constants.SNMP_V2_INT or \
version_int == constants.SNMP_V1_INT:
community_string = cryptor.decode(
new_config.get("community_string"))
community_index = self._get_community_index(storage_id)
config.addV1System(self.snmp_engine, community_index,
community_string, contextName=community_string)
else:
username = new_config.get("username")
engine_id = new_config.get("engine_id")
if engine_id:
engine_id = v2c.OctetString(hexValue=engine_id)

auth_key = new_config.get("auth_key")
auth_protocol = new_config.get("auth_protocol")
privacy_key = new_config.get("privacy_key")
privacy_protocol = new_config.get("privacy_protocol")
if auth_key:
auth_key = cryptor.decode(auth_key)
if privacy_key:
privacy_key = cryptor.decode(privacy_key)
config.addV3User(
self.snmp_engine,
userName=username,
authKey=auth_key,
privKey=privacy_key,
authProtocol=self._get_usm_auth_protocol(ctxt,
auth_protocol),
privProtocol=self._get_usm_priv_protocol(ctxt,
privacy_protocol),
securityEngineId=engine_id)
LOG.info("Start to add snmp trap config for storage: %s",
storage_id)
try:
version_int = self._get_snmp_version_int(ctxt,
new_config.get("version"))
if version_int == constants.SNMP_V2_INT or \
version_int == constants.SNMP_V1_INT:
community_string = cryptor.decode(
new_config.get("community_string"))
community_string = encodeutils.to_utf8(community_string)
community_index = self._get_community_index(storage_id)
config.addV1System(self.snmp_engine, community_index,
community_string,
contextName=community_string)
else:
username = new_config.get("username")
engine_id = new_config.get("engine_id")
if engine_id:
engine_id = v2c.OctetString(hexValue=engine_id)

auth_key = new_config.get("auth_key")
auth_protocol = new_config.get("auth_protocol")
privacy_key = new_config.get("privacy_key")
privacy_protocol = new_config.get("privacy_protocol")
if auth_key:
auth_key = encodeutils.to_utf8(cryptor.decode(auth_key))
if privacy_key:
privacy_key = encodeutils.to_utf8(
cryptor.decode(privacy_key))
config.addV3User(
self.snmp_engine,
userName=username,
authKey=auth_key,
privKey=privacy_key,
authProtocol=self._get_usm_auth_protocol(ctxt,
auth_protocol),
privProtocol=self._get_usm_priv_protocol(ctxt,
privacy_protocol),
securityEngineId=engine_id)
LOG.info("Add snmp trap config for storage: %s successfully.",
storage_id)
except Exception as e:
msg = six.text_type(e)
LOG.error("Failed to add snmp trap config for storage: %s. "
"Reason: %s", storage_id, msg)
raise e

def _delete_snmp_config(self, ctxt, snmp_config):
LOG.info("Start to remove snmp trap config.")
Expand Down Expand Up @@ -168,22 +182,49 @@ def _add_transport(self):
udp.UdpTransport().openServerMode(
(self.trap_receiver_address, int(self.trap_receiver_port)))
)
except Exception:
raise ValueError("Port binding failed: Port is in use.")
except Exception as e:
LOG.error('Failed to add transport, error is %s'
% six.text_type(e))
raise exception.DelfinException(message=six.text_type(e))

@staticmethod
def _get_alert_source_by_host(source_ip):
"""Gets alert source for given source ip address."""
filters = {'host': source_ip}
filters = {'host~': source_ip}
ctxt = context.RequestContext()

# Using the known filter and db exceptions are handled by api
alert_source = db.alert_source_get_all(ctxt, filters=filters)
if not alert_source:
alert_sources = db.alert_source_get_all(ctxt, filters=filters)
if not alert_sources:
raise exception.AlertSourceNotFoundWithHost(source_ip)

# Return first configured source that can handle the trap
return alert_source[0]
# This is to make sure unique host is configured each alert source
unique_alert_source = None
if len(alert_sources) > 1:
# Clear invalid alert_source
for alert_source in alert_sources:
try:
db.storage_get(ctxt, alert_source['storage_id'])
except exception.StorageNotFound:
LOG.warning('Found redundancy alert source for storage %s'
% alert_source['storage_id'])
try:
db.alert_source_delete(
ctxt, alert_source['storage_id'])
except Exception as e:
LOG.warning('Delete the invalid alert source failed, '
'reason is %s' % six.text_type(e))
else:
unique_alert_source = alert_source
else:
unique_alert_source = alert_sources[0]

if unique_alert_source is None:
msg = (_("Failed to get unique alert source with host %s.")
% source_ip)
raise exception.InvalidResults(msg)

return unique_alert_source

def _cb_fun(self, state_reference, context_engine_id, context_name,
var_binds, cb_ctx):
Expand Down Expand Up @@ -256,6 +297,8 @@ def _load_snmp_config(self):
if len(alert_sources) < limit:
finished = True

@retry(stop_max_attempt_number=180, wait_random_min=4000,
wait_random_max=6000)
def start(self):
"""Starts the snmp trap receiver with necessary prerequisites."""
snmp_engine = engine.SnmpEngine()
Expand Down
4 changes: 2 additions & 2 deletions delfin/api/v1/storages.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ def sync_all(self, req):
try:
_set_synced_if_ok(ctxt, storage['id'], resource_count)
except exception.InvalidInput as e:
LOG.warn('Can not start new sync task for %s, reason is %s'
% (storage['id'], e.msg))
LOG.warning('Can not start new sync task for %s, reason is %s'
% (storage['id'], e.msg))
continue
else:
for subclass in \
Expand Down
Loading