This repository has been archived by the owner on Sep 17, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 48
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* Context manager propagates exceptions properly * Implemented generic mock driver * Add possibility to raise Exceptions on demand * Mock cli commands individually * Map junos' _rpc to cli * Mocking configuration management methods * Added tests for configuration methods
- Loading branch information
Showing
15 changed files
with
398 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
# Copyright 2017 Dravetech AB. All rights reserved. | ||
# | ||
# The contents of this file are licensed under the Apache License, Version 2.0 | ||
# (the "License"); you may not use this file except in compliance with the | ||
# License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations under | ||
# the License. | ||
|
||
# Python3 support | ||
from __future__ import print_function | ||
from __future__ import unicode_literals | ||
|
||
from napalm_base.base import NetworkDriver | ||
import napalm_base.exceptions | ||
|
||
import inspect | ||
import json | ||
import os | ||
import re | ||
|
||
|
||
from pydoc import locate | ||
|
||
|
||
def raise_exception(result): | ||
exc = locate(result["exception"]) | ||
if exc: | ||
raise exc(*result.get("args", []), **result.get("kwargs", {})) | ||
else: | ||
raise TypeError("Couldn't resolve exception {}", result["exception"]) | ||
|
||
|
||
def is_mocked_method(method): | ||
mocked_methods = [] | ||
if method.startswith("get_") or method in mocked_methods: | ||
return True | ||
return False | ||
|
||
|
||
def mocked_method(path, name, count): | ||
parent_method = getattr(NetworkDriver, name) | ||
parent_method_args = inspect.getargspec(parent_method) | ||
modifier = 0 if 'self' not in parent_method_args.args else 1 | ||
|
||
def _mocked_method(*args, **kwargs): | ||
# Check len(args) | ||
if len(args) + len(kwargs) + modifier > len(parent_method_args.args): | ||
raise TypeError( | ||
"{}: expected at most {} arguments, got {}".format( | ||
name, len(parent_method_args.args), len(args) + modifier)) | ||
|
||
# Check kwargs | ||
unexpected = [x for x in kwargs if x not in parent_method_args.args] | ||
if unexpected: | ||
raise TypeError("{} got an unexpected keyword argument '{}'".format(name, | ||
unexpected[0])) | ||
return mocked_data(path, name, count) | ||
|
||
return _mocked_method | ||
|
||
|
||
def mocked_data(path, name, count): | ||
filename = "{}.{}".format(os.path.join(path, name), count) | ||
try: | ||
with open(filename) as f: | ||
result = json.loads(f.read()) | ||
except IOError: | ||
raise NotImplementedError("You can provide mocked data in {}".format(filename)) | ||
|
||
if "exception" in result: | ||
raise_exception(result) | ||
else: | ||
return result | ||
|
||
|
||
class MockDevice(object): | ||
|
||
def __init__(self, parent, profile): | ||
self.parent = parent | ||
self.profile = profile | ||
|
||
def run_commands(self, commands): | ||
"""Only useful for EOS""" | ||
if "eos" in self.profile: | ||
return self.parent.cli(commands).values()[0] | ||
else: | ||
raise AttributeError("MockedDriver instance has not attribute '_rpc'") | ||
|
||
|
||
class MockDriver(NetworkDriver): | ||
|
||
def __init__(self, hostname, username, password, timeout=60, optional_args=None): | ||
""" | ||
Supported optional_args: | ||
* path(str) - path to where the mocked files are located | ||
* profile(list) - List of profiles to assign | ||
""" | ||
self.hostname = hostname | ||
self.username = username | ||
self.password = password | ||
self.path = optional_args["path"] | ||
self.profile = optional_args.get("profile", []) | ||
|
||
self.opened = False | ||
self.calls = {} | ||
self.device = MockDevice(self, self.profile) | ||
|
||
# None no action, True load_merge, False load_replace | ||
self.merge = None | ||
self.filename = None | ||
self.config = None | ||
|
||
def _count_calls(self, name): | ||
current_count = self.calls.get(name, 0) | ||
self.calls[name] = current_count + 1 | ||
return self.calls[name] | ||
|
||
def _raise_if_closed(self): | ||
if not self.opened: | ||
raise napalm_base.exceptions.ConnectionClosedException("connection closed") | ||
|
||
def open(self): | ||
self.opened = True | ||
|
||
def close(self): | ||
self.opened = False | ||
|
||
def is_alive(self): | ||
return {"is_alive": self.opened} | ||
|
||
def cli(self, commands): | ||
count = self._count_calls("cli") | ||
result = {} | ||
regexp = re.compile('[^a-zA-Z0-9]+') | ||
for i, c in enumerate(commands): | ||
sanitized = re.sub(regexp, '_', c) | ||
name = "cli.{}.{}".format(count, sanitized) | ||
filename = "{}.{}".format(os.path.join(self.path, name), i) | ||
with open(filename, 'r') as f: | ||
result[c] = f.read() | ||
return result | ||
|
||
def load_merge_candidate(self, filename=None, config=None): | ||
count = self._count_calls("load_merge_candidate") | ||
self._raise_if_closed() | ||
self.merge = True | ||
self.filename = filename | ||
self.config = config | ||
mocked_data(self.path, "load_merge_candidate", count) | ||
|
||
def load_replace_candidate(self, filename=None, config=None): | ||
count = self._count_calls("load_replace_candidate") | ||
self._raise_if_closed() | ||
self.merge = False | ||
self.filename = filename | ||
self.config = config | ||
mocked_data(self.path, "load_replace_candidate", count) | ||
|
||
def compare_config(self, filename=None, config=None): | ||
count = self._count_calls("compare_config") | ||
self._raise_if_closed() | ||
return mocked_data(self.path, "compare_config", count)["diff"] | ||
|
||
def commit_config(self): | ||
count = self._count_calls("commit_config") | ||
self._raise_if_closed() | ||
self.merge = None | ||
self.filename = None | ||
self.config = None | ||
mocked_data(self.path, "commit_config", count) | ||
|
||
def discard_config(self): | ||
count = self._count_calls("commit_config") | ||
self._raise_if_closed() | ||
self.merge = None | ||
self.filename = None | ||
self.config = None | ||
mocked_data(self.path, "discard_config", count) | ||
|
||
def _rpc(self, get): | ||
"""This one is only useful for junos.""" | ||
if "junos" in self.profile: | ||
return self.cli([get]).values()[0] | ||
else: | ||
raise AttributeError("MockedDriver instance has not attribute '_rpc'") | ||
|
||
def __getattribute__(self, name): | ||
if is_mocked_method(name): | ||
self._raise_if_closed() | ||
count = self._count_calls(name) | ||
return mocked_method(self.path, name, count) | ||
else: | ||
return object.__getattribute__(self, name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
""" | ||
Test base helpers. | ||
""" | ||
|
||
# Python3 support | ||
from __future__ import print_function | ||
from __future__ import unicode_literals | ||
|
||
|
||
# NAPALM base | ||
from napalm_base import get_network_driver | ||
import napalm_base.exceptions | ||
|
||
import pytest | ||
|
||
import os | ||
|
||
|
||
BASE_PATH = os.path.dirname(__file__) | ||
|
||
|
||
driver = get_network_driver("mock") | ||
optional_args = { | ||
"path": os.path.join(BASE_PATH, "test_mock_driver"), | ||
"profile": ["eos"], | ||
} | ||
|
||
|
||
class TestMockDriver(object): | ||
"""Test Mock Driver.""" | ||
|
||
def test_basic(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
assert d.is_alive() == {u'is_alive': False} | ||
d.open() | ||
assert d.is_alive() == {u'is_alive': True} | ||
d.close() | ||
assert d.is_alive() == {u'is_alive': False} | ||
|
||
with pytest.raises(napalm_base.exceptions.ConnectionClosedException) as excinfo: | ||
d.get_facts() | ||
assert "connection closed" in excinfo.value | ||
|
||
def test_context_manager(self): | ||
with driver("blah", "bleh", "blih", optional_args=optional_args) as d: | ||
assert d.is_alive() == {u'is_alive': True} | ||
assert d.is_alive() == {u'is_alive': False} | ||
|
||
def test_mocking_getters(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
assert d.get_facts()["hostname"] == "localhost" | ||
assert d.get_facts()["hostname"] == "changed_hostname" | ||
d.close() | ||
|
||
def test_not_mocking_getters(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
|
||
with pytest.raises(NotImplementedError) as excinfo: | ||
d.get_route_to() | ||
expected = "You can provide mocked data in {}/get_route_to.1".format(optional_args["path"]) | ||
assert expected in excinfo.value | ||
|
||
with pytest.raises(NotImplementedError) as excinfo: | ||
d.get_route_to() | ||
expected = "You can provide mocked data in {}/get_route_to.2".format(optional_args["path"]) | ||
assert expected in excinfo.value | ||
|
||
d.close() | ||
|
||
def test_arguments(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
|
||
with pytest.raises(TypeError) as excinfo: | ||
d.get_route_to(1, 2, 3) | ||
assert "get_route_to: expected at most 3 arguments, got 4" in excinfo.value | ||
|
||
with pytest.raises(TypeError) as excinfo: | ||
d.get_route_to(1, 1, protocol=2) | ||
assert "get_route_to: expected at most 3 arguments, got 3" in excinfo.value | ||
|
||
with pytest.raises(TypeError) as excinfo: | ||
d.get_route_to(proto=2) | ||
assert "get_route_to got an unexpected keyword argument 'proto'" in excinfo.value | ||
|
||
d.close() | ||
|
||
def test_mock_error(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
|
||
with pytest.raises(KeyError) as excinfo: | ||
d.get_bgp_neighbors() | ||
assert "Something" in excinfo.value | ||
|
||
with pytest.raises(napalm_base.exceptions.ConnectionClosedException) as excinfo: | ||
d.get_bgp_neighbors() | ||
assert "Something" in excinfo.value | ||
|
||
with pytest.raises(TypeError) as excinfo: | ||
d.get_bgp_neighbors() | ||
assert "Couldn't resolve exception NoIdeaException" in excinfo.value | ||
|
||
d.close() | ||
|
||
def test_cli(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
result = d.cli(["a_command", "b_command"]) | ||
assert result == {'a_command': 'result command a\n', 'b_command': 'result command b\n'} | ||
d.close() | ||
|
||
def test_configuration_merge(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
d.load_merge_candidate(config="asdasdasd") | ||
assert d.merge is True | ||
d.compare_config() == "a_diff" | ||
d.commit_config() | ||
d.close() | ||
|
||
def test_configuration_replace(self): | ||
d = driver("blah", "bleh", "blih", optional_args=optional_args) | ||
d.open() | ||
d.load_replace_candidate(config="asdasdasd") | ||
assert d.merge is False | ||
d.compare_config() == "a_diff" | ||
d.commit_config() | ||
d.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
result command a |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
result command b |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
{} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{ | ||
"diff": "a_diff" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
{ | ||
"exception": "KeyError", | ||
"args": [ | ||
"Something" | ||
], | ||
"kwargs": {} | ||
} |
Oops, something went wrong.