Skip to content

Commit

Permalink
work on test for lock module
Browse files Browse the repository at this point in the history
  • Loading branch information
Fedor Baart committed Feb 8, 2024
1 parent e78a3e5 commit 3ce7f83
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 46 deletions.
18 changes: 12 additions & 6 deletions opentnsim/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def __init__(
lock_depth, # a float which contains the depth of the lock chamber
distance_doors1_from_first_waiting_area,
distance_doors2_from_second_waiting_area,
detector_nodes,
detector_nodes=None,
doors_open=0, # a float which contains the time it takes to open the doors
doors_close=0, # a float which contains the time it takes to close the doors
disch_coeff=0, # a float which contains the discharge coefficient of filling system
Expand Down Expand Up @@ -327,6 +327,8 @@ def __init__(
self.conditions = conditions
self.time_step = time_step
self.priority_rules = priority_rules
if detector_nodes is None:
detector_nodes = []
self.detector_nodes = detector_nodes
self.distance_doors1_from_first_waiting_area = distance_doors1_from_first_waiting_area
self.distance_doors2_from_second_waiting_area = distance_doors2_from_second_waiting_area
Expand All @@ -337,19 +339,23 @@ def __init__(
index=pd.MultiIndex(levels=[[], []], codes=[[], []], names=["Name", "VesselName"]),
columns=["direction", "ETA", "ETD", "Length", "Beam", "Draught", "VesselType", "Priority"],
)
super().__init__(capacity=100, length=lock_length, remaining_length=lock_length, *args, **kwargs)

# 100 spaces in the lock by default
capacity = 100

super().__init__(nr_resources=capacity, length=lock_length, remaining_length=lock_length, *args, **kwargs)
self.simulation_start = self.env.simulation_start.timestamp()
self.next_lockage = {
node_doors1: simpy.PriorityResource(self.env, capacity=100),
node_doors2: simpy.PriorityResource(self.env, capacity=100),
node_doors1: simpy.PriorityResource(self.env, capacity=capacity),
node_doors2: simpy.PriorityResource(self.env, capacity=capacity),
}
self.next_lockage_length = {
node_doors1: simpy.Container(self.env, capacity=lock_length, init=lock_length),
node_doors2: simpy.Container(self.env, capacity=lock_length, init=lock_length),
}
self.in_next_lockage = {
node_doors1: simpy.PriorityResource(self.env, capacity=100),
node_doors2: simpy.PriorityResource(self.env, capacity=100),
node_doors1: simpy.PriorityResource(self.env, capacity=capacity),
node_doors2: simpy.PriorityResource(self.env, capacity=capacity),
}
self.doors_1 = {
node_doors1: simpy.PriorityResource(self.env, capacity=1),
Expand Down
136 changes: 96 additions & 40 deletions tests/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,84 +2,140 @@
import pytest
import shapely
import simpy
import numpy as np
import xarray as xr

import opentnsim.core as core
import opentnsim.lock
import networkx as nx


@pytest.fixture
def graph():
FG = nx.DiGraph()
Node = type('Site', (core.Identifiable, core.Log, core.Locatable, core.HasResource), {})
Node = type("Site", (core.Identifiable, core.Log, core.Locatable, core.HasResource), {})
coordinates = [(0, 0), (1200, 0)]
for index, coord in enumerate(coordinates):
node = Node(**{'env': [], 'name': index, 'geometry': shapely.geometry.Point(x, y)})
FG.add_node(node.name, geometry=node.geometry, Info={'geometry': geometry})
x, y = coord
geometry = shapely.geometry.Point(x, y)
node = Node(**{"env": [], "name": index, "geometry": geometry})
FG.add_node(node.name, geometry=node.geometry, name=index, Info={"geometry": geometry})

edges = [(FG.nodes[0], FG.nodes[1]), (FG.nodes[1], FG.nodes[0])]
for (node_start, node_stop) in edges:
geometry = geometry.LineString([node_start.geometry, node_stop.geometry])
FG.add_edge(node_start.name, node_stop.name, weight=1, geometry=geometry,
Info={'geometry': geometry, 'length': geometry.length})
for node_start, node_stop in edges:
geometry = shapely.LineString([node_start["geometry"], node_stop["geometry"]])

FG.add_edge(
node_start["name"],
node_stop["name"],
weight=1,
geometry=geometry,
Info={"geometry": geometry, "length": geometry.length},
)

# run for a day
return FG


@pytest.fixture
def env(FG):
t_start = datetime.datetime(2024,1,1,0,0,0)
def env(graph):
t_start = datetime.datetime(2024, 1, 1, 0, 0, 0)
env = simpy.Environment(initial_time=t_start.timestamp())

env.epoch = t_start
env.FG = FG
env.simulation_start = t_start
# TODO: switch to env.graph
env.FG = graph
# run for a day
return env


@pytest.fixture
def add_hydrodynamics(env):
def hydrodynamics_env(env):
t_end = datetime.datetime(2024, 2, 1, 0, 0, 0)
stations = list(env.FG.nodes)
times = np.arange(env.epoch,t_end,datetime.timedelta(seconds=600))
water_depth = np.linspace(10, 10, len(times))
times = np.arange(env.epoch, t_end, datetime.timedelta(seconds=600))
water_depth = [np.linspace(10, 10, len(times)), np.linspace(10, 10, len(times))]
water_level = [np.linspace(0, 0, len(times)), np.linspace(1, 1, len(times))]
salinity = [np.linspace(0, 0, len(times)), np.linspace(0, 0, len(times))]
station_data = xr.DataArray(data=stations,dims=["STATIONS"])
time_data = xr.DataArray(data=times,dims=["TIME"])
depth_data = xr.DataArray(data=water_depth,dims=["STATIONS"])
water_level_data = xr.DataArray(data=[wlev for wlev in water_level],dims=["STATIONS", "TIME"])
salinity_data = xr.DataArray(data=[sal for sal in salinity],dims=["STATIONS", "TIME"])
hydrodynamic_data = xr.Dataset({'TIME': time_data, 'Stations': station_data, 'MBL': depth_data,
'Water level': water_level_data, 'Salinity': salinity_data})
station_data = xr.DataArray(data=stations, dims=["STATIONS"])
time_data = xr.DataArray(data=times, dims=["TIME"])
depth_data = xr.DataArray(data=water_depth, dims=["STATIONS", "TIME"])
water_level_data = xr.DataArray(data=[wlev for wlev in water_level], dims=["STATIONS", "TIME"])
salinity_data = xr.DataArray(data=[sal for sal in salinity], dims=["STATIONS", "TIME"])
hydrodynamic_data = xr.Dataset(
{"TIME": time_data, "Stations": station_data, "MBL": depth_data, "Water level": water_level_data, "Salinity": salinity_data}
)
env.vessel_traffic_service = opentnsim.vessel_traffic_service.VesselTrafficService(hydrodynamic_data)
return

return env

def add_vessel(env,name,origin,destination,type,L,B,T,v,arrival_time):
Vessel = type('Vessel',
(core.SimpyObject, core.Identifiable, lock.HasWaitingArea, lock.HasLock, lock.HasLineUpArea,
core.Movable, vessel.VesselProperties, output.HasOutput, vessel.ExtraMetadata), {})

vessel = Vessel(**{'env':env, 'name':name, 'origin':origin, 'destination':destination,
'geometry':env.FG.nodes[origin],'route':nx.dijkstra_path(env.FG,origin,destination),
'type':type, 'L':L, 'B':B, 'T':T, 'v':v, 'arrival_time':arrival_time})
def add_vessel(env, name, origin, destination, type, L, B, T, v, arrival_time):
Vessel = type(
"Vessel",
(
core.SimpyObject,
core.Identifiable,
lock.HasWaitingArea,
lock.HasLock,
lock.HasLineUpArea,
core.Movable,
vessel.VesselProperties,
output.HasOutput,
vessel.ExtraMetadata,
),
{},
)

vessel = Vessel(
**{
"env": env,
"name": name,
"origin": origin,
"destination": destination,
"geometry": env.FG.nodes[origin],
"route": nx.dijkstra_path(env.FG, origin, destination),
"type": type,
"L": L,
"B": B,
"T": T,
"v": v,
"arrival_time": arrival_time,
}
)

env.process(vessel.move())
return


def lock_test(env):
lock = opentnsim.lock.IsLock(env=env, name='Lock', lock_length=400, lock_width=40, lock_depth=10,
node_doors1=0, node_doors2=1, distance_doors1_from_first_waiting_area=400,
distance_doors2_from_second_waiting_area=400, speed_reduction_factor = 1)
lock.IsLockLineUpArea(env=env, name='Lock', distance_to_lock_doors=0, start_node=0, end_node=1,
lineup_length=400, speed_reduction_factor = 1)
lock.IsLockLineUpArea(env=env, name="Lock", distance_to_lock_doors=0, start_node=1, end_node=0,
lineup_length=400, speed_reduction_factor = 1)
lock.IsLockWaitingArea(env=sim.environment, name='Lock', distance_from_node=0, node=0)
def test_lock(hydrodynamics_env):
env = hydrodynamics_env
lock = opentnsim.lock.IsLock(
env=env,
name="Lock",
lock_length=400,
lock_width=40,
lock_depth=10,
node_doors1=0,
node_doors2=1,
distance_doors1_from_first_waiting_area=400,
distance_doors2_from_second_waiting_area=400,
speed_reduction_factor=1,
detector_nodes=[],
)
lock.IsLockLineUpArea(
env=env, name="Lock", distance_to_lock_doors=0, start_node=0, end_node=1, lineup_length=400, speed_reduction_factor=1
)
lock.IsLockLineUpArea(
env=env, name="Lock", distance_to_lock_doors=0, start_node=1, end_node=0, lineup_length=400, speed_reduction_factor=1
)
lock.IsLockWaitingArea(env=sim.environment, name="Lock", distance_from_node=0, node=0)
lock.IsLockWaitingArea(env=sim.environment, name="Lock", distance_from_node=0, node=1)

add_vessel(env, 0, 0, 1 ,None, 200, 30, 8, v, datetime.datetime(2024, 1, 1, 0, 0, 0))
add_vessel(env, 0, 0, 1, None, 200, 30, 8, v, datetime.datetime(2024, 1, 1, 0, 0, 0))

env.run()

vessels = env.vessels

assert len(vessels[0].log) > 2
assert len(vessels[0].log) > 2

0 comments on commit 3ce7f83

Please sign in to comment.