Skip to content

Commit

Permalink
Implement strategy design pattern to load_image.py
Browse files Browse the repository at this point in the history
This is done to help in adding in Edge correction and make the code more manageable and extendable.
  • Loading branch information
emirkmo committed Feb 14, 2022
1 parent 3084d3b commit 84c32d4
Showing 1 changed file with 94 additions and 13 deletions.
107 changes: 94 additions & 13 deletions flows/load_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
from astropy.io import fits
from astropy.time import Time
from astropy.wcs import WCS, FITSFixedWarning
from typing import Tuple

from flows import api
from dataclasses import dataclass, field
import typing
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__) # Singleton logger instance


# --------------------------------------------------------------------------------------------------
@dataclass
Expand All @@ -44,13 +48,6 @@ def update_mask(self, mask):
self.mask = mask
self.check_finite()

# def get_observatory_params(self, instrument=None):
# if not instrument:
# instrument = self.instrument
# site = instrument.get_site()
# obstime = instrument.get_obstime()
# photfilter = instrument.get_photfilter()

def create_wcs(self) -> WCS:
with warnings.catch_warnings():
warnings.simplefilter('ignore', category=FITSFixedWarning)
Expand All @@ -61,6 +58,84 @@ def create_masked_image(self):
self.image[self.mask] = np.NaN
self.clean = np.ma.masked_array(data=self.image, mask=self.mask, copy=False)

def set_edge_rows_to_value(self, y: Tuple[float] = None, value: typing.Union[int, float, np.NaN] = 0):
if y is None:
pass
for row in y:
self.image[row] = value

def set_edge_columns_to_value(self, x: Tuple[float] = None, value: typing.Union[int, float, np.NaN] = 0):
if x is None:
pass
for col in x:
self.image[:, col] = value

@staticmethod
def get_edge_mask(img: np.ndarray, value: typing.Union[int, float, np.NaN] = 0):
"""
Create boolean mask of given value near edge of image.
Parameters:
img (ndarray): image with values for masking.
value (float): Value to detect near edge. Default=0.
Returns:
ndarray: Pixel mask with given values on the edge of image.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""

mask1 = (img == value)
mask = np.zeros_like(img, dtype='bool')

# Mask entire rows and columns which are only the value:
mask[np.all(mask1, axis=1), :] = True
mask[:, np.all(mask1, axis=0)] = True

# Detect "uneven" edges column-wise in image:
a = np.argmin(mask1, axis=0)
b = np.argmin(np.flipud(mask1), axis=0)
for col in range(img.shape[1]):
if mask1[0, col]:
mask[:a[col], col] = True
if mask1[-1, col]:
mask[-b[col]:, col] = True

# Detect "uneven" edges row-wise in image:
a = np.argmin(mask1, axis=1)
b = np.argmin(np.fliplr(mask1), axis=1)
for row in range(img.shape[0]):
if mask1[row, 0]:
mask[row, :a[row]] = True
if mask1[row, -1]:
mask[row, -b[row]:] = True

return mask

def apply_edge_mask(self, y: Tuple[int] = None, x: Tuple[int] = None, apply_existing_mask_first: bool = False):
"""
Masks given rows and columns of image but will replace the current mask! Set apply_existing_mask_first to True
if the current mask should be kept.
:param y: Tuple[int] of rows to mask
:param x: Tuple[int] of columns to mask
:param apply_existing_mask_first: Whether to apply the existing mask to image first, before overwriting mask.
:return: None
"""
if y is None and x is None:
logger.debug("(y,x) was None when applying edge mask. Edge was not actually masked.")

if apply_existing_mask_first:
self.create_masked_image()

if y is not None:
self.set_edge_rows_to_value(y=y)

if x is not None:
self.set_edge_columns_to_value(x=x)

self.mask = self.get_edge_mask(self.image)
self.create_masked_image()


class AbstractInstrument(ABC):
peakmax: int = None
Expand Down Expand Up @@ -149,7 +224,7 @@ def __init__(self, image: FlowsImage):

def get_obstime(self):
obstime = Time(self.image.header['DATE-OBS'], format='isot', scale='utc',
location=self.image.site['EarthLocation'])
location=self.image.site['EarthLocation'])
obstime += 0.5 * self.image.exptime * u.second # Make time centre of exposure
return obstime

Expand Down Expand Up @@ -186,8 +261,12 @@ class ALFOSC(Instrument):
siteid = 5

def get_obstime(self):
return Time(self.image.header['DATE-AVG'], format='isot', scale='utc',
location=self.image.site['EarthLocation'])
return Time(
self.image.header['DATE-AVG'],
format='isot',
scale='utc',
location=self.image.site['EarthLocation']
)

def get_photfilter(self):
# Sometimes data from NOT does not have the FILTER keyword,
Expand Down Expand Up @@ -481,6 +560,7 @@ def get_photfilter(self):
}.get(hdr['FILTER'], hdr['FILTER'])
return photfilter


class TJO(Instrument):
siteid = 22

Expand Down Expand Up @@ -570,7 +650,6 @@ def new_load_image(filename: str, target_coord: coords.SkyCoord = None):
.. codeauthor:: Emir Karamehmetoglu <emir.k@phys.au.dk>
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
logger = logging.getLogger(__name__)
ext = 0 # Default extension in HDUList, individual instruments may override this.
mask = None # Instrument can override, default is to only mask all non-finite values, override is additive.

Expand Down Expand Up @@ -610,7 +689,8 @@ def new_load_image(filename: str, target_coord: coords.SkyCoord = None):
break

# NOT - ALFOSC
elif telescope == 'NOT' and instrument in ('ALFOSC FASU','ALFOSC_FASU') and hdr.get('OBS_MODE','').lower() == 'imaging':
elif telescope == 'NOT' and instrument in ('ALFOSC FASU', 'ALFOSC_FASU') and hdr.get('OBS_MODE',
'').lower() == 'imaging':
instrument_name = 'ALFOSC'
break

Expand Down Expand Up @@ -651,7 +731,8 @@ def new_load_image(filename: str, target_coord: coords.SkyCoord = None):
instrument_name = 'Sofi'
break

elif telescope == 'ESO-NTT' and instrument == 'EFOSC' and (origin == 'ESO' or origin.startswith('NOAO-IRAF')):
elif telescope == 'ESO-NTT' and instrument == 'EFOSC' and (
origin == 'ESO' or origin.startswith('NOAO-IRAF')):
instrument_name = 'EFOSC'
break

Expand Down

0 comments on commit 84c32d4

Please sign in to comment.