Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
feat: removal of async (#191)
Browse files Browse the repository at this point in the history
* feat: removal of async

* feat: removal of async
  • Loading branch information
weliasz authored Nov 5, 2021
1 parent e8fa76a commit 747945b
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 34 deletions.
59 changes: 42 additions & 17 deletions splunk_connect_for_snmp_poller/manager/mib_server_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,24 @@
import os
import time

import aiohttp
import backoff as backoff
import requests as requests
from aiohttp import ClientSession
from requests.adapters import HTTPAdapter
from urllib3 import Retry

logger = logging.getLogger(__name__)


async def get_translation(var_binds, mib_server_url, data_format):
def get_translation(var_binds, mib_server_url, data_format):
"""
@param var_binds: var_binds object getting from SNMP agents
@param mib_server_url: URL of SNMP MIB server
@param data_format: format of data
@return: translated string
"""
payload = await prepare_payload(var_binds)
payload = prepare_payload(var_binds)

try:
return await get_url(mib_server_url, payload, data_format)
return get_url(mib_server_url, payload, data_format)
except requests.Timeout:
logger.exception("Time out occurred during call to MIB Server")
raise
Expand All @@ -48,7 +47,7 @@ async def get_translation(var_binds, mib_server_url, data_format):
raise


async def prepare_payload(var_binds):
def prepare_payload(var_binds):
payload = {}
var_binds_list = []
# *TODO*: Below differs a bit between poller and trap!
Expand All @@ -65,22 +64,48 @@ async def prepare_payload(var_binds):
return payload


@backoff.on_exception(backoff.expo, aiohttp.ClientError, max_tries=3)
async def get_url(mib_server_url, payload, data_format):
class SharedException(object):
pass


def get_url(mib_server_url, payload, data_format):
headers = {"Content-type": "application/json"}
endpoint = "translation"
translation_url = os.path.join(mib_server_url.strip("/"), endpoint)
logger.debug("[-] translation_url: %s", translation_url)

async with ClientSession(raise_for_status=True) as session:
resp = await session.post(
translation_url,
headers=headers,
data=payload,
params={"data_format": data_format},
timeout=5,
# Set up the request params
params = {"data_format": data_format}

try:
# use Session with Retry
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.Session()
session.mount("https://", adapter)
session.mount("http://", adapter)
resp = session.post(
translation_url, headers=headers, data=payload, params=params, timeout=60
)

except Exception as e:
logger.error(
f"MIB server unreachable! Error happened while communicating to MIB server to perform the Translation: {e}"
)
return await resp.text()
raise SharedException("MIB server is unreachable!")

if resp.status_code != 200:
logger.error(f"[-] MIB Server API Error with code: {resp.status_code}")
raise SharedException(f"MIB Server API Error with code: {resp.status_code}")

# *TODO*: For future release could retain failed translations in some place to re-translate.

return resp.text


# 1.3.6.1.2.1.2.2.1.4.1|Integer|16436|16436|True
Expand Down
28 changes: 13 additions & 15 deletions splunk_connect_for_snmp_poller/manager/task_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,20 @@ def is_metric_data(value):
return False


async def get_translated_string(mib_server_url, var_binds, return_multimetric=False):
def get_translated_string(mib_server_url, var_binds, return_multimetric=False):
"""
Get the translated/formatted var_binds string depending on whether the var_binds is an event or metric
Note: if it failed to get translation, return the the original var_binds
@return result: formated string ready to be sent to Splunk HEC
@return is_metric: boolean, metric data flag
"""
logger.debug(f"Getting translation for the following var_binds: {var_binds}")
is_metric, result = await result_without_translation(var_binds, return_multimetric)
is_metric, result = result_without_translation(var_binds, return_multimetric)
original_varbinds = is_metric, result
# Override the var_binds string with translated var_binds string
try:
data_format = _get_data_format(is_metric, return_multimetric)
result = await get_translation(var_binds, mib_server_url, data_format)
result = get_translation(var_binds, mib_server_url, data_format)
if data_format == "MULTIMETRIC":
result = json.loads(result)["metric"]
logger.debug(f"multimetric result\n{result}")
Expand All @@ -105,15 +105,15 @@ async def get_translated_string(mib_server_url, var_binds, return_multimetric=Fa
if not is_metric_data(_value):
is_metric = False
data_format = _get_data_format(is_metric, return_multimetric)
result = await get_translation(var_binds, mib_server_url, data_format)
result = get_translation(var_binds, mib_server_url, data_format)
except Exception:
logger.exception("Could not perform translation. Returning original var_binds")
return original_varbinds
logger.debug(f"final result -- metric: {is_metric}\n{result}")
return result, is_metric


async def result_without_translation(var_binds, return_multimetric):
def result_without_translation(var_binds, return_multimetric):
# Get Original var_binds as backup in case the mib-server is unreachable
for name, val in var_binds:
# Original oid
Expand Down Expand Up @@ -214,7 +214,7 @@ def mib_string_handler(mib_list: list) -> VarbindCollection:
return VarbindCollection(get=get_list, bulk=bulk_list)


async def snmp_get_handler(
def snmp_get_handler(
mongo_connection,
enricher_presence,
snmp_engine,
Expand Down Expand Up @@ -249,7 +249,7 @@ async def snmp_get_handler(
mongo_connection, enricher_presence, f"{host}:{port}"
)
for varbind in varBinds:
result, is_metric = await get_translated_string(
result, is_metric = get_translated_string(
mib_server_url, [varbind], return_multimetric
)
post_data_to_splunk_hec(
Expand Down Expand Up @@ -370,7 +370,7 @@ def prepare_error_message(
return is_error, result


async def snmp_bulk_handler(
def snmp_bulk_handler(
mongo_connection,
enricher_presence,
snmp_engine,
Expand Down Expand Up @@ -409,7 +409,7 @@ async def snmp_bulk_handler(
# Bulk operation returns array of var_binds
for varbind in var_binds:
logger.debug(f"Bulk returned this varbind: {var_binds}")
result, is_metric = await get_translated_string(
result, is_metric = get_translated_string(
mib_server_url, [varbind], return_multimetric
)
post_data_to_splunk_hec(
Expand Down Expand Up @@ -442,7 +442,7 @@ async def snmp_bulk_handler(
break


async def walk_handler(
def walk_handler(
profile,
mongo_connection,
snmp_engine,
Expand Down Expand Up @@ -490,7 +490,7 @@ async def walk_handler(
error_in_one_time_walk = True
break
else:
result, is_metric = await get_translated_string(mib_server_url, var_binds)
result, is_metric = get_translated_string(mib_server_url, var_binds)
post_data_to_splunk_hec(
hec_sender,
host,
Expand Down Expand Up @@ -541,7 +541,7 @@ def process_one_time_flag(
mongo_connection.delete_onetime_walk_result(host)


async def walk_handler_with_enricher(
def walk_handler_with_enricher(
profile,
enricher,
mongo_connection,
Expand Down Expand Up @@ -589,9 +589,7 @@ async def walk_handler_with_enricher(
):
break
else:
result, is_metric = await get_translated_string(
mib_server_url, var_binds, True
)
result, is_metric = get_translated_string(mib_server_url, var_binds, True)
new_result = _sort_walk_data(
is_metric,
merged_result_metric,
Expand Down
4 changes: 2 additions & 2 deletions splunk_connect_for_snmp_poller/manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
logger = get_task_logger(__name__)


async def get_snmp_data(
def get_snmp_data(
var_binds,
handler,
mongo_connection,
Expand All @@ -60,7 +60,7 @@ async def get_snmp_data(
):
if var_binds:
try:
await handler(
handler(
mongo_connection,
enricher,
snmp_engine,
Expand Down

0 comments on commit 747945b

Please sign in to comment.