-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #22 from AeroFlorian/deletion
Refactoring for NetConfParser 1.0
- Loading branch information
Showing
8 changed files
with
1,345 additions
and
1,005 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import re | ||
from Logger import logger | ||
|
||
|
||
class LineRemoverRule: | ||
def __init__(self): | ||
pass | ||
|
||
def apply(self, full_lines: str): | ||
logger.error("LineRemoverRule.apply() not implemented") | ||
pass | ||
|
||
|
||
class LineRemoverRegexRule(LineRemoverRule): | ||
def __init__(self, condition: str, pattern: str, replacement: str): | ||
super(LineRemoverRegexRule, self).__init__() | ||
self.condition = condition | ||
self.pattern = pattern | ||
self.replacement = replacement | ||
|
||
def apply(self, full_lines: str): | ||
if self.condition in full_lines: | ||
return re.sub(self.pattern, self.replacement, full_lines) | ||
return full_lines | ||
|
||
|
||
class LineRemoverLogicRule(LineRemoverRule): | ||
def __init__(self, condition: str, logic): | ||
super(LineRemoverLogicRule, self).__init__() | ||
self.condition = condition | ||
self.logic = logic | ||
|
||
def apply(self, full_lines: str): | ||
if self.condition in full_lines: | ||
return self.logic(full_lines) | ||
return full_lines | ||
|
||
|
||
class LineRemover: | ||
def __init__(self): | ||
self.rules = [ | ||
LineRemoverLogicRule("830 for SSH connections", | ||
lambda x: "\n".join([line for line in x.split("\n") if ">" in line or "<" in line])), | ||
LineRemoverRegexRule("Session 0: Sending message", | ||
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z Dbg: .*? Session \d+: (?:Sending|Received) message:", | ||
""), | ||
LineRemoverRegexRule("Sending", | ||
r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{7} INF (?:Sending|Received) message: ", "")] | ||
|
||
def remove_unwanted_parts(self, full_lines: str): | ||
for rule in self.rules: | ||
full_lines = rule.apply(full_lines) | ||
return full_lines |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,155 @@ | ||
import NetconfMessageDefs | ||
import OranMessageDefs | ||
import LineRemover | ||
import re | ||
from Logger import logger | ||
|
||
|
||
class NetconfSession: | ||
def __init__(self): | ||
self.messages: list = [] | ||
|
||
def append_message(self, message: NetconfMessageDefs.Message): | ||
self.messages.append(message) | ||
|
||
def apply_messages_without_counterpart(self): | ||
id_to_message_id = {} | ||
for index, message in enumerate(self.messages): | ||
if message.message_type == NetconfMessageDefs.MessageType.RPC: | ||
if message.message_id in id_to_message_id: | ||
logger.warning('Multiple RPCs with the same message Id') | ||
id_to_message_id[message.message_id] = index | ||
elif message.message_type == NetconfMessageDefs.MessageType.RPC_REPLY: | ||
if message.message_id in id_to_message_id: | ||
rpc_message_id = id_to_message_id.pop(message.message_id) | ||
self.messages[rpc_message_id].received_reply() | ||
else: | ||
logger.warning('RPC reply without counterpart') | ||
|
||
|
||
class ResultTree: | ||
def __init__(self): | ||
self.netconf_sessions: list = [] | ||
|
||
def add_netconf_session(self): | ||
self.netconf_sessions.append(NetconfSession()) | ||
|
||
def add_message(self, message: NetconfMessageDefs.Message): | ||
self.netconf_sessions[-1].append_message(message) | ||
|
||
def display(self): | ||
logger.info('************** ResultTree: **************') | ||
for index, netconf_session in enumerate(self.netconf_sessions): | ||
logger.info(f'************** NetconfSession {index + 1}: **************') | ||
for message in netconf_session.messages: | ||
logger.info(message) | ||
|
||
def apply_messages_without_counterpart(self): | ||
for netconf_session in self.netconf_sessions: | ||
netconf_session.apply_messages_without_counterpart() | ||
|
||
def get_messages(self): | ||
messages = [] | ||
for session in self.netconf_sessions: | ||
messages += session.messages | ||
return messages | ||
|
||
|
||
class OranAnalysisTree: | ||
def __init__(self): | ||
self.analysis_messages: list = [] | ||
|
||
def add_netconf_session(self): | ||
self.analysis_messages.append(OranMessageDefs.NetconfClientConnectedMessage()) | ||
|
||
def add_message(self, message: NetconfMessageDefs.Message): | ||
if message.message_type == NetconfMessageDefs.MessageType.RPC: | ||
oran_message = OranMessageDefs.OranRpcMessage(message) | ||
elif message.message_type == NetconfMessageDefs.MessageType.RPC_REPLY: | ||
oran_message = OranMessageDefs.OranRpcReplyMessage(message) | ||
elif message.message_type == NetconfMessageDefs.MessageType.NOTIFICATION: | ||
oran_message = OranMessageDefs.OranNotificationMessage(message) | ||
else: | ||
return | ||
if oran_message.should_be_present_in_analysis(): | ||
self.analysis_messages.append(oran_message) | ||
|
||
def display(self): | ||
logger.info('************** OranAnalysisTree: **************') | ||
for message in self.analysis_messages: | ||
logger.info(message) | ||
|
||
def get_messages(self): | ||
return self.analysis_messages | ||
|
||
class Trees: | ||
def __init__(self): | ||
self.result_tree: ResultTree = ResultTree() | ||
self.analysis_tree: OranAnalysisTree = OranAnalysisTree() | ||
self.previous_message_type = None | ||
|
||
def handle_message(self, message: NetconfMessageDefs.Message): | ||
if self.previous_message_type != NetconfMessageDefs.MessageType.HELLO \ | ||
and message.message_type == NetconfMessageDefs.MessageType.HELLO: | ||
self.result_tree.add_netconf_session() | ||
self.analysis_tree.add_netconf_session() | ||
self.previous_message_type = message.message_type | ||
self.result_tree.add_message(message) | ||
self.analysis_tree.add_message(message) | ||
|
||
def display(self): | ||
#self.result_tree.display() | ||
self.analysis_tree.display() | ||
|
||
def apply_after_computation_tags(self): | ||
self.result_tree.apply_messages_without_counterpart() | ||
|
||
|
||
class RegexToNetconfMessage: | ||
def __init__(self, match): | ||
self.match = match | ||
|
||
def to_netconf_message(self): | ||
if self.match[0] != '': | ||
return NetconfMessageDefs.RpcMessage(int(self.match[0]), self.match[1]) | ||
elif self.match[2] != '': | ||
return NetconfMessageDefs.RpcReplyMessage(int(self.match[2]), self.match[3]) | ||
elif self.match[4] != '': | ||
return NetconfMessageDefs.NotificationMessage(self.match[4]) | ||
elif self.match[5] != '': | ||
return NetconfMessageDefs.HelloMessage(self.match[5]) | ||
else: | ||
logger.error('RegexToNetconfMessage.to_netconf_message() not implemented') | ||
return None | ||
|
||
|
||
class NetConfParser: | ||
def __init__(self, data: str): | ||
self.data = data | ||
self.trees = Trees() | ||
|
||
def parse(self): | ||
filtered_lines = LineRemover.LineRemover().remove_unwanted_parts(self.data) | ||
reg = r'<rpc .*? message-id=.(\d+).>(.*?)</rpc>|' \ | ||
r'<rpc-reply .*? message-id=.(\d+).>(.*?)</rpc-reply>|' \ | ||
r'<notification .*?/eventTime>(.*?)</notification>|' \ | ||
r'<hello xmlns=\".*?\">(.*?)</hello>' | ||
|
||
all_matches = re.findall(reg, filtered_lines, re.MULTILINE | re.DOTALL) | ||
for match in all_matches: | ||
try: | ||
message = RegexToNetconfMessage(match).to_netconf_message() | ||
self.trees.handle_message(message) | ||
except: | ||
pass | ||
self.trees.apply_after_computation_tags() | ||
return | ||
|
||
def display(self): | ||
self.trees.display() | ||
|
||
def get_netconf_messages(self): | ||
return self.trees.result_tree.get_messages() | ||
|
||
def get_oran_messages(self): | ||
return self.trees.analysis_tree.get_messages() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,163 @@ | ||
from enum import Enum | ||
from Logger import logger | ||
import re | ||
import xmltodict | ||
|
||
|
||
class MessageType(Enum): | ||
HELLO = 0 | ||
RPC = 1 | ||
RPC_REPLY = 2 | ||
NOTIFICATION = 3 | ||
|
||
|
||
class Tag(Enum): | ||
RPC = 0 | ||
RPC_WITHOUT_COUNTERPART = 1 | ||
RPC_REPLY = 2 | ||
NOTIFICATION = 3 | ||
HELLO = 4 | ||
SCHEMA = 5 | ||
NOTIFICATION_NETCONF_CONFIG_CHANGE = 6 | ||
RPC_ERROR = 7 | ||
|
||
|
||
class Direction(Enum): | ||
TO_SERVER = 0 | ||
TO_CLIENT = 1 | ||
UNKNOWN = 2 | ||
|
||
|
||
class Message: | ||
def __init__(self, message_id: int, message_type: MessageType, tag: Tag, direction: Direction): | ||
self.message_type: MessageType = message_type | ||
self.message_id: int = message_id | ||
self.raw_data: str = "" | ||
self.summary: str = "" | ||
self.tag: Tag = tag | ||
self.direction: Direction = direction | ||
self.data: dict = {} | ||
|
||
def __str__(self): | ||
return f'{self.message_type.name} id {self.message_id} tag {self.tag} summary {self.summary} ' | ||
|
||
def get_values(self): | ||
return ( | ||
str(self.message_id) if self.message_id >= 0 else "N/A", | ||
"<-" if self.direction == Direction.TO_CLIENT else "->", | ||
self.message_type.name.lower(), | ||
f"\t\t{self.summary}", | ||
"", | ||
self.raw_data | ||
) | ||
|
||
def fill_fields(self, data: str): | ||
logger.error('Message.fill_fields() not implemented') | ||
return | ||
|
||
@staticmethod | ||
def remove_unwanted_parts(data: str): | ||
data = re.sub(r'} {', '', data, flags=re.DOTALL) | ||
data = re.sub(r'\n', '', data, flags=re.DOTALL) | ||
return data | ||
|
||
def received_reply(self): | ||
logger.error('Message.receive_reply() not implemented') | ||
|
||
|
||
class RpcMessage(Message): | ||
def __init__(self, message_id: int, data: str): | ||
super(RpcMessage, self).__init__(message_id, MessageType.RPC, Tag.RPC_WITHOUT_COUNTERPART, Direction.TO_SERVER) | ||
self.fill_fields(data) | ||
|
||
def fill_fields(self, data: str): | ||
self.raw_data = self.remove_unwanted_parts(f'<rpc xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">{data}</rpc>') | ||
data = re.sub(r' [^ ]+=\"[^\"]+\"', '', data) | ||
data = re.sub('\n', '', data) | ||
self.data = xmltodict.parse(self.remove_unwanted_parts(data)) | ||
if "get-schema" in self.raw_data: | ||
self.tag = Tag.SCHEMA | ||
self.summary = f"get-schema {self.data['get-schema']['identifier']}" | ||
elif "get" in self.data: | ||
try: | ||
self.summary = f"get {'|'.join(self.data['get']['filter'].keys())}" | ||
except: | ||
self.summary = "get" | ||
elif "edit-config" in self.data: | ||
try: | ||
self.summary = f"edit-config {''.join(self.data['edit-config']['config'].keys())}" | ||
except: | ||
self.summary = "edit-config" | ||
else: | ||
self.summary = f"rpc {''.join(self.data.keys())}" | ||
|
||
def received_reply(self): | ||
if self.tag == Tag.RPC_WITHOUT_COUNTERPART: | ||
self.tag = Tag.RPC | ||
|
||
|
||
class RpcReplyMessage(Message): | ||
def __init__(self, message_id: int, data: str): | ||
super(RpcReplyMessage, self).__init__(message_id, MessageType.RPC_REPLY, Tag.RPC_REPLY, Direction.TO_CLIENT) | ||
self.fill_fields(data) | ||
|
||
def fill_fields(self, data: str): | ||
self.raw_data = self.remove_unwanted_parts(f'<rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">{data}</rpc-reply>') | ||
if len(data) < 10000 and "rpc-error" in data: | ||
data = f"<rpc-error>{data}</rpc-error>" | ||
self.tag = Tag.RPC_ERROR | ||
else: | ||
data = f"<rpc-reply>{data}</rpc-reply>" | ||
data = re.sub(r' [^ ]+=\"[^\"]+\"', '', data) | ||
data = re.sub(r'^.*?<', '<', data, flags=re.DOTALL) | ||
data = re.sub(r'>[^>]*?$', '>', data, flags=re.DOTALL) | ||
self.data = xmltodict.parse(self.remove_unwanted_parts(data)) | ||
try: | ||
if "data" in self.data['rpc-reply'] and len(self.data['rpc-reply']['data']) < 100: | ||
self.summary = f"rpc-reply data {' | '.join(self.data['rpc-reply']['data'].keys())}" | ||
else: | ||
self.summary = f"rpc-reply {' '.join(self.data['rpc-reply'].keys())}" | ||
except: | ||
pass | ||
|
||
|
||
class NotificationMessage(Message): | ||
def __init__(self, data: str): | ||
super(NotificationMessage, self).__init__(-1, MessageType.NOTIFICATION, Tag.NOTIFICATION, Direction.TO_CLIENT) | ||
self.fill_fields(data) | ||
|
||
def fill_fields(self, data: str): | ||
self.raw_data = self.remove_unwanted_parts( | ||
f'<notification xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">{data}</notification>') | ||
if "netconf-config-change" in data: | ||
self.tag = Tag.NOTIFICATION_NETCONF_CONFIG_CHANGE | ||
data = re.sub(r' xmlns[^ ]*=\"[^\"]+\"', '', data) | ||
self.data = xmltodict.parse(self.remove_unwanted_parts(data)) | ||
self.summary = f"notification {''.join(self.data.keys())}" | ||
|
||
|
||
|
||
class HelloMessage(Message): | ||
def __init__(self, data: str): | ||
super(HelloMessage, self).__init__(0, MessageType.HELLO, Tag.HELLO, Direction.UNKNOWN) | ||
self.fill_fields(data) | ||
|
||
def fill_fields(self, data: str): | ||
self.raw_data = self.remove_unwanted_parts( | ||
f'<hello xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">{data}</hello>') | ||
data = re.sub(r' xmlns[^ ]*=\"[^\"]+\"', '', data) | ||
data = re.sub(r'<session-id.*/session-id>', '', data) | ||
self.data = xmltodict.parse(self.remove_unwanted_parts(data)) | ||
capas = set() | ||
capabilities = list(self.data["capabilities"].values()) | ||
if type(capabilities[0]) is list: | ||
capabilities = capabilities[0] | ||
for capa in capabilities: | ||
if "urn:ietf:params:netconf:base:" in capa: | ||
capas.add(capa.split(":")[-1]) | ||
self.summary = f"Netconf Version Supported {', '.join(capas)}" | ||
|
||
|
||
class EmptyNetconfMessage(Message): | ||
def __init__(self): | ||
super(EmptyNetconfMessage, self).__init__(0, MessageType.HELLO, Tag.HELLO, Direction.UNKNOWN) |
Oops, something went wrong.