Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solve several issues found in stress tests #248

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

NApp to provision circuits from user request.
"""
from threading import Lock

from flask import jsonify, request
from werkzeug.exceptions import (BadRequest, Conflict, Forbidden,
MethodNotAllowed, NotFound,
Expand Down Expand Up @@ -48,11 +50,13 @@ def setup(self):
# dictionary of EVCs by interface
self._circuits_by_interface = {}

self._lock = Lock()

self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

def execute(self):
"""Execute once when the napp is running."""
for circuit in self.circuits.values():
for circuit in tuple(self.circuits.values()):
if circuit.is_enabled() and not circuit.is_active():
circuit.deploy()

Expand Down Expand Up @@ -461,7 +465,7 @@ def _is_duplicated_evc(self, evc):
boolean: True if the circuit is duplicated, otherwise False.

"""
for circuit in self.circuits.values():
for circuit in tuple(self.circuits.values()):
if not circuit.archived and circuit.shares_uni(evc):
return True
return False
Expand All @@ -488,6 +492,8 @@ def handle_link_down(self, event):
def load_circuits_by_interface(self, circuits):
"""Load circuits in storehouse for in-memory dictionary."""
for circuit_id, circuit in circuits.items():
if circuit['archived'] is True:
continue
intf_a = circuit['uni_a']['interface_id']
self.add_to_dict_of_sets(intf_a, circuit_id)
intf_z = circuit['uni_z']['interface_id']
Expand All @@ -508,28 +514,31 @@ def add_to_dict_of_sets(self, intf, circuit_id):
@listen_to('kytos/topology.port.created')
def load_evcs(self, event):
"""Try to load the unloaded EVCs from storehouse."""
log.debug("Event load_evcs %s", event)
circuits = self.storehouse.get_data()
if not self._circuits_by_interface:
self.load_circuits_by_interface(circuits)

interface_id = '{}:{}'.format(event.content['switch'],
event.content['port'])

for circuit_id in self._circuits_by_interface.get(interface_id, []):
if circuit_id in circuits and circuit_id not in self.circuits:
try:
evc = self._evc_from_dict(circuits[circuit_id])
except ValueError as exception:
log.info(
f'Could not load EVC {circuit_id} because {exception}')
continue

evc.deactivate()
evc.current_path = Path([])
evc.sync()
self.circuits.setdefault(circuit_id, evc)
self.sched.add(evc)
with self._lock:
log.debug("Event load_evcs %s", event)
circuits = self.storehouse.get_data()
if not self._circuits_by_interface:
self.load_circuits_by_interface(circuits)

interface_id = '{}:{}'.format(event.content['switch'],
event.content['port'])

for circuit_id in self._circuits_by_interface.get(interface_id,
[]):
if circuit_id in circuits and circuit_id not in self.circuits:
try:
evc = self._evc_from_dict(circuits[circuit_id])
except ValueError as exception:
log.error(
f'Could not load EVC {circuit_id} '
f'because {exception}')
continue

evc.deactivate()
evc.current_path = Path([])
evc.sync()
self.circuits.setdefault(circuit_id, evc)
self.sched.add(evc)

@listen_to('kytos/flow_manager.flow.error')
def handle_flow_mod_error(self, event):
Expand Down
3 changes: 2 additions & 1 deletion storehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def __init__(self, controller):
"""Create a storehouse instance."""
self.controller = controller
self.namespace = 'kytos.mef_eline.circuits'
self._lock = threading.Lock()
if '_lock' not in self.__dict__:
self._lock = threading.Lock()
if 'box' not in self.__dict__:
self.box = None
self.list_stored_boxes()
Expand Down
6 changes: 2 additions & 4 deletions tests/unit/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,9 @@ def test_load_circuits_by_interface(self):

expected_result = {
'00:00:00:00:00:00:00:03:12':
{'182f5bac84074017a262a2321195dbb4',
'65c4582cc8f249c2a5947ef500c19e37'},
{'65c4582cc8f249c2a5947ef500c19e37'},
'00:00:00:00:00:00:00:06:11':
{'182f5bac84074017a262a2321195dbb4',
'65c4582cc8f249c2a5947ef500c19e37'},
{'65c4582cc8f249c2a5947ef500c19e37'},
'00:00:00:00:00:00:00:03:3':
{'65c4582cc8f249c2a5947ef500c19e37'},
'00:00:00:00:00:00:00:05:2':
Expand Down