Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
feat: add additionalvarbinds to enricher (#126)
Browse files Browse the repository at this point in the history
* feat: reorganize enricher field (add division on existing and additional varbinds)

* clean: code cleanup

* fix: add more unit tests

* code: mongo.py refactor

* feat: create variables file to for keeping string data, fix unit tests

* feat: add enricher and oidFamily to variables
  • Loading branch information
omrozowicz-splunk authored Sep 21, 2021
1 parent 04b16f7 commit 98aeaf3
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 101 deletions.
11 changes: 7 additions & 4 deletions splunk_connect_for_snmp_poller/manager/hec_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ def _enrich_event_data(mib_enricher: MibEnricher, variables_binds: dict) -> str:
"""
metric_result = json.loads(variables_binds["metric"])
non_metric_result = variables_binds["non_metric"]
mib_enricher.append_additional_dimensions(metric_result)
for field_name in mib_enricher.dimensions_fields:
additional_dimensions = mib_enricher.append_additional_dimensions(metric_result)
logger.info(additional_dimensions)
for field_name in additional_dimensions:
if field_name in metric_result:
non_metric_result += f'{field_name}="{metric_result[field_name]}" '
return non_metric_result
Expand Down Expand Up @@ -160,7 +161,9 @@ def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None):
def _enrich_metric_data(
mib_enricher: MibEnricher, variables_binds: dict, fields: dict
) -> None:
mib_enricher.append_additional_dimensions(variables_binds)
for field_name in mib_enricher.dimensions_fields:
additional_if_mib_dimensions = mib_enricher.append_additional_dimensions(
variables_binds
)
for field_name in additional_if_mib_dimensions:
if field_name in variables_binds:
fields[field_name] = variables_binds[field_name]
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,42 @@
from splunk_connect_for_snmp_poller.manager.realtime.interface_mib import InterfaceMib
from splunk_connect_for_snmp_poller.utilities import multi_key_lookup

from ..variables import (
enricher_additional_varbinds,
enricher_existing_varbinds,
enricher_name,
enricher_oid_family,
)

logger = logging.getLogger(__name__)


def __network_interface_enricher_attributes(config_as_dict):
def __network_interface_enricher_attributes(config_as_dict, oid_family, varbinds_type):
# TODO: we just assume here the whole structre of the poller's configuration
# main file. If such section does not exist we simply do not anything.
return "IF-MIB", multi_key_lookup(
config_as_dict, ("enricher", "oidFamily", "IF-MIB")
result = multi_key_lookup(
config_as_dict, (enricher_name, enricher_oid_family, oid_family, varbinds_type)
)
return result or []


def extract_network_interface_data_from_additional_config(config_as_dict):
result = {}
oid_families = config_as_dict[enricher_name][enricher_oid_family]
for oid_family in oid_families.keys():
additional_list = __network_interface_enricher_attributes(
config_as_dict, oid_family, enricher_additional_varbinds
)
result[oid_family] = {}
for el in additional_list:
for key, values in el.items():
result[oid_family][key] = values
return result


def extract_network_interface_data_from_config(config_as_dict):
parent_oid, splunk_dimensions = __network_interface_enricher_attributes(
config_as_dict
def extract_network_interface_data_from_existing_config(config_as_dict):
splunk_dimensions = __network_interface_enricher_attributes(
config_as_dict, "IF-MIB", enricher_existing_varbinds
)
result = []
if splunk_dimensions:
Expand All @@ -51,7 +73,9 @@ def extract_network_interface_data_from_walk(config_as_dict, if_mib_metric_walk_
result = []
network_data = InterfaceMib(if_mib_metric_walk_data)
if network_data.has_consistent_data():
enricher_fields = extract_network_interface_data_from_config(config_as_dict)
enricher_fields = extract_network_interface_data_from_existing_config(
config_as_dict
)
for data in enricher_fields:
splunk_dimension = data["splunk_dimension_name"]
current_result = network_data.extract_custom_field(data["oid_name"])
Expand Down
65 changes: 45 additions & 20 deletions splunk_connect_for_snmp_poller/manager/static/mib_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from splunk_connect_for_snmp_poller.manager.realtime.interface_mib import InterfaceMib

from ..variables import enricher_additional_varbinds, enricher_existing_varbinds

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -54,25 +56,26 @@ def extract_dimension_name_and_value(dimension, index):
class MibEnricher:
def __init__(self, mib_static_data_collection):
self._mib_static_data_collection = mib_static_data_collection
self.dimensions_fields = self.__collect_if_mib_fields(
mib_static_data_collection
)

def __collect_if_mib_fields(self, mib_static_data_collection):
fields = []
if not mib_static_data_collection:
return []
for el in mib_static_data_collection:
fields += list(el.keys())
logger.info(f"_mib_static_data_collection: {mib_static_data_collection}")
logger.info(f"__collect_if_mib_fields: {fields}")
return fields
def get_by_oid(self, oid_family):
if oid_family not in self._mib_static_data_collection:
return {}
return self._mib_static_data_collection[oid_family]

def __enrich_if_mib(self, metric_name):
def get_by_oid_and_type(self, oid_family, type):
oid_record = self.get_by_oid(oid_family)
if not oid_record:
oid_record = self.get_by_oid(oid_family.split(".")[1])
return oid_record.get(type, {})

def __enrich_if_mib_existing(self, metric_name):
result = []
if metric_name and metric_name.startswith(InterfaceMib.IF_MIB_METRIC_PREFIX):
if self._mib_static_data_collection:
for dimension in self._mib_static_data_collection:
if_mib_record = self.get_by_oid_and_type(
InterfaceMib.IF_MIB_METRIC_PREFIX, enricher_existing_varbinds
)
for dimension in if_mib_record:
index = extract_current_index_from_metric(metric_name)
(
dimension_name,
Expand All @@ -82,14 +85,36 @@ def __enrich_if_mib(self, metric_name):
result.append({dimension_name: dimension_value})
return result

def __enrich_if_mib_additional(self, metric_name):
for oid_family in self._mib_static_data_collection.keys():
if oid_family in metric_name:
try:
index = extract_current_index_from_metric(metric_name) + 1
index_field = self.get_by_oid_and_type(
oid_family, enricher_additional_varbinds
)["indexNum"]
return [{index_field: index}]
except KeyError:
logger.error("Enricher additionalVarBinds badly formatted")
except TypeError:
logger.debug(f"Can't get the index from metric name: {metric_name}")
return []

def append_additional_dimensions(self, translated_var_bind):
if translated_var_bind:
metric_name = translated_var_bind[InterfaceMib.METRIC_NAME_KEY]
logger.info(f"metric_name: {metric_name}")
additional_if_mib_dimensions = self.__enrich_if_mib(metric_name)
logger.info(f"ADDITIONAL_IF_DIMENSIONS: {additional_if_mib_dimensions}")
if additional_if_mib_dimensions:
for more_data in additional_if_mib_dimensions:
translated_var_bind.update(more_data)
additional_if_mib_dimensions = []
fields_list = []
if self._mib_static_data_collection:
additional_if_mib_dimensions += self.__enrich_if_mib_existing(
metric_name
)
additional_if_mib_dimensions += self.__enrich_if_mib_additional(
metric_name
)
for more_data in additional_if_mib_dimensions:
translated_var_bind.update(more_data)
fields_list += list(more_data.keys())
return fields_list
else:
logger.warning("None translated var binds, enrichment process will be skip")
33 changes: 23 additions & 10 deletions splunk_connect_for_snmp_poller/manager/task_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from splunk_connect_for_snmp_poller.manager.hec_sender import post_data_to_splunk_hec
from splunk_connect_for_snmp_poller.manager.mib_server_client import get_translation
from splunk_connect_for_snmp_poller.manager.static.interface_mib_utililities import (
extract_network_interface_data_from_additional_config,
extract_network_interface_data_from_walk,
)
from splunk_connect_for_snmp_poller.manager.static.mib_enricher import MibEnricher
Expand Down Expand Up @@ -316,7 +317,6 @@ def _any_walk_failure_happened(
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or "?",
)
logger.info(result)
post_data_to_splunk_hec(
host,
otel_logs_url,
Expand Down Expand Up @@ -363,16 +363,15 @@ async def snmp_bulk_handler(
if not _any_failure_happened(
errorIndication, errorStatus, errorIndex, varBinds
):
mib_enricher, return_multimetric = _enrich_response(
mongo_connection, enricher_presence, f"{host}:{port}"
)
# Bulk operation returns array of varbinds
for varbind in varBinds:
mib_enricher, return_multimetric = _enrich_response(
mongo_connection, enricher_presence, f"{host}:{port}"
)
logger.debug(f"Bulk returned this varbind: {varbind}")
result, is_metric = await get_translated_string(
mib_server_url, [varbind], return_multimetric
)
logger.info(result)
post_data_to_splunk_hec(
host,
otel_logs_url,
Expand Down Expand Up @@ -497,9 +496,14 @@ async def walk_handler_with_enricher(
)

processed_result = extract_network_interface_data_from_walk(enricher, merged_result)
mongo_connection.update_mib_static_data_for(f"{host}:{port}", processed_result)
additional_enricher_varbinds = (
extract_network_interface_data_from_additional_config(enricher)
)
mib_enricher = _return_mib_enricher_for_walk(
mongo_connection, processed_result, f"{host}:{port}"
mongo_connection,
f"{host}:{port}",
processed_result,
additional_enricher_varbinds,
)
post_walk_data_to_splunk_arguments = [
host,
Expand Down Expand Up @@ -547,9 +551,18 @@ def _sort_walk_data(
merged_result.append(eval(result["metric"]))


def _return_mib_enricher_for_walk(mongo_connection, processed_result, hostname):
if processed_result:
mongo_connection.update_mib_static_data_for(hostname, processed_result)
def _return_mib_enricher_for_walk(
mongo_connection, hostname, existing_data, additional_data
):
"""
This function works only when an enricher is specified in the config and walk is being ran.
If any data was derived from walk result, then the function updates MongoDB with the result.
If no data was derived from the walk, then it's being retrieved from the MongoDB.
"""
if existing_data or additional_data:
processed_result = mongo_connection.update_mib_static_data_for(
hostname, existing_data, additional_data
)
return MibEnricher(processed_result)
else:
processed_data = mongo_connection.static_data_for(hostname)
Expand Down
4 changes: 2 additions & 2 deletions splunk_connect_for_snmp_poller/manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def get_snmp_data(
var_binds,
handler,
mongo_connection,
enricher_presence,
enricher,
snmp_engine,
auth_data,
context_data,
Expand All @@ -69,7 +69,7 @@ async def get_snmp_data(
try:
await handler(
mongo_connection,
enricher_presence,
enricher,
snmp_engine,
auth_data,
context_data,
Expand Down
18 changes: 18 additions & 0 deletions splunk_connect_for_snmp_poller/manager/variables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
enricher_name = "enricher"
enricher_existing_varbinds = "existingVarBinds"
enricher_additional_varbinds = "additionalVarBinds"
enricher_oid_family = "oidFamily"
62 changes: 46 additions & 16 deletions splunk_connect_for_snmp_poller/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@
# under the License.
import logging
import os
from collections import defaultdict

from pymongo import MongoClient, ReturnDocument
from pymongo.errors import ConnectionFailure

from splunk_connect_for_snmp_poller.manager.realtime.interface_mib import InterfaceMib

from .manager.variables import enricher_additional_varbinds, enricher_existing_varbinds

logger = logging.getLogger(__name__)

"""
Expand Down Expand Up @@ -126,9 +129,7 @@ def static_data_for(self, host):
return None
if WalkedHostsRepository.MIB_STATIC_DATA in full_collection:
mib_static_data = full_collection[WalkedHostsRepository.MIB_STATIC_DATA]
if InterfaceMib.IF_MIB_DATA_MONGO_IDENTIFIER in mib_static_data:
return mib_static_data[InterfaceMib.IF_MIB_DATA_MONGO_IDENTIFIER]
return None
return mib_static_data
else:
return None

Expand All @@ -143,17 +144,46 @@ def update_real_time_data_for(self, host, input_dictionary):
return_document=ReturnDocument.AFTER,
)

# Input is what extract_network_interface_data_from_walk() returns
def update_mib_static_data_for(self, host, if_mib_data):
if if_mib_data:
real_time_data_dictionary = {
WalkedHostsRepository.MIB_STATIC_DATA: {
InterfaceMib.IF_MIB_DATA_MONGO_IDENTIFIER: if_mib_data
}
def create_mib_static_data_mongo_structure(self, existing_data, additional_data):
"""
This function creates database mib static data structure out of existing_data and additional_data provided
from config.yaml and the data derived from SNMP Walk, for ex.:
existing_data = [{'interface_index': ['1', '2']}, {'interface_desc': ['lo', 'eth0']}]
additional_data = {'IF-MIB': {'indexNum': 'index_num'}, 'SNMPv2-MIB': {'indexNum': 'index_num'}}
Returned structure should look like this:
{ "MIB-STATIC-DATA": {
{'IF-MIB':
{'existingVarBinds': [{'interface_index': ['1', '2']}, {'interface_desc': ['lo', 'eth0']}],
'additionalVarBinds': {'indexNum': 'index_num'}},
'SNMPv2-MIB':
{'additionalVarBinds': {'indexNum': 'index_num'}}
}
self._walked_hosts.find_one_and_update(
{"_id": host},
{"$set": real_time_data_dictionary},
upsert=True,
return_document=ReturnDocument.AFTER,
)
}
"""
static_data_dictionary = {"MIB-STATIC-DATA": defaultdict(dict)}
static_data_dictionary_mib = static_data_dictionary["MIB-STATIC-DATA"]
if existing_data:
static_data_dictionary_mib[InterfaceMib.IF_MIB_DATA_MONGO_IDENTIFIER][
enricher_existing_varbinds
] = existing_data
for el in additional_data.keys():
static_data_dictionary_mib[el][
enricher_additional_varbinds
] = additional_data[el]
return static_data_dictionary

# Input is what extract_network_interface_data_from_walk() returns
def update_mib_static_data_for(self, host, existing_data, additional_data):
if not existing_data and not additional_data:
return
static_data_dictionary = self.create_mib_static_data_mongo_structure(
existing_data, additional_data
)
self._walked_hosts.find_one_and_update(
{"_id": host},
{"$set": static_data_dictionary},
upsert=True,
return_document=ReturnDocument.AFTER,
)
return static_data_dictionary["MIB-STATIC-DATA"]
Loading

0 comments on commit 98aeaf3

Please sign in to comment.