diff --git a/stonesoup/platform/__init__.py b/stonesoup/platform/__init__.py index 2ce175d0a..0f2870958 100644 --- a/stonesoup/platform/__init__.py +++ b/stonesoup/platform/__init__.py @@ -1,3 +1,5 @@ -from .base import Platform, MovingPlatform, FixedPlatform, MultiTransitionMovingPlatform +from .base import Platform, MovingPlatform, FixedPlatform, MultiTransitionMovingPlatform, Obstacle -__all__ = ['Platform', 'MovingPlatform', 'FixedPlatform', 'MultiTransitionMovingPlatform'] + +__all__ = ['Platform', 'MovingPlatform', 'FixedPlatform', 'MultiTransitionMovingPlatform', + 'Obstacle'] diff --git a/stonesoup/platform/base.py b/stonesoup/platform/base.py index 38bfc03f7..37a549c5d 100644 --- a/stonesoup/platform/base.py +++ b/stonesoup/platform/base.py @@ -1,9 +1,13 @@ import uuid -from typing import MutableSequence +from typing import MutableSequence, Sequence, Union, Any +import numpy as np +from functools import lru_cache from stonesoup.base import Property, Base +from stonesoup.functions import build_rotation_matrix from stonesoup.movable import Movable, FixedMovable, MovingMovable, MultiTransitionMovable from stonesoup.sensor.sensor import Sensor +from stonesoup.types.array import StateVectors from stonesoup.types.groundtruth import GroundTruthPath @@ -189,3 +193,163 @@ class MovingPlatform(Platform): class MultiTransitionMovingPlatform(Platform): _default_movable_class = MultiTransitionMovable + + +class Obstacle(Platform): + """A platform class representing obstacles in the environment that may + block the line of sight to targets, preventing detection and measurement.""" + + shape_data: StateVectors = Property( + default=None, + doc="Coordinates defining the vertices of the obstacle relative" + "to its centroid without any orientation. Defaults to `None`") + + simplices: Union[Sequence[int], np.ndarray] = Property( + default=None, + doc="A :class:`Sequence` or :class:`np.ndarray`, describing the connectivity " + "of vertices specified in :attr:`shape_data`. Should be constructed such " + "that element `i` is the index of a vertex that `i` is connected to. " + "For example, simplices for a four sided obstacle may be `(1, 2, 3, 0)` " + "for consecutively defined shape data. Default assumes that :attr:`shape_data` " + "is provided such that consecutive vertices are connected, such as the " + "example above.") + + shape_mapping: Sequence[int] = Property( + default=(0, 1), + doc="A mapping for shape data dimensions to x y cartesian position. Default value is " + "(0,1)." + ) + + _default_movable_class = FixedMovable + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # If simplices not defined, calculate based on sequential vertices assumption + if self.simplices is None: + self.simplices = np.roll(np.linspace(0, + self.shape_data.shape[1]-1, + self.shape_data.shape[1]), + -1).astype(int) + # Initialise vertices + self._vertices = self._calculate_verts() + + # Initialise relative_edges + self._relative_edges = self._calculate_relative_edges() + + @property + def vertices(self): + """Vertices are calculated by applying :attr:`position` and + :attr:`orientation` to :attr:`shape_data`. If :attr:`position` + or :attr:`orientation` changes, then vertices will reflect + these changes. If shape data specifies vertices that connect + to more than two other vertices, then the vertex with more + connections will be duplicated. This enables correct handling + of complex non-convex shapes.""" + self._update_verts_and_relative_edges() + return self._vertices + + @property + def relative_edges(self): + """Calculates the difference between connected vertices + Cartesian coordinates. This is used by :meth:`is_visible` of + :class:`~.VisibilityInformed2DSensor` when calculating the + visibility of a state due to obstacles obstructing the line of + sight to the target.""" + self._update_verts_and_relative_edges() + return self._relative_edges + + @lru_cache(maxsize=None) + def _orientation_cache(self): + # Cache for orientation allows for vertices and relative edges to be + # be calculated when necessary. Maxsize set to unlimited as it + # is cleared before assigning a new value + return self.orientation + + @lru_cache(maxsize=None) + def _position_cache(self): + # Cache for position allows for vertices and relative edges to be + # calculated only when necessary. Maxsize set to unlimited as it + # is cleared before assigning a new value + return self.position + + def _update_verts_and_relative_edges(self): + # Checks to see if cached position and orientation matches the + # current property. If they match nothing is calculated. If they + # don't vertices and relative edges are recalculated. + if np.any(self._orientation_cache() != self.orientation) or \ + np.any(self._position_cache() != self.position): + + self._orientation_cache.cache_clear() + self._position_cache.cache_clear() + self._vertices = self._calculate_verts() + self._relative_edges = self._calculate_relative_edges() + + def _calculate_verts(self) -> np.ndarray: + # Calculates the vertices based on the defined `shape_data`, + # `position` and `orientation`. + rot_mat = build_rotation_matrix(self.orientation) + rotated_shape = \ + rot_mat[np.ix_(self.shape_mapping, self.shape_mapping)] @ \ + self.shape_data[self.shape_mapping, :] + verts = rotated_shape + self.position + return verts[:, self.simplices] + + def _calculate_relative_edges(self): + # Calculates the relative edge length in Cartesian space. Required + # for visibility estimator + return np.array( + [self.vertices[self.shape_mapping[0], :] - + self.vertices[self.shape_mapping[0], + np.roll(np.linspace(0, + len(self.simplices)-1, + len(self.simplices)), 1).astype(int)], + self.vertices[self.shape_mapping[1], :] - + self.vertices[self.shape_mapping[1], + np.roll(np.linspace(0, + len(self.simplices)-1, + len(self.simplices)), 1).astype(int)], + ]) + + @classmethod + def from_obstacle( + cls, + obstacle: 'Obstacle', + *args: Any, + **kwargs: Any) -> 'Obstacle': + + """Return a new obstacle instance by providing new properties to an existing obstacle. + It is possible to overwrite any property of the original obstacle by + defining the required keyword arguments. Any arguments that are undefined + remain from the `obstacle` attribute. The utility of this method is to + easily create new obstacles from a single base obstacle, where each will + share the shape data of the original, but this is not the limit of its + functionality. + + Parameters + ---------- + obstacle: Obstacle + :class:`~.Obstacle` to use existing properties from. + \\*args: Sequence + Arguments to pass to newly created obstacle which will replace those in `obstacle` + \\*\\*kwargs: Mapping + New property names and associate value for use in newly created obstacle, replacing + those on the ``obstacle`` parameter. + """ + + args_property_names = { + name for n, name in enumerate(obstacle._properties) if n < len(args)} + + ignore = ['movement_controller', 'id'] + + new_kwargs = { + name: getattr(obstacle, name) + for name in obstacle._properties.keys() + if name not in args_property_names and name not in kwargs and name not in ignore} + + new_kwargs.update(kwargs) + + if 'position_mapping' not in kwargs.keys(): + new_kwargs.update({'position_mapping': getattr(obstacle, 'position_mapping')}) + + return cls(*args, **new_kwargs) diff --git a/stonesoup/platform/tests/test_platform_base.py b/stonesoup/platform/tests/test_platform_base.py index 9723f1f99..6869ff057 100644 --- a/stonesoup/platform/tests/test_platform_base.py +++ b/stonesoup/platform/tests/test_platform_base.py @@ -1,3 +1,4 @@ +import copy import datetime import numpy as np @@ -7,6 +8,7 @@ from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \ ConstantVelocity, KnownTurnRate from stonesoup.movable import FixedMovable, MovingMovable +from stonesoup.platform.base import Obstacle from stonesoup.sensor.sensor import Sensor from stonesoup.types.angle import Bearing from stonesoup.types.array import StateVector @@ -822,3 +824,217 @@ def test_setting_movement_controller_sensors(): platform.movement_controller = moving assert platform.movement_controller is sensor.movement_controller + + +@pytest.mark.parametrize( + 'shape_data, position1, orientation1, position2, orientation2, simplices', + [(np.array([[-2, 2, 2, -2], [2, 2, -2, -2]]), # shape_data + StateVector([1, 2]), # position1 + StateVector([[0], [0], [0]]), # orientation1 + StateVector([3, 4]), # position2 + StateVector([[0], [0], [np.radians(45)]]), # orientation2 + (1, 2, 3, 0), # simplices + ), + (np.array([[-2, 2, 2, -2], [2, 2, -2, -2]]), # shape_data + StateVector([1, 2]), # position1 + StateVector([[0], [0], [np.radians(22)]]), # orientation1 + StateVector([3, 4]), # position2 + StateVector([[0], [0], [0]]), # orientation2 + None, # simplices + ), # from_obs_mapping + (np.array([[-2, 0, 2, 2, -2], [2, 3, 2, -2, -2]]), # shape_data + StateVector([1, 2]), # position1 + StateVector([[0], [0], [0]]), # orientation1 + StateVector([3, 4]), # position2 + StateVector([[0], [0], [np.radians(45)]]), # orientation2 + None, # simplices + ), # from_obs_mapping + ], + ids=['test_defined_simpleces', 'test_undefined_simplices', 'test_5_sides']) +def test_obstacle(shape_data, position1, orientation1, position2, orientation2, + simplices): + + if np.any(orientation1 != StateVector([[0], [0], [0]])) and simplices: + test_obstacle = Obstacle(states=State(position1), + shape_data=shape_data, + orientation=orientation1, + simplices=simplices, + position_mapping=(0, 1)) + elif simplices: + test_obstacle = Obstacle(states=State(position1), + shape_data=shape_data, + simplices=simplices, + position_mapping=(0, 1)) + elif np.any(orientation1 != StateVector([[0], [0], [0]])): + test_obstacle = Obstacle(states=State(position1), + shape_data=shape_data, + orientation=orientation1, + position_mapping=(0, 1)) + else: + test_obstacle = Obstacle(states=State(position1), + shape_data=shape_data, + position_mapping=(0, 1)) + + # Check that shape data is correct + assert np.all(test_obstacle.shape_data == shape_data) + # Check that position is correct + assert np.all(test_obstacle.position == position1) + # check that orientation is correct + assert np.all(test_obstacle.orientation == orientation1) + + # check vertices + rot_m = np.array([[1, 0, 0], + [0, np.cos(-orientation1[0, 0]), -np.sin(-orientation1[0, 0])], + [0, np.sin(-orientation1[0, 0]), np.cos(-orientation1[0, 0])]]) @ \ + np.array([[np.cos(orientation1[1, 0]), 0, np.sin(orientation1[1, 0])], + [0, 1, 0], + [-np.sin(orientation1[1, 0]), 0, np.cos(orientation1[1, 0])]]) @ \ + np.array([[np.cos(-orientation1[2, 0]), -np.sin(-orientation1[2, 0]), 0], + [np.sin(-orientation1[2, 0]), np.cos(-orientation1[2, 0]), 0], + [0, 0, 1]]) + + if not simplices: + simplices = np.roll(np.linspace(0, shape_data.shape[1]-1, shape_data.shape[1]), + -1).astype(int) + + true_vertices = rot_m[:2, :2] @ \ + test_obstacle.shape_data[:, simplices] + position1 + + edge_index = np.roll(np.linspace(0, len(simplices)-1, len(simplices)), 1).astype(int) + true_relative_edges = np.array([true_vertices[0, :] - true_vertices[0, edge_index], + true_vertices[1, :] - true_vertices[1, edge_index]]) + + assert np.all(test_obstacle.vertices == true_vertices) + assert np.all(test_obstacle.relative_edges == true_relative_edges) + + # Ensure changing position and/or orientation changes vertices + test_obstacle.movement_controller.states = State(position2), + if orientation2 is not None: + rot_m = np.array([[1, 0, 0], + [0, np.cos(-orientation2[0, 0]), -np.sin(-orientation2[0, 0])], + [0, np.sin(-orientation2[0, 0]), np.cos(-orientation2[0, 0])]]) @ \ + np.array([[np.cos(orientation2[1, 0]), 0, np.sin(orientation2[1, 0])], + [0, 1, 0], + [-np.sin(orientation2[1, 0]), 0, np.cos(orientation2[1, 0])]]) @ \ + np.array([[np.cos(-orientation2[2, 0]), -np.sin(-orientation2[2, 0]), 0], + [np.sin(-orientation2[2, 0]), np.cos(-orientation2[2, 0]), 0], + [0, 0, 1]]) + test_obstacle.orientation = orientation2 + + assert np.all(test_obstacle.position == position2) + if orientation2 is not None: + assert np.all(test_obstacle.orientation == orientation2) + else: + assert np.all(test_obstacle.orientation == orientation1) + + true_vertices2 = rot_m[:2, :2] @ shape_data[:, simplices] + position2 + assert np.all(test_obstacle.vertices == true_vertices2) + + true_relative_edges2 = np.array([true_vertices2[0, :] - + true_vertices2[0, edge_index], + true_vertices2[1, :] - + true_vertices2[1, edge_index]]) + + assert np.all(test_obstacle.relative_edges == true_relative_edges2) + + +@pytest.mark.parametrize( + 'position2, orientation2, mapping2', + [(StateVector([3, 4]), # position2 + None, # orientation2 + None), # mapping2 + (StateVector([3, 4]), # position2 + StateVector([[0], [0], [np.radians(22.2)]]), # orientation2 + None), # mapping2 + (StateVector([3, 4, 5]), # position2 + None, # orientation2 + (1, 2)), # mapping2 + (StateVector([3, 4, 5]), # position2 + StateVector([[0], [0], [np.radians(45)]]), # orientation2 + (1, 2)) # mapping2 + ], + ids=['from_obs_position_change', 'from_obs_orientation_change', 'from_obs_mapping_change', + 'from_obs_mapping_and_orient_change'] +) +def test_from_obstacle(position2, orientation2, mapping2): + + shape_data = np.array([[-2, 2, 2, -2], [2, 2, -2, -2]]) + position = StateVector([1, 2]) + mapping = (0, 1) + + initial_obstacle = Obstacle(shape_data=shape_data, + states=State(position), + position_mapping=mapping) + + initial_verts = copy.deepcopy(initial_obstacle.vertices) + initial_relative_edges = copy.deepcopy(initial_obstacle.relative_edges) + + # Generate obstacle from initial_obstacle + if orientation2 is not None and mapping2 is not None: + sub_obstacle = Obstacle.from_obstacle(initial_obstacle, + states=State(position2), + orientation=orientation2, + position_mapping=mapping2) + rot_orient = orientation2 + elif orientation2 is not None: + sub_obstacle = Obstacle.from_obstacle(initial_obstacle, + states=State(position2), + orientation=orientation2) + rot_orient = orientation2 + elif mapping2 is not None: + sub_obstacle = Obstacle.from_obstacle(initial_obstacle, + states=State(position2), + position_mapping=mapping2) + rot_orient = initial_obstacle.orientation + else: + sub_obstacle = Obstacle.from_obstacle(initial_obstacle, + states=State(position2)) + rot_orient = initial_obstacle.orientation + + rot_m = np.array([[1, 0, 0], + [0, np.cos(-rot_orient[0, 0]), -np.sin(-rot_orient[0, 0])], + [0, np.sin(-rot_orient[0, 0]), np.cos(-rot_orient[0, 0])]]) @ \ + np.array([[np.cos(rot_orient[1, 0]), 0, np.sin(rot_orient[1, 0])], + [0, 1, 0], + [-np.sin(rot_orient[1, 0]), 0, np.cos(rot_orient[1, 0])]]) @ \ + np.array([[np.cos(-rot_orient[2, 0]), -np.sin(-rot_orient[2, 0]), 0], + [np.sin(-rot_orient[2, 0]), np.cos(-rot_orient[2, 0]), 0], + [0, 0, 1]]) + + true_vertices = rot_m[:2, :2] @ \ + shape_data[:, initial_obstacle.simplices] + position2[mapping2 if mapping2 is not None + else mapping, :] + edge_index = np.roll(np.linspace(0, + len(initial_obstacle.simplices)-1, + len(initial_obstacle.simplices)), 1). astype(int) + true_relative_edges = np.array([true_vertices[0, :] - + true_vertices[0, edge_index], + true_vertices[1, :] - + true_vertices[1, edge_index]]) + + # Check that shape data has not changed + assert np.all(sub_obstacle.shape_data == initial_obstacle.shape_data) + # check that changed properties are correct + if mapping2 is not None: + assert np.all(sub_obstacle.position == position2[mapping2, :]) + assert np.all(sub_obstacle.position_mapping == mapping2) + else: + assert np.all(sub_obstacle.position == position2) + assert np.all(sub_obstacle.position_mapping == initial_obstacle.position_mapping) + + if orientation2 is not None: + assert np.all(sub_obstacle.orientation == orientation2) + else: + assert np.all(sub_obstacle.orientation == initial_obstacle.orientation) + + # check that vertices are correctly calculated + assert np.all(sub_obstacle.vertices == true_vertices) + # check that relative edges are calculated correctly + assert np.all(sub_obstacle.relative_edges == true_relative_edges) + + # check that initial_obstacle properties have not changed + assert np.all(initial_obstacle.orientation == StateVector([[0], [0], [0]])) + assert np.all(initial_obstacle.position == position) + assert np.all(initial_obstacle.position_mapping == mapping) + assert np.all(initial_obstacle.vertices == initial_verts) + assert np.all(initial_obstacle.relative_edges == initial_relative_edges) diff --git a/stonesoup/plotter.py b/stonesoup/plotter.py index 26ae3b07e..a1a98d549 100644 --- a/stonesoup/plotter.py +++ b/stonesoup/plotter.py @@ -10,7 +10,7 @@ from matplotlib import pyplot as plt from matplotlib.legend_handler import HandlerPatch from matplotlib.lines import Line2D -from matplotlib.patches import Ellipse +from matplotlib.patches import Ellipse, Patch, Polygon from mergedeep import merge from scipy.integrate import quad from scipy.optimize import brentq @@ -70,6 +70,10 @@ def plot_tracks(self, tracks, mapping, uncertainty=False, particle=False, track_ def plot_sensors(self, sensors, mapping, sensor_label="Sensors", **kwargs): raise NotImplementedError + @abstractmethod + def plot_obstacles(self, obstacles, mapping, label="Obstacles", **kwargs): + raise NotImplementedError + def _conv_measurements(self, measurements, mapping, measurement_model=None, convert_measurements=True) -> \ Tuple[Dict[detection.Detection, StateVector], Dict[detection.Clutter, StateVector]]: @@ -535,6 +539,47 @@ def plot_sensors(self, sensors, mapping=None, sensor_label="Sensors", **kwargs): labels=self.legend_dict.keys())) return artists + def plot_obstacles(self, obstacles, mapping=[0, 1], label='Obstacles', **kwargs): + """Plots obstacle(s) + + Plots obstacles. Users can change the colour and marker size of obstacle + vertices with keyword arguments. Marker colour determines the fill colour + of obstacle patches. Defaults are grey '.' marker and matching fill colour. + + Parameters + ---------- + obstacles : Collection of :class:`~.Obstacle` + Obstacles to plot + mapping : list + List of items specifying the mapping of the position components of the + obstacle's position. Default is [0, 1]. + label: str + Label to apply to obstacles for the legend. + \\*\\*kwargs: dict + Additional arguments to be passed to scatter function for detections. Defaults are + ``marker=dict(symbol='circle', size=3, color='grey')``. + """ + artists = [] + if not isinstance(obstacles, Collection): + obstacles = {obstacles} + + if self.dimension == 1 or self.dimension == 3: + raise NotImplementedError + + obstacle_kwargs = dict(linestyle='-', marker='.', color='grey') + obstacle_kwargs.update(kwargs) + for obstacle in obstacles: + artists.append(self.ax.scatter(*obstacle.vertices, **obstacle_kwargs)) + artists.append(self.ax.add_patch(Polygon(obstacle.vertices.T, + facecolor=obstacle_kwargs['color']))) + + obstacle_handle = Patch(facecolor=obstacle_kwargs['color'], label=label) + self.legend_dict[label] = obstacle_handle + + artists.append(self.ax.legend(handles=self.legend_dict.values(), + labels=self.legend_dict.keys())) + return artists + def set_equal_3daxis(self, axes=None): """Plots minimum/maximum points with no linestyle to increase the plotting region to simulate `.ax.axis('equal')` from matplotlib 2d plots which is not possible using 3d @@ -1508,6 +1553,53 @@ def plot_sensors(self, sensors, mapping=[0, 1], sensor_label="Sensors", **kwargs sensor_xy = np.array([sensor.position[mapping, 0] for sensor in sensors]) self.fig.add_scatter(x=sensor_xy[:, 0], y=sensor_xy[:, 1], **sensor_kwargs) + def plot_obstacles(self, obstacles, mapping=[0, 1], label='Obstacles', **kwargs): + """Plots obstacle(s) + + Plots obstacles. Users can change the colour and marker size of obstacle + vertices with keyword arguments. Marker colour determines the fill colour + of obstacle patches. Defaults are grey '.' marker and matching fill colour. + + Parameters + ---------- + obstacles : Collection of :class:`~.Obstacle` + Obstacles to plot + mapping : list + List of items specifying the mapping of the position components of the + obstacle's position. Default is [0, 1]. + label: str + Label to apply to obstacles for the legend. + \\*\\*kwargs: dict + Additional arguments to be passed to scatter function for detections. Defaults are + ``marker=dict(symbol='circle', size=3, color='grey')``. + """ + + if not isinstance(obstacles, Collection): + obstacles = {obstacles} + + self._check_mapping(mapping) + + if self.dimension == 1 or self.dimension == 3: + raise NotImplementedError + + obstacle_kwargs = dict(mode='markers', marker=dict(symbol='circle', size=3, + color='grey'), + legendgroup=label, legendrank=50, fill='toself', + name=label) + + merge(obstacle_kwargs, kwargs) + + if obstacle_kwargs['legendgroup'] not in {trace.legendgroup + for trace in self.fig.data}: + obstacle_kwargs['showlegend'] = True + else: + obstacle_kwargs['showlegend'] = False + + for obstacle in obstacles: + obstacle_xy = obstacle.vertices[mapping, :] + self.fig.add_scatter(x=obstacle_xy[0, :], y=obstacle_xy[1, :], **obstacle_kwargs) + obstacle_kwargs['showlegend'] = False + def hide_plot_traces(self, items_to_hide=None): """Hide Plot Traces @@ -1776,6 +1868,9 @@ def plot_tracks(self, tracks, mapping, uncertainty=False, particle=False, track_ def plot_sensors(self, sensors, sensor_label="Sensors", **kwargs): raise NotImplementedError + def plot_obstacles(self, obstacles, mapping=[0, 1], label='Obstacles', **kwargs): + raise NotImplementedError + class _AnimationPlotterDataClass(Base): plotting_data = Property(Iterable[State]) @@ -2030,6 +2125,9 @@ def plot_measurements(self, measurements, mapping, measurement_model=None, def plot_sensors(self, sensors, sensor_label="Sensors", **kwargs): raise NotImplementedError + def plot_obstacles(self, obstacles, mapping=[0, 1], label='Obstacles', **kwargs): + raise NotImplementedError + @classmethod def run_animation(cls, times_to_plot: List[datetime], @@ -2398,6 +2496,12 @@ def _resize(self, data, type="track"): for y_values in dictionary["y"]: all_y.extend([np.nanmax(y_values), np.nanmin(y_values)]) + elif type == "obstacle": + for obstacle in data: + obstacle_xy = obstacle.vertices[(0, 1), :] + all_x.extend(obstacle_xy[0, :]) + all_y.extend(obstacle_xy[1, :]) + xmax = max(all_x) ymax = max(all_y) xmin = min(all_x) @@ -3022,3 +3126,61 @@ def plot_sensors(self, sensors, sensor_label="Sensors", resize=True, **kwargs): # we have called a plotting function so update flag (used in _resize) self.plotting_function_called = True + + def plot_obstacles(self, obstacles, mapping=[0, 1], label="Obstacles", resize=True, + **kwargs): + """Plots obstacle(s) + + Plots obstacles. Users can change the colour and marker size of obstacle + vertices with keyword arguments. Marker colour determines the fill colour + of obstacle patches. Defaults are grey '.' marker and matching fill colour. + Currently only works for stationary obstacles. + + Parameters + ---------- + obstacles : Collection of :class:`~.Obstacle` + Obstacles to plot + mapping : list + List of items specifying the mapping of the position components of the + obstacle's position. Default is [0, 1]. + label: str + Label to apply to obstacles for the legend. + resize : boolean + Boolean to reshape figure such that all elements are in view. + \\*\\*kwargs: dict + Additional arguments to be passed to scatter function for detections. Defaults are + ``marker=dict(symbol='circle', size=3, color='grey')``. + """ + if not isinstance(obstacles, Collection): + obstacles = {obstacles} + + if obstacles: + trace_base = len(self.fig.data) + + obstacle_kwargs = dict(mode='markers', marker=dict(symbol='circle', size=3, + color='grey'), + legendgroup=label, legendrank=50, fill='toself', + name=label) + + merge(obstacle_kwargs, kwargs) + + self.fig.add_trace(go.Scatter(obstacle_kwargs)) + + if resize: + self._resize(obstacles, "obstacle") + + obstacle_xy = np.concatenate( + [np.concatenate([obstacle.vertices[mapping, :], + np.array([[None], [None]])], axis=1) + for obstacle in obstacles], axis=1) + for frame in self.fig.frames: + traces_ = list(frame.traces) + data_ = list(frame.data) + + data_.append(go.Scatter(x=obstacle_xy[0, :], y=obstacle_xy[1, :])) + traces_.append(trace_base) + + frame.traces = traces_ + frame.data = data_ + + self.plotting_function_called = True diff --git a/stonesoup/predictor/particle.py b/stonesoup/predictor/particle.py index 2c31208b1..661b67589 100644 --- a/stonesoup/predictor/particle.py +++ b/stonesoup/predictor/particle.py @@ -6,6 +6,8 @@ from scipy.special import logsumexp from ordered_set import OrderedSet +from ..sensor.sensor import Sensor + from .base import Predictor from ._utils import predict_lru_cache from .kalman import KalmanPredictor, ExtendedKalmanPredictor @@ -323,11 +325,10 @@ def predict(self, prior, timestamp=None, **kwargs): parent=prior, ) - # Predict particles using the transition model new_state_vector = self.transition_model.function( new_particle_state, - noise=True, time_interval=time_interval, + noise=True, **kwargs) # Estimate existence @@ -335,9 +336,15 @@ def predict(self, prior, timestamp=None, **kwargs): new_particle_state.existence_probability) predicted_log_weights = self.predict_log_weights( + untransitioned_state, # prior state + Prediction.from_state( + new_particle_state, + state_vector=new_state_vector, + ), # predicted_state new_particle_state.existence_probability, predicted_existence, nsurv_particles, - new_particle_state.log_weight) + new_particle_state.log_weight, + time_interval=time_interval) # Create the prediction output new_particle_state = Prediction.from_state( @@ -357,8 +364,8 @@ def estimate_existence(self, existence_prior): + self.survival_probability * existence_prior return existence_estimate - def predict_log_weights(self, prior_existence, predicted_existence, surv_part_size, - prior_log_weights): + def predict_log_weights(self, prior_state, predicted_state, prior_existence, + predicted_existence, surv_part_size, prior_log_weights, **kwargs): # Weight prediction function currently assumes that the chosen importance density is the # transition density. This will need to change if implementing a different importance @@ -383,11 +390,85 @@ def get_detections(prior): if hasattr(prior, 'hypothesis'): if prior.hypothesis is not None: for hypothesis in prior.hypothesis: - detections |= {hypothesis.measurement} + if hypothesis.measurement: + detections |= {hypothesis.measurement} return detections +class VisibilityInformedBernoulliParticlePredictor(BernoulliParticlePredictor): + """A Bernoulli particle predictor implementing visibility estimation of particles + for reduced existence probability decay rate in cluttered environments.""" + + """Visibility informed Bernoulli Particle Predictor class + + An implementation of a particle filter predictor utilising the Bernoulli + filter formulation that estimates the spatial distribution of a single + target and estimates its existence. This implementation modifies the weight + prediction step to account for particles that are predicted to be inside + obstacles and not visible to the sensor. This is based on the work by + Glover et al. [#vibpf] + + This should be used in conjunction with the + :class:`~.VisibilityInformedBernoulliParticleUpdater` but also works with + :class:`.BernoulliParticleUpdater`. + + References + ---------- + .. [#vibpf] Glover, Timothy J & Liu, Cunjia & Chen, Wen-Hua, Visibility + informed Bernoulli filter for target tracking in cluttered environments, + 2022, 25th International Conference on Information Fusion (FUSION), 1-8. + """ + + sensor: Sensor = Property( + default=None, doc="Sensor providing measurements for update stages. " + "Used here to evaluate visibility of particles.") + obstacle_transition_likelihood: float = Property( + default=1e-20, doc="Transition likelihood of particles that are propagated into obstacles " + "or not visible") + obstacle_birth_likelihood: float = Property( + default=1e-20, doc="Birth transition likelihood of particles that are not visible " + "or within obstacles") + + def predict_log_weights(self, prior_state, predicted_state, prior_existence, + predicted_existence, surv_part_size, prior_log_weights, **kwargs): + + # Weight prediction function currently assumes that the chosen importance density is the + # transition density. This will need to change if implementing a different importance + # density or incorporating visibility information + + transition_likelihood = self.transition_model.logpdf(predicted_state, + prior_state, + **kwargs) + + visible_parts = self.sensor.is_detectable(predicted_state) + _, parts_in_obs = self.sensor.is_visible(predicted_state, + obstacle_check=True) + + transition_likelihood_mod = copy.copy(transition_likelihood) + transition_likelihood_mod[:surv_part_size][parts_in_obs[:surv_part_size]] = \ + np.log(self.obstacle_transition_likelihood) + + transition_likelihood_mod[surv_part_size:][~visible_parts[surv_part_size:]] = \ + np.log(self.obstacle_birth_likelihood) + + surv_weights = \ + np.log((self.survival_probability*prior_existence)/predicted_existence) + \ + (transition_likelihood_mod[:surv_part_size] - + transition_likelihood[:surv_part_size]) + prior_log_weights[:surv_part_size] + + birth_weights = \ + np.log((self.birth_probability*(1-prior_existence))/predicted_existence) + \ + (transition_likelihood_mod[surv_part_size:] - + transition_likelihood[surv_part_size:]) + prior_log_weights[surv_part_size:] + predicted_log_weights = np.concatenate((surv_weights, birth_weights)) + + # Normalise weights + predicted_log_weights -= logsumexp(predicted_log_weights) + + return predicted_log_weights + + class SMCPHDBirthSchemeEnum(Enum): """SMC-PHD Birth scheme enumeration""" EXPANSION = 'expansion' #: Expansion birth scheme diff --git a/stonesoup/predictor/tests/test_particle.py b/stonesoup/predictor/tests/test_particle.py index 664436b76..188809585 100644 --- a/stonesoup/predictor/tests/test_particle.py +++ b/stonesoup/predictor/tests/test_particle.py @@ -4,18 +4,19 @@ import copy import numpy as np +from scipy.special import logsumexp import pytest from ...models.transition.linear import ConstantVelocity from ...predictor.particle import ( ParticlePredictor, ParticleFlowKalmanPredictor, BernoulliParticlePredictor, SMCPHDPredictor, - SMCPHDBirthSchemeEnum) + SMCPHDBirthSchemeEnum, VisibilityInformedBernoulliParticlePredictor) from ...types.array import StateVector from ...types.numeric import Probability from ...types.particle import Particle from ...types.prediction import ParticleStatePrediction, BernoulliParticleStatePrediction from ...types.update import BernoulliParticleStateUpdate -from ...types.state import ParticleState, BernoulliParticleState +from ...types.state import ParticleState, BernoulliParticleState, State from ...models.measurement.linear import LinearGaussian from ...types.detection import Detection from ...sampler.particle import ParticleSampler @@ -23,6 +24,8 @@ from ...functions import gm_sample from ...types.hypothesis import SingleHypothesis from ...types.multihypothesis import MultipleHypothesis +from ...platform.base import Obstacle +from ...sensor.radar import RadarBearingRange @pytest.mark.parametrize( @@ -81,9 +84,28 @@ def test_particle(predictor_class): assert np.all([prediction.weight[i] == 1 / 9 for i in range(9)]) -def test_bernoulli_particle_no_detection(): +@pytest.mark.parametrize( + "predictor, extra_params", + [ + (BernoulliParticlePredictor, # predictor + {},), # extra_params + (VisibilityInformedBernoulliParticlePredictor, # predictor + {'sensor': RadarBearingRange( + position=StateVector([[0], [0]]), + position_mapping=(0, 1), + noise_covar=np.array([[np.radians(1)**2, 0], + [0, 1**2]]), + ndim_state=4, + obstacles=[Obstacle(states=State(StateVector([[20], [15]])), + shape_data=np.array([[-2.5, -2.5, 2.5, 2.5], + [-2.5, 2.5, 2.5, -2.5]]), + position_mapping=(0, 1))])}) # extra_params + ], + ids=["standard_bernoulli", "vis_informed_bernoulli"] + ) +def test_bernoulli_particle_no_detection(predictor, extra_params): # Initialise transition model - cv = ConstantVelocity(noise_diff_coeff=0) + cv = ConstantVelocity(noise_diff_coeff=0.1) # Define time related variables timestamp = datetime.datetime.now() @@ -130,28 +152,47 @@ def test_bernoulli_particle_no_detection(): axis=1) eval_prior.weight = np.array([1/18]*18) - eval_prediction = [Particle(cv.matrix( - timestamp=new_timestamp, - time_interval=time_interval) @ particle.state_vector, - weight=1/18) for particle in eval_prior] + eval_prediction = cv.function(eval_prior, noise=True, time_interval=time_interval) - eval_prediction = BernoulliParticleStatePrediction(None, + eval_prediction = BernoulliParticleStatePrediction(eval_prediction, + weight=eval_prior.weight, timestamp=new_timestamp, - particle_list=eval_prediction) + existence_probability=existence_prob) eval_existence = birth_prob * (1 - existence_prob) + survival_prob * existence_prob eval_weight = eval_prior.weight - eval_weight[:9] = survival_prob*existence_prob/eval_existence * eval_weight[:9] - eval_weight[9:] = birth_prob*(1-existence_prob)/eval_existence * eval_weight[9:] - eval_weight = eval_weight/np.sum(eval_weight) + if len(extra_params) > 0: + sensor = extra_params['sensor'] + # since the noise of the model is 0, logpdf is assumed to be 1e10 + original_log_likelihood = np.full((18,), 1e10) + visible_parts = sensor.is_detectable(eval_prediction) + _, parts_in_obs = sensor.is_visible(eval_prediction, obstacle_check=True) + log_likelihood = copy.copy(original_log_likelihood) + log_likelihood[:9][parts_in_obs[:9]] = np.log(1e-20) + log_likelihood[9:][~visible_parts[9:]] = np.log(1e-20) + eval_weight[:9] = np.log((survival_prob*existence_prob)/eval_existence) + \ + (log_likelihood[:9] - + original_log_likelihood[:9]) + np.log(eval_weight[:9]) + eval_weight[9:] = np.log((birth_prob*(1-existence_prob))/eval_existence) + \ + (log_likelihood[9:] - + original_log_likelihood[9:]) + np.log(eval_weight[9:]) + eval_weight -= logsumexp(eval_weight) + eval_weight = np.exp(eval_weight) + else: + eval_weight[:9] = survival_prob*existence_prob/eval_existence * eval_weight[:9] + eval_weight[9:] = birth_prob*(1-existence_prob)/eval_existence * eval_weight[9:] + eval_weight = eval_weight/np.sum(eval_weight) + # reinitialise the random seed to ensure model noise is similar np.random.seed(random_seed) - predictor = BernoulliParticlePredictor(transition_model=cv, - birth_sampler=sampler, - birth_probability=birth_prob, - survival_probability=survival_prob) + predictor_inputs = {'transition_model': cv, + 'birth_sampler': sampler, + 'birth_probability': birth_prob, + 'survival_probability': survival_prob} + predictor_inputs.update(extra_params) + predictor = predictor(**predictor_inputs) prediction = predictor.predict(prior, timestamp=new_timestamp) @@ -171,9 +212,28 @@ def test_bernoulli_particle_no_detection(): assert np.around(float(np.sum(prediction.weight)), decimals=1) == 1 -def test_bernoulli_particle_detection(): +@pytest.mark.parametrize( + "predictor, extra_params", + [ + (BernoulliParticlePredictor, # predictor + {},), # extra_params + (VisibilityInformedBernoulliParticlePredictor, # predictor + {'sensor': RadarBearingRange( + position=StateVector([[0], [0]]), + position_mapping=(0, 1), + noise_covar=np.array([[np.radians(1)**2, 0], + [0, 1**2]]), + ndim_state=4, + obstacles=[Obstacle(states=State(StateVector([[20], [15]])), + shape_data=np.array([[-2.5, -2.5, 2.5, 2.5], + [-2.5, 2.5, 2.5, -2.5]]), + position_mapping=(0, 1))])}) # extra_params + ], + ids=["standard_bernoulli", "vis_informed_bernoulli"] + ) +def test_bernoulli_particle_detection(predictor, extra_params): # Initialise transition model - cv = ConstantVelocity(noise_diff_coeff=0) + cv = ConstantVelocity(noise_diff_coeff=0.1) lg = LinearGaussian(ndim_state=2, mapping=(0,), noise_covar=np.array([[1]])) @@ -237,29 +297,47 @@ def test_bernoulli_particle_detection(): axis=1) eval_prior.weight = np.array([1/18]*18) - eval_prediction = [Particle(cv.matrix( - timestamp=new_timestamp, - time_interval=time_interval) @ particle.state_vector, - weight=1/18) for particle in eval_prior] + eval_prediction = cv.function(eval_prior, noise=True, time_interval=time_interval) - eval_prediction = BernoulliParticleStatePrediction(None, + eval_prediction = BernoulliParticleStatePrediction(eval_prediction, + weight=eval_prior.weight, timestamp=new_timestamp, - particle_list=eval_prediction) + existence_probability=existence_prob) eval_existence = birth_prob * (1 - existence_prob) + survival_prob * existence_prob eval_weight = eval_prior.weight - eval_weight[:9] = survival_prob*existence_prob/eval_existence * eval_weight[:9] - eval_weight[9:] = birth_prob*(1-existence_prob)/eval_existence * eval_weight[9:] - eval_weight = eval_weight/np.sum(eval_weight) - - predictor = BernoulliParticlePredictor(transition_model=cv, - birth_sampler=sampler, - birth_probability=birth_prob, - survival_probability=survival_prob) + if len(extra_params) > 0: + sensor = extra_params['sensor'] + # since the noise of the model is 0, logpdf is assumed to be 1e10 + original_log_likelihood = np.full((18,), 1e10) + visible_parts = sensor.is_detectable(eval_prediction) + _, parts_in_obs = sensor.is_visible(eval_prediction, obstacle_check=True) + log_likelihood = copy.copy(original_log_likelihood) + log_likelihood[:9][parts_in_obs[:9]] = np.log(1e-20) + log_likelihood[9:][~visible_parts[9:]] = np.log(1e-20) + eval_weight[:9] = np.log((survival_prob*existence_prob)/eval_existence) + \ + (log_likelihood[:9] - + original_log_likelihood[:9]) + np.log(eval_weight[:9]) + eval_weight[9:] = np.log((birth_prob*(1-existence_prob))/eval_existence) + \ + (log_likelihood[9:] - + original_log_likelihood[9:]) + np.log(eval_weight[9:]) + eval_weight -= logsumexp(eval_weight) + eval_weight = np.exp(eval_weight) + else: + eval_weight[:9] = survival_prob*existence_prob/eval_existence * eval_weight[:9] + eval_weight[9:] = birth_prob*(1-existence_prob)/eval_existence * eval_weight[9:] + eval_weight = eval_weight/np.sum(eval_weight) np.random.seed(random_seed) + predictor_inputs = {'transition_model': cv, + 'birth_sampler': sampler, + 'birth_probability': birth_prob, + 'survival_probability': survival_prob} + predictor_inputs.update(extra_params) + predictor = predictor(**predictor_inputs) + prediction = predictor.predict(prior, timestamp=new_timestamp) # check that the correct number of particles are output diff --git a/stonesoup/sensor/radar/radar.py b/stonesoup/sensor/radar/radar.py index 3ad171ca6..f601e3bcf 100644 --- a/stonesoup/sensor/radar/radar.py +++ b/stonesoup/sensor/radar/radar.py @@ -16,7 +16,7 @@ Cartesian2DToBearing) from ...sensor.action.dwell_action import DwellActionsGenerator from ...sensormanager.action import ActionableProperty -from ...sensor.sensor import Sensor, SimpleSensor +from ...sensor.sensor import Sensor, VisibilityInformed2DSensor from ...types.array import CovarianceMatrix from ...types.angle import Angle from ...types.detection import TrueDetection, Detection @@ -25,7 +25,7 @@ from ...types.state import StateVector -class RadarBearingRange(SimpleSensor): +class RadarBearingRange(VisibilityInformed2DSensor): """A simple radar sensor that generates measurements of targets, using a :class:`~.CartesianToBearingRange` model, relative to its position. @@ -61,14 +61,19 @@ def measurement_model(self): def is_detectable(self, state: GroundTruthState) -> bool: measurement_vector = self.measurement_model.function(state, noise=False) - true_range = measurement_vector[1, 0] # Bearing(0), Range(1) - return true_range <= self.max_range + true_range = measurement_vector[1, :] # Bearing(0), Range(1) + detectable = true_range <= self.max_range + visible = self.is_visible(state) + return detectable & visible def is_clutter_detectable(self, state: Detection) -> bool: - return state.state_vector[1, 0] <= self.max_range + clutter_cart = self.measurement_model.inverse_function(state) + visible = self.is_visible(clutter_cart) + detectable = state.state_vector[1, 0] <= self.max_range + return detectable and visible -class RadarBearing(SimpleSensor): +class RadarBearing(VisibilityInformed2DSensor): """A simple radar sensor that generates measurements of targets, using a :class:`~.Cartesian2DToBearing` model, relative to its position. @@ -111,8 +116,10 @@ def is_detectable(self, state: GroundTruthState) -> bool: rotation_offset=self.orientation ) measurement_vector = tmp_meas_model.function(state, noise=False) - true_range = measurement_vector[1, 0] # Bearing(0), Range(1) - return true_range <= self.max_range + true_range = measurement_vector[1, :] # Bearing(0), Range(1) + detectable = true_range <= self.max_range + visible = self.is_visible(state) + return detectable & visible def is_clutter_detectable(self, state: Detection) -> bool: return True @@ -186,10 +193,14 @@ def is_detectable(self, state: GroundTruthState) -> bool: # Check if state falls within sensor's FOV fov_min = -self.fov_angle / 2 fov_max = +self.fov_angle / 2 - bearing_t = measurement_vector[0, 0] - true_range = measurement_vector[1, 0] + bearing_t = measurement_vector[0, :] + true_range = measurement_vector[1, :] - return fov_min <= bearing_t <= fov_max and true_range <= self.max_range + detectable = np.logical_and(fov_min <= bearing_t, bearing_t <= fov_max) & \ + (true_range <= self.max_range) + visible = self.is_visible(state) + + return detectable & visible def is_clutter_detectable(self, state: Detection) -> bool: measurement_vector = state.state_vector @@ -200,7 +211,11 @@ def is_clutter_detectable(self, state: Detection) -> bool: bearing_t = measurement_vector[0, 0] true_range = measurement_vector[1, 0] - return fov_min <= bearing_t <= fov_max and true_range <= self.max_range + detectable = fov_min <= bearing_t <= fov_max and true_range <= self.max_range + clutter_cart = self.measurement_model.inverse_function(state) + visible = self.is_visible(clutter_cart) + + return detectable and visible class RadarRotatingBearing(RadarBearing): @@ -285,10 +300,13 @@ def is_detectable(self, state: GroundTruthState) -> bool: # Check if state falls within sensor's FOV fov_min = -self.fov_angle / 2 fov_max = +self.fov_angle / 2 - bearing_t = measurement_vector[0, 0] - true_range = measurement_vector[1, 0] + bearing_t = measurement_vector[0, :] + true_range = measurement_vector[1, :] + detectable = np.logical_and(fov_min <= bearing_t, bearing_t <= fov_max) & \ + (true_range <= self.max_range) + visible = self.is_visible(state) - return fov_min <= bearing_t <= fov_max and true_range <= self.max_range + return detectable & visible class RadarElevationBearingRange(RadarBearingRange): diff --git a/stonesoup/sensor/radar/tests/test_radar.py b/stonesoup/sensor/radar/tests/test_radar.py index 5acc3f1c1..544e1ce5b 100644 --- a/stonesoup/sensor/radar/tests/test_radar.py +++ b/stonesoup/sensor/radar/tests/test_radar.py @@ -18,6 +18,7 @@ from ....types.state import State from ....types.detection import TrueDetection from ....models.clutter.clutter import ClutterModel +from ....platform.base import Obstacle def h2d(state, pos_map, translation_offset, rotation_offset): @@ -792,3 +793,81 @@ def test_clutter_model(radar, clutter_params): measurements = radar.measure({truth}) assert len([target for target in measurements if (isinstance(target, TrueDetection))]) == 1 assert len(measurements) >= 1 + + +@pytest.mark.parametrize("sensor, params", + [ + (RadarBearingRange, + {'ndim_state': 2, + 'position_mapping': [0, 1], + 'noise_covar': np.array([[np.radians(0.5) ** 2, 0], + [0, 1]]), + 'position': np.array([[0], [1]]), + 'max_range': np.inf}), + (RadarBearing, + {'ndim_state': 2, + 'position_mapping': [0, 1], + 'noise_covar': np.array([[np.radians(0.5) ** 2]]), + 'position': np.array([[0], [1]]), + 'max_range': np.inf}), + (RadarRotatingBearingRange, + {'ndim_state': 2, + 'position_mapping': [0, 1], + 'noise_covar': np.array([[np.radians(0.5) ** 2, 0], + [0, 1]]), + 'position': np.array([[0], [1]]), + 'max_range': 100, + 'dwell_centre': StateVector([np.radians(90)]), + 'fov_angle': np.radians(30), + 'rpm': 0}), + (RadarRotatingBearing, + {'ndim_state': 2, + 'position_mapping': [0, 1], + 'noise_covar': np.array([[np.radians(0.5) ** 2]]), + 'position': np.array([[0], [1]]), + 'max_range': 100, + 'dwell_centre': StateVector([np.radians(90)]), + 'fov_angle': np.radians(30), + 'rpm': 0}), + ], + ids=["is_vis_radar_bearing_range", "is_vis_radar_bearing", + "is_vis_rotating_radar_bearing_range", + "is_vis_rotating_radar_bearing"]) +def test_is_visible_sensors(sensor, params): + + obs_state = State(StateVector([[0], [75]])) + shape = np.array([[-5, -5, 5, 5], [-5, 5, 5, -5]]) + obstacles = [Obstacle(states=obs_state, + shape_data=shape, + position_mapping=(0, 1))] + + params['obstacles'] = obstacles + vis_informed_sensor = sensor(**params) + + states = [GroundTruthState(StateVector([[25], [50]])), + GroundTruthState(StateVector([[0], [50]])), + GroundTruthState(StateVector([[-25], [50]])), + GroundTruthState(StateVector([[-25], [99]])), + GroundTruthState(StateVector([[0], [99]])), + GroundTruthState(StateVector([[25], [99]]))] + + # states not visible with limited FOV sensor + not_vis_set_1 = [StateVector([[25], [50]]), + StateVector([[-25], [50]]), + StateVector([[-25], [99]]), + StateVector([[25], [99]])] + + # states not visible + not_vis_set_2 = [StateVector([[0], [99]])] + + for state in states: + measurement = vis_informed_sensor.measure({state}, noise=False) + + if ((isinstance(vis_informed_sensor, RadarRotatingBearing) or + isinstance(vis_informed_sensor, RadarRotatingBearingRange)) + and np.any(np.all(state.state_vector == not_vis_set_1, axis=1)))\ + or np.any(np.all(state.state_vector == not_vis_set_2, axis=1)): + assert len(measurement) == 0 + else: + assert np.all(measurement.pop().state_vector == + vis_informed_sensor.measurement_model.function(state)) diff --git a/stonesoup/sensor/sensor.py b/stonesoup/sensor/sensor.py index baa2a8546..ffc84ccca 100644 --- a/stonesoup/sensor/sensor.py +++ b/stonesoup/sensor/sensor.py @@ -1,5 +1,5 @@ from abc import abstractmethod, ABC -from typing import Set, Union, Sequence +from typing import Set, Union, Sequence, List, TYPE_CHECKING import numpy as np @@ -9,6 +9,10 @@ from ..models.clutter.clutter import ClutterModel from ..types.detection import TrueDetection, Detection from ..types.groundtruth import GroundTruthState +from ..types.state import ParticleState, State, StateVector + +if TYPE_CHECKING: + from ..platform.base import Obstacle class Sensor(PlatformMountable, Actionable): @@ -133,6 +137,9 @@ def is_detectable(self, state: GroundTruthState) -> bool: def is_clutter_detectable(self, state: Detection) -> bool: raise NotImplementedError + def is_visible(self, state: State) -> bool: + return True + class SensorSuite(Sensor): """Sensor composition type @@ -176,3 +183,99 @@ def measure(self, ground_truths: Set[GroundTruthState], noise: Union[bool, np.nd def measurement_model(self): """Measurement model of the sensor, describing general sensor model properties""" raise NotImplementedError + + +class VisibilityInformed2DSensor(SimpleSensor): + """The base class of 2D sensors that evaluate the visibility of + targets in known cluttered environments. + """ + + obstacles: List['Obstacle'] = Property(default=None, + doc="list of :class:`~.Obstacle` type platforms " + "that represent obstacles in the environment") + + def is_visible(self, state, obstacle_check=False): + """Function for evaluating the visibility of states in the + environment based on a 2D line of sight intersection check with + obstacles edges. Note that this method does not check sensor field of + view in evaluating visibility. If no obstacles are provided, the + method will return `True`. + + Parameters + ---------- + state : :class:`~.State` + A state object that describes `n` positions to check line of sight to from + the sensor position. + obstacle_check : bool, optional + A flag for returning a second output that indicates if the state is + inside an obstacle. Defaults to `False`. + + Returns + ------- + : :class:`~numpy.ndarray` + (1, n) array of booleans indicating the visibility of `state`. True represents + that the state is visible. + : :class:`~numpy.ndarray` + (1, n) array of booleans indicating whether `state` is inside an obstacle + and is true when a state is inside an obstacle. Only returned when + `obstacle_check` is `True` and :attr:`obstacles` is not `None`.""" + + # Check if visibility calculations should be run + if not self.obstacles: + return True + + # Check number of states if `state` is `ParticleState` + if isinstance(state, ParticleState): + nstates = len(state) + else: + nstates = 1 + + # Get relative edges from obstacle list + relative_edges = np.concatenate([obstacle.relative_edges + for obstacle in self.obstacles], axis=1) + + # Calculate relative vector between sensor position and state position + if isinstance(state, StateVector): + relative_ray = np.array([state[self.position_mapping[0], :] + - self.position[0, 0], + state[self.position_mapping[1], :] + - self.position[1, 0]]) + else: + relative_ray = np.array([state.state_vector[self.position_mapping[0], :] + - self.position[0, 0], + state.state_vector[self.position_mapping[1], :] + - self.position[1, 0]]) + + # Calculate relative vector between sensor and all obstacle edge positions + relative_sensor_to_edge = self.position[0:2] - \ + np.concatenate([obstacle.vertices for obstacle in self.obstacles], axis=1) + + # Initialise the intersection vector + intersections = np.full((relative_edges.shape[1], nstates), False) + + # Perform intersection check + for n in range(relative_edges.shape[1]): + denom = relative_ray[1, :]*relative_edges[0, n] \ + - relative_ray[0, :]*relative_edges[1, n] + alpha = (relative_edges[1, n]*relative_sensor_to_edge[0, n] + - relative_edges[0, n]*relative_sensor_to_edge[1, n])/denom + beta = (relative_ray[0, :]*relative_sensor_to_edge[1, n] + - relative_ray[1, :]*relative_sensor_to_edge[0, n])/denom + + intersections[n, :] = np.logical_and.reduce((alpha >= 0, + alpha <= 1, + beta >= 0, + beta <= 1)) + + # Count intersections. If the number of intersections is odd then the state + # is inside an obstacle. If the number of intersections is even, the + # state is in free space. + intersection_count = sum(intersections, 0) + intersections = np.invert(np.any(intersections, 0)) + if nstates == 1: + intersections = intersections[0] + + if obstacle_check: + return intersections, intersection_count % 2 != 0 + else: + return intersections diff --git a/stonesoup/tests/test_plotter.py b/stonesoup/tests/test_plotter.py index 2335196bf..c1c7c6f08 100644 --- a/stonesoup/tests/test_plotter.py +++ b/stonesoup/tests/test_plotter.py @@ -22,9 +22,10 @@ from stonesoup.measures import Mahalanobis from stonesoup.dataassociator.neighbour import NearestNeighbour -from stonesoup.types.state import GaussianState, State +from stonesoup.types.state import GaussianState, State, StateVector from stonesoup.types.track import Track +from stonesoup.platform.base import Obstacle start_time = datetime.now() transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005), @@ -91,6 +92,17 @@ position=np.array([[10], [50], [0]]) ) +shape = np.array([[-2, -2, 2, 2], [-2, 2, 2, -2]]) +obstacle_list = [Obstacle(shape_data=shape, + states=State(StateVector([[0], [0]])), + position_mapping=(0, 1)), + Obstacle(shape_data=shape, + states=State(StateVector([[0], [5]])), + position_mapping=(0, 1)), + Obstacle(shape_data=shape, + states=State(StateVector([[5], [0]])), + position_mapping=(0, 1))] + plotter = Plotter() # Test functions @@ -230,6 +242,7 @@ def test_animated_plotterly(): plotter = AnimatedPlotterly(timesteps) plotter.plot_ground_truths(truth, [0, 2]) plotter.plot_measurements(all_measurements, [0, 2]) + plotter.plot_obstacles(obstacle_list) plotter.plot_tracks(track, [0, 2], uncertainty=True, plot_history=True) @@ -259,6 +272,7 @@ def test_plotterly_empty(): plotter.plot_ground_truths({}, [0, 2]) plotter.plot_measurements({}, [0, 2]) plotter.plot_tracks({}, [0, 2]) + plotter.plot_obstacles({}, [0, 1]) with pytest.raises(TypeError): plotter.plot_tracks({}) with pytest.raises(ValueError): @@ -286,6 +300,8 @@ def test_plotterly_2d(): plotter2d.plot_measurements(all_measurements, [0, 2]) plotter2d.plot_tracks(track, [0, 2], uncertainty=True) plotter2d.plot_sensors(sensor2d) + plotter2d.plot_obstacles(obstacle_list) + plotter2d.plot_obstacles(obstacle_list[0]) def test_plotterly_3d(): @@ -371,3 +387,25 @@ def test_show_plot(labels): else: assert showing == len(labels) assert showing + hidden == 3 + + +@pytest.fixture(scope="module", params=[ + Plotter(), Plotterly(), AnimationPlotter(), AnimatedPlotterly(timesteps=timesteps)]) +def plotters(request): + return request.param + + +@pytest.fixture(scope="module", params=[obstacle_list[0], obstacle_list]) +def obstacles(request): + return request.param + + +def test_obstacles(plotters, obstacles): + if isinstance(plotters, AnimationPlotter): + with pytest.raises(NotImplementedError): + plotters.plot_obstacles(obstacles) + else: + plotters.plot_ground_truths(truth, [0, 1]) + plotters.plot_measurements(all_measurements, [0, 1]) + plotters.plot_tracks(track, [0, 1]) + plotters.plot_obstacles(obstacles) diff --git a/stonesoup/updater/particle.py b/stonesoup/updater/particle.py index ea2055cec..b6026165a 100644 --- a/stonesoup/updater/particle.py +++ b/stonesoup/updater/particle.py @@ -7,6 +7,8 @@ from scipy.linalg import inv from scipy.special import logsumexp +from stonesoup.sensor.sensor import Sensor + from .base import Updater from .kalman import KalmanUpdater, ExtendedKalmanUpdater from ..base import Property @@ -410,14 +412,6 @@ class BernoulliParticleUpdater(ParticleUpdater): 2013, IEEE Transactions on Signal Processing, 61(13), 3406-3430. """ - birth_probability: float = Property( - default=0.01, - doc="Probability of target birth.") - - survival_probability: float = Property( - default=0.98, - doc="Probability of target survival") - clutter_rate: int = Property( default=1, doc="Average number of clutter measurements per time step. Implementation assumes number " @@ -465,8 +459,7 @@ def update(self, hypotheses, **kwargs): # Evaluate measurement likelihood and approximate integrals log_meas_likelihood = [] delta_part2 = [] - log_detection_probability = np.full(len(prediction), - np.log(self.detection_probability)) + log_detection_probability = self.get_detection_probability(prediction) for detection in detections: measurement_model = detection.measurement_model or self.measurement_model @@ -491,7 +484,7 @@ def update(self, hypotheses, **kwargs): np.logaddexp(log_detection_probability + logsumexp(log_meas_likelihood, axis=0) - np.log(self.clutter_rate * self.clutter_distribution), - np.log(1 - self.detection_probability)) \ + np.log(1 - np.exp(log_detection_probability))) \ + updated_state.log_weight # Normalise weights @@ -510,6 +503,13 @@ def update(self, hypotheses, **kwargs): return updated_state + def get_detection_probability(self, prediction): + + log_detection_probability = np.full(len(prediction), + np.log(self.detection_probability)) + + return log_detection_probability + @staticmethod def _log_space_product(A, B): if A.ndim < 2: @@ -521,6 +521,49 @@ def _log_space_product(A, B): return np.squeeze(logsumexp(Astack + Bstack, axis=2)) +class VisibilityInformedBernoulliParticleUpdater(BernoulliParticleUpdater): + """Visibility informed Bernoulli Particle Filter Updater class + + An implementation of a particle filter updater utilising the + Bernoulli filter formulation that estimates the spatial distribution + of a single target and estimates its existence. This implementation + modifies the probability of detection of particles depending on whether + they are calculated to be not visible to the sensor, as described in [#vibpf]_. + + Due to the nature of the Bernoulli particle + filter prediction process, resampling is required at every + time step to reduce the number of particles back down to the + desired size. + + This updater should be used in conjunction with the + :class:`~.VisibilityInformedBernoulliParticlePredictor` but is also compatible + with :class:`~.BernoulliParticlePredictor`. + + References + ---------- + .. [#vibpf] Glover, Timothy J & Liu, Cunjia & Chen, Wen-Hua, Visibility + informed Bernoulli filter for target tracking in cluttered environments, + 2022, 25th International Conference on Information Fusion (FUSION), 1-8. + """ + + sensor: Sensor = Property( + default=None, + doc="Sensor providing measurements for update stages. " + "Used here to evaluate visibility of particles.") + obstacle_detection_probability: float = Property( + default=1e-20, + doc="Probability of detection " + "to assume when particle state is not visible to the sensor.") + + def get_detection_probability(self, prediction): + + log_detection_probability = np.full(len(prediction), np.log(self.detection_probability)) + visible_parts = self.sensor.is_detectable(prediction) + log_detection_probability[~visible_parts] = np.log(self.obstacle_detection_probability) + + return log_detection_probability + + class SMCPHDUpdater(ParticleUpdater): """Sequential Monte Carlo Probability Hypothesis Density (SMC-PHD) Updater class diff --git a/stonesoup/updater/tests/test_particle.py b/stonesoup/updater/tests/test_particle.py index 2cf2a5e14..2d05e5bbd 100644 --- a/stonesoup/updater/tests/test_particle.py +++ b/stonesoup/updater/tests/test_particle.py @@ -15,19 +15,21 @@ from ...types.multihypothesis import MultipleHypothesis from ...types.numeric import Probability from ...types.particle import Particle -from ...types.state import ParticleState +from ...types.state import ParticleState, StateVector, State from ...types.prediction import ( ParticleStatePrediction, ParticleMeasurementPrediction) from ...updater.particle import ( ParticleUpdater, GromovFlowParticleUpdater, GromovFlowKalmanParticleUpdater, BernoulliParticleUpdater, - SMCPHDUpdater) + SMCPHDUpdater, VisibilityInformedBernoulliParticleUpdater) from ...predictor.particle import BernoulliParticlePredictor from ...models.transition.linear import ConstantVelocity, CombinedLinearGaussianTransitionModel from ...types.update import BernoulliParticleStateUpdate from ...sampler.particle import ParticleSampler from ...sampler.detection import SwitchingDetectionSampler, GaussianDetectionParticleSampler from ...regulariser.particle import MCMCRegulariser +from ...platform.base import Obstacle +from ...sensor.radar import RadarBearingRange def dummy_constraint_function(particles): @@ -106,7 +108,26 @@ def test_particle(updater): assert np.allclose(updated_state.mean, StateVectors([[15.0], [20.0]]), rtol=5e-2) -def test_bernoulli_particle(): +@pytest.mark.parametrize( + "updater, extra_params", + [ + (BernoulliParticleUpdater, # predictor + {},), # extra_params + (VisibilityInformedBernoulliParticleUpdater, # predictor + {'sensor': RadarBearingRange( + position=StateVector([[0], [0]]), + position_mapping=(0, 1), + noise_covar=np.array([[np.radians(1)**2, 0], + [0, 1**2]]), + ndim_state=4, + obstacles=[Obstacle(states=State(StateVector([[20], [15]])), + shape_data=np.array([[-2.5, -2.5, 2.5, 2.5], + [-2.5, 2.5, 2.5, -2.5]]), + position_mapping=(0, 1))])}) # extra_params + ], + ids=["standard_bernoulli", "vis_informed_bernoulli"] + ) +def test_bernoulli_particle(updater, extra_params): timestamp = datetime.datetime.now() timediff = 2 new_timestamp = timestamp + datetime.timedelta(seconds=timediff) @@ -169,15 +190,16 @@ def test_bernoulli_particle(): resampler = SystematicResampler() regulariser = MCMCRegulariser(transition_model=cv) - updater = BernoulliParticleUpdater(measurement_model=None, - resampler=resampler, - regulariser=regulariser, - birth_probability=birth_prob, - survival_probability=survival_prob, - clutter_rate=2, - clutter_distribution=1/10, - nsurv_particles=9, - detection_probability=detection_probability) + params = {'resampler': resampler, + 'regulariser': regulariser, + 'clutter_rate': 2, + 'clutter_distribution': 1/10, + 'nsurv_particles': 9, + 'detection_probability': detection_probability} + + params.update(extra_params) + + updater = updater(measurement_model=None, **params) hypotheses = MultipleHypothesis( [SingleHypothesis(prediction, detection) for detection in detections])