Skip to content

Commit

Permalink
use transonic to optimize cost and heuristic functions for A*
Browse files Browse the repository at this point in the history
  • Loading branch information
VasudhaJha committed Feb 3, 2023
1 parent 50b6757 commit 69ae9b6
Show file tree
Hide file tree
Showing 17 changed files with 441 additions and 68 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ venv
dist
build
rr30a_s0_ch2.tif
__pythran__/
45 changes: 20 additions & 25 deletions brightest_path_lib/algorithm/astar.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import numpy as np
from queue import PriorityQueue, Queue
from typing import List, Tuple
from brightest_path_lib.cost import Reciprocal
from brightest_path_lib.heuristic import Euclidean
from brightest_path_lib.cost import ReciprocalTransonic
from brightest_path_lib.heuristic import EuclideanTransonic
from brightest_path_lib.image import ImageStats
from brightest_path_lib.input import CostFunction, HeuristicFunction
from brightest_path_lib.node import Node
Expand Down Expand Up @@ -91,15 +91,16 @@ def __init__(
self.open_nodes = open_nodes

if cost_function == CostFunction.RECIPROCAL:
self.cost_function = Reciprocal(
self.cost_function = ReciprocalTransonic(
min_intensity=self.image_stats.min_intensity,
max_intensity=self.image_stats.max_intensity)

if heuristic_function == HeuristicFunction.EUCLIDEAN:
self.heuristic_function = Euclidean(scale=self.scale)
self.heuristic_function = EuclideanTransonic(scale=self.scale)

self.is_canceled = False
self.found_path = False
self.evaluated_nodes = 0
self.result = []

def _validate_inputs(
Expand Down Expand Up @@ -196,6 +197,7 @@ def search(self) -> List[np.ndarray]:

close_set_hash.add(current_coordinates)

self.evaluated_nodes = count
return self.result

def _default_value(self) -> float:
Expand Down Expand Up @@ -252,28 +254,25 @@ def _find_2D_neighbors_of(self, node: Node) -> List[Node]:
neighbors = []
steps = [-1, 0, 1]
for xdiff in steps:
new_x = node.point[1] + xdiff
if new_x < self.image_stats.x_min or new_x > self.image_stats.x_max:
continue

for ydiff in steps:
if xdiff == ydiff == 0:
continue

new_x = node.point[1] + xdiff
# new_x = node.point[0] + xdiff
if new_x < self.image_stats.x_min or new_x > self.image_stats.x_max:
continue

new_y = node.point[0] + ydiff
# new_y = node.point[1] + ydiff
if new_y < self.image_stats.y_min or new_y > self.image_stats.y_max:
continue

# new_point = np.array([new_x, new_y])
new_point = np.array([new_y, new_x])

h_for_new_point = self._estimate_cost_to_goal(new_point)

intensity_at_new_point = self.image[new_y, new_x]

cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(intensity_at_new_point)
cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(float(intensity_at_new_point))
if cost_of_moving_to_new_point < self.cost_function.minimum_step_cost():
cost_of_moving_to_new_point = self.cost_function.minimum_step_cost()

Expand Down Expand Up @@ -317,7 +316,15 @@ def _find_3D_neighbors_of(self, node: Node) -> List[Node]:
steps = [-1, 0, 1]

for xdiff in steps:
new_x = node.point[2] + xdiff
if new_x < self.image_stats.x_min or new_x > self.image_stats.x_max:
continue

for ydiff in steps:
new_y = node.point[1] + ydiff
if new_y < self.image_stats.y_min or new_y > self.image_stats.y_max:
continue

for zdiff in steps:
if xdiff == ydiff == zdiff == 0:
continue
Expand All @@ -326,24 +333,12 @@ def _find_3D_neighbors_of(self, node: Node) -> List[Node]:
if new_z < self.image_stats.z_min or new_z > self.image_stats.z_max:
continue

# new_y = node.point[2] + ydiff
new_y = node.point[1] + ydiff
if new_y < self.image_stats.y_min or new_y > self.image_stats.y_max:
continue

# new_x = node.point[1] + xdiff
new_x = node.point[2] + xdiff
if new_x < self.image_stats.x_min or new_x > self.image_stats.x_max:
continue

#new_point = np.array([new_z, new_x, new_y])
new_point = np.array([new_z, new_y, new_x])

h_for_new_point = self._estimate_cost_to_goal(new_point)

#intensity_at_new_point = self.image[new_z, new_x, new_y]
intensity_at_new_point = self.image[new_z, new_y, new_x]
cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(intensity_at_new_point)
cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(float(intensity_at_new_point))
if cost_of_moving_to_new_point < self.cost_function.minimum_step_cost():
cost_of_moving_to_new_point = self.cost_function.minimum_step_cost()

Expand Down
15 changes: 11 additions & 4 deletions brightest_path_lib/algorithm/nbastar.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def __init__(
self.touch_node: BidirectionalNode = None
self.is_canceled = False
self.found_path = False
self.evaluated_nodes = 2 # since we will add the start and goal node to the open queue
self.result = []

def _validate_inputs(
Expand Down Expand Up @@ -264,6 +265,8 @@ def _expand_2D_neighbors_of(self, node: BidirectionalNode, from_start: bool):
in these directions
- 2D coordinates are of the type (y, x)
"""
current_g_score = node.get_g(from_start) # optimization: will be the same for all neighbors

steps = [-1, 0, 1]
for xdiff in steps:
new_x = node.point[1] + xdiff
Expand All @@ -280,10 +283,10 @@ def _expand_2D_neighbors_of(self, node: BidirectionalNode, from_start: bool):

new_point = np.array([new_y, new_x])

current_g_score = node.get_g(from_start)
# current_g_score = node.get_g(from_start)
intensity_at_new_point = self.image[new_y, new_x]

cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(intensity_at_new_point)
cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(float(intensity_at_new_point))
if cost_of_moving_to_new_point < self.cost_function.minimum_step_cost():
cost_of_moving_to_new_point = self.cost_function.minimum_step_cost()

Expand Down Expand Up @@ -316,6 +319,8 @@ def _expand_3D_neighbors_of(self, node: BidirectionalNode, from_start: bool):
or on the edges of the image)
- 3D coordinates are of the form (z, x, y)
"""
current_g_score = node.get_g(from_start)

steps = [-1, 0, 1]

for xdiff in steps:
Expand All @@ -338,10 +343,10 @@ def _expand_3D_neighbors_of(self, node: BidirectionalNode, from_start: bool):

new_point = np.array([new_z, new_y, new_x])

current_g_score = node.get_g(from_start)
# current_g_score = node.get_g(from_start)
intensity_at_new_point = self.image[new_z, new_y, new_x]

cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(intensity_at_new_point)
cost_of_moving_to_new_point = self.cost_function.cost_of_moving_to(float(intensity_at_new_point))
if cost_of_moving_to_new_point < self.cost_function.minimum_step_cost():
cost_of_moving_to_new_point = self.cost_function.minimum_step_cost()

Expand Down Expand Up @@ -372,6 +377,7 @@ def is_touch_node(
open_queue.put((tentative_f_score, self._get_node_priority(from_start), new_node))
if self.open_nodes:
self.open_nodes.put(new_point_coordinates)
self.evaluated_nodes += 1
self.node_at_coordinates[new_point_coordinates] = new_node
# elif self._in_closed_set(new_point_coordinates, from_start):
# return
Expand All @@ -383,6 +389,7 @@ def is_touch_node(
open_queue.put((tentative_f_score, self._get_node_priority(from_start), already_there))
if self.open_nodes:
self.open_nodes.put(new_point_coordinates)
self.evaluated_nodes += 1
path_length = already_there.g_score_from_start + already_there.g_score_from_goal
if path_length < self.best_path_length:
self.best_path_length = path_length
Expand Down
3 changes: 2 additions & 1 deletion brightest_path_lib/cost/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .cost import Cost
from .reciprocal import Reciprocal
from .reciprocal import Reciprocal
from .reciprocal_transonic import ReciprocalTransonic
7 changes: 3 additions & 4 deletions brightest_path_lib/cost/reciprocal.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class Reciprocal(Cost):
is between RECIPROCAL MIN and RECIPROCAL_MAX
"""

def __init__(self, min_intensity: float, max_intensity: float) -> None:
super().__init__()
if min_intensity is None or max_intensity is None:
Expand All @@ -30,10 +29,10 @@ def __init__(self, min_intensity: float, max_intensity: float) -> None:
raise ValueError
self.min_intensity = min_intensity
self.max_intensity = max_intensity
self.RECIPROCAL_MIN = 1E-6
self.RECIPROCAL_MIN = float(1E-6)
self.RECIPROCAL_MAX = 255.0
self._min_step_cost = 1 / self.RECIPROCAL_MAX
self._min_step_cost = 1.0 / self.RECIPROCAL_MAX


def cost_of_moving_to(self, intensity_at_new_point: float) -> float:
"""calculates the cost of moving to a point
Expand Down
86 changes: 86 additions & 0 deletions brightest_path_lib/cost/reciprocal_transonic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from transonic import boost

from brightest_path_lib.cost import Cost

@boost
class ReciprocalTransonic(Cost):
"""Uses the reciprocal of pixel/voxel intensity to compute the cost of moving
to a neighboring point
Parameters
----------
min_intensity : float
The minimum intensity a pixel/voxel can have in a given image
max_intensity : float
The maximum intensity a pixel/voxel can have in a given image
Attributes
----------
RECIPROCAL_MIN : float
To cope with zero intensities, RECIPROCAL_MIN is added to the intensities
in the range before reciprocal calculation
RECIPROCAL_MAX : float
We set the maximum intensity <= RECIPROCAL_MAX so that the intensity
is between RECIPROCAL MIN and RECIPROCAL_MAX
"""

min_intensity: float
max_intensity: float
RECIPROCAL_MIN: float
RECIPROCAL_MAX: float
_min_step_cost: float

def __init__(self, min_intensity: float, max_intensity: float) -> None:
super().__init__()
if min_intensity is None or max_intensity is None:
raise TypeError
if min_intensity > max_intensity:
raise ValueError
self.min_intensity = min_intensity
self.max_intensity = max_intensity
self.RECIPROCAL_MIN = float(1E-6)
self.RECIPROCAL_MAX = 255.0
self._min_step_cost = 1.0 / self.RECIPROCAL_MAX


@boost
def cost_of_moving_to(self, intensity_at_new_point: float) -> float:
"""calculates the cost of moving to a point
Parameters
----------
intensity_at_new_point : float
The intensity of the new point under consideration
Returns
-------
float
the cost of moving to the new point
Notes
-----
- To cope with zero intensities, RECIPROCAL_MIN is added to the intensities in the range before reciprocal calculation
- We set the maximum intensity <= RECIPROCAL_MAX so that the intensity is between RECIPROCAL MIN and RECIPROCAL_MAX
"""
if intensity_at_new_point > self.max_intensity:
raise ValueError

intensity_at_new_point = self.RECIPROCAL_MAX * (intensity_at_new_point - self.min_intensity) / (self.max_intensity - self.min_intensity)

if intensity_at_new_point < self.RECIPROCAL_MIN:
intensity_at_new_point = self.RECIPROCAL_MIN

return 1.0 / intensity_at_new_point

@boost
def minimum_step_cost(self) -> float:
"""calculates the minimum step cost
Returns
-------
float
the minimum step cost
"""
return self._min_step_cost
1 change: 1 addition & 0 deletions brightest_path_lib/heuristic/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
from .heuristic import Heuristic
from .euclidean import Euclidean
from .euclidean_transonic import EuclideanTransonic
7 changes: 1 addition & 6 deletions brightest_path_lib/heuristic/euclidean.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class Euclidean(Heuristic):
the scale of the image's Z-axis
"""

def __init__(self, scale: Tuple):
if scale is None:
raise TypeError
Expand Down Expand Up @@ -68,19 +67,15 @@ def estimate_cost_to_goal(self, current_point: np.ndarray, goal_point: np.ndarra
if (len(current_point) == 0 or len(goal_point) == 0) or (len(current_point) != len(goal_point)):
raise ValueError

# current_x, current_y, current_z = current_point[0], current_point[1], 0
current_x, current_y, current_z = current_point[1], current_point[0], 0
# goal_x, goal_y, goal_z = goal_point[0], goal_point[1], 0
goal_x, goal_y, goal_z = goal_point[1], goal_point[0], 0

if len(current_point) == len(goal_point) == 3:
# current_z, current_x, current_y = current_point[0], current_point[1], current_point[2]
# goal_z, goal_x, goal_y = goal_point[0], goal_point[1], goal_point[2]
current_z, current_y, current_x = current_point[0], current_point[1], current_point[2]
goal_z, goal_y, goal_x = goal_point[0], goal_point[1], goal_point[2]

x_diff = (goal_x - current_x) * self.scale_x
y_diff = (goal_y - current_y) * self.scale_y
z_diff = (goal_z - current_z) * self.scale_z

return math.sqrt((x_diff * x_diff) + (y_diff * y_diff) + (z_diff * z_diff))
return math.sqrt((x_diff * x_diff) + (y_diff * y_diff) + (z_diff * z_diff))
Loading

0 comments on commit 69ae9b6

Please sign in to comment.