Skip to content

Commit

Permalink
replace casjobs.jar with mastcasjobs python module and fix various is…
Browse files Browse the repository at this point in the history
…sue; now works on new updated library versions: run_catalogs.py [--commit]
  • Loading branch information
Erik Jensen committed Sep 16, 2024
1 parent 96a5c1a commit 5a2f4db
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 86 deletions.
Binary file removed flows/casjobs/casjobs.jar
Binary file not shown.
171 changes: 87 additions & 84 deletions flows/catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
"""
import configparser
import logging
import os
import os.path
Expand All @@ -17,13 +18,15 @@
import requests
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord
from astropy.table import MaskedColumn, Table
from astropy.table import MaskedColumn, Table, unique
from astropy.time import Time
from astroquery import sdss
from astroquery.simbad import Simbad
from bottleneck import anynan
from tendrils.utils import load_config, query_ztf_id

import mastcasjobs

from .aadc_db import AADC_DB

logger = logging.getLogger(__name__)
Expand All @@ -49,42 +52,15 @@ def intval(value):


# --------------------------------------------------------------------------------------------------
def configure_casjobs(overwrite=False):
def casjobs_configured(config: configparser.ConfigParser) -> bool:
"""
Set up CasJobs if needed.
Parameters:
overwrite (bool, optional): Overwrite existing configuration. Default (False) is to not
overwrite existing configuration.
.. codeauthor:: Rasmus Handberg <rasmush@phys.au.dk>
Check if casjobs credentials are available.
"""
__dir__ = os.path.dirname(os.path.realpath(__file__))
casjobs_config = os.path.join(__dir__, 'casjobs', 'CasJobs.config')
logger.debug(",".join([casjobs_config,__dir__,os.path.realpath(__file__)]))
if os.path.isfile(casjobs_config) and not overwrite:
return

config = load_config()
wsid = config.get('casjobs', 'wsid', fallback=os.environ.get("CASJOBS_WSID", None))
passwd = config.get('casjobs', 'password', fallback=os.environ.get("CASJOBS_PASSWORD", None))
if wsid is None or passwd is None:
raise CasjobsError("CasJobs WSID and PASSWORD not in config.ini")

try:
with open(casjobs_config, 'w') as fid:
fid.write("wsid={0:s}\n".format(wsid))
fid.write("password={0:s}\n".format(passwd))
fid.write("default_target=HLSP_ATLAS_REFCAT2\n")
fid.write("default_queue=1\n")
fid.write("default_days=1\n")
fid.write("verbose=false\n")
fid.write("debug=false\n")
fid.write("jobs_location=http://mastweb.stsci.edu/gcasjobs/services/jobs.asmx\n")
except: # noqa: E722, pragma: no cover
if os.path.isfile(casjobs_config):
os.remove(casjobs_config)

raise CasjobsError("CasJobs WSID and PASSWORD not in config.ini / 'CASJOBS_WSID' and 'CASJOBS_PASSWORD' not defined as environment variables")
return True

# --------------------------------------------------------------------------------------------------
def query_casjobs_refcat2(coo_centre, radius=24 * u.arcmin):
Expand Down Expand Up @@ -114,18 +90,17 @@ def query_casjobs_refcat2(coo_centre, radius=24 * u.arcmin):
results = _query_casjobs_refcat2_divide_and_conquer(coo_centre, radius=radius)

# Remove duplicate entries:
_, indx = np.unique([res['starid'] for res in results], return_index=True)
results = [results[k] for k in indx]
results = unique(results, keys='starid')

# Trim away anything outside radius:
ra = [res['ra'] for res in results]
decl = [res['decl'] for res in results]
coords = SkyCoord(ra=ra, dec=decl, unit='deg', frame='icrs')
sep = coords.separation(coo_centre)
results = [res for k, res in enumerate(results) if sep[k] <= radius]
mask = sep <= radius

logger.debug("Found %d unique results", len(results))
return results
logger.debug("Found %d unique results", np.count_nonzero(mask))
return results[mask]


# --------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -176,55 +151,81 @@ def _query_casjobs_refcat2(coo_centre, radius=24 * u.arcmin):
if isinstance(radius, (float, int)):
radius *= u.deg

sql = "SELECT r.* FROM fGetNearbyObjEq({ra:f}, {dec:f}, {radius:f}) AS n INNER JOIN HLSP_ATLAS_REFCAT2.refcat2 AS r ON n.objid=r.objid ORDER BY n.distance;".format(
ra=coo_centre.ra.deg, dec=coo_centre.dec.deg, radius=Angle(radius).deg)
sql = ("""SELECT r.* FROM fGetNearbyObjEq({ra:f}, {dec:f}, {radius:f}) AS n
INNER JOIN HLSP_ATLAS_REFCAT2.refcat2 AS r ON n.objid=r.objid ORDER BY n.distance;"""
.format(ra=coo_centre.ra.deg, dec=coo_centre.dec.deg, radius=Angle(radius).deg))
logger.debug(sql)

# Make sure that CasJobs have been configured:
configure_casjobs()

# The command to run the casjobs script:
# BEWARE: This may change in the future without warning - it has before!
cmd = 'java -jar casjobs.jar execute "{0:s}"'.format(sql)

# Execute the command:
cmd = shlex.split(cmd) # TODO: download casjobs from https://galex.stsci.edu/casjobs/casjobscl.aspx, follow instructions
directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'casjobs')
proc = subprocess.Popen(cmd, cwd=directory, stdout=subprocess.PIPE, universal_newlines=True)
stdout, stderr = proc.communicate()
output = stdout.split("\n")

# build list of all kois from output from the CasJobs-script:
error_thrown = False
results = []
for line in output:
line = line.strip()
if line == '':
continue
if 'ERROR' in line:
error_thrown = True
break

row = line.split(',')
if len(row) == 45 and row[0] != '[objid]:Integer':
results.append(
{'starid': int(row[0]), 'ra': floatval(row[1]), 'decl': floatval(row[2]), 'pm_ra': floatval(row[5]),
'pm_dec': floatval(row[7]), 'gaia_mag': floatval(row[9]), 'gaia_bp_mag': floatval(row[11]),
'gaia_rp_mag': floatval(row[13]), 'gaia_variability': intval(row[17]), 'g_mag': floatval(row[22]),
'r_mag': floatval(row[26]), 'i_mag': floatval(row[30]), 'z_mag': floatval(row[34]),
'J_mag': floatval(row[39]), 'H_mag': floatval(row[41]), 'K_mag': floatval(row[43]), })

if error_thrown:
error_msg = ''
for line in output:
if len(line.strip()) > 0:
error_msg += line.strip() + "\n"

logger.debug("Error Msg: %s", error_msg)
if 'query results exceed memory limit' in error_msg.lower():
raise CasjobsMemoryError("Query results exceed memory limit")
else:
raise CasjobsError("ERROR detected in CasJobs: " + error_msg)
config = load_config()
casjobs_configured(config)

jobs = mastcasjobs.MastCasJobs(userid=config.get('casjobs', 'wsid'),
password=config.get('casjobs', 'password'),
context="HLSP_ATLAS_REFCAT2")
results = jobs.quick(sql, task_name="python cone search")

# Contents of HLSP_ATLAS_REFCAT2.refcat2 data table - i.e. the columns of 'results' just
# above; from https://galex.stsci.edu/casjobs/MyDB.aspx -> HLSP_ATLAS_REFCAT2.refcat2
# -----------------------------------------------------------------------------
# Name Unit Data Type Size Default Value Description
# objid dimensionless bigint 8 Unique object identifier.
# RA degrees float 8 RA from Gaia DR2, J2000, epoch 2015.5
# Dec degrees float 8 Dec from Gaia DR2, J2000, epoch 2015.5
# plx mas real 4 Parallax from Gaia DR2
# dplx mas real 4 Parallax uncertainty
# pmra mas/yr real 4 Proper motion in RA from Gaia DR2
# dpmra mas/yr real 4 Proper motion uncertainty in RA
# pmdec mas/yr real 4 Proper motion in Dec from Gaia DR2
# dpmdec mas/yr real 4 Proper motion uncertainty in Dec
# Gaia mag real 4 Gaia DR2 G magnitude
# dGaia mag real 4 Gaia DR2 G magnitude uncertainty
# BP mag real 4 Gaia G_BP magnitude
# dBP mag real 4 Gaia G_BP magnitude uncertainty
# RP mag real 4 Gaia G_RP magnitude
# dRP mag real 4 Gaia G_RP magnitude uncertainty
# Teff [K] int 4 Gaia stellar effective temperature
# AGaia mag real 4 Gaia estimate of G-band extinction for this star
# dupvar dimensionless int 4 Gaia flags coded as CONSTANT (0), VARIABLE (1), or NOT_AVAILABLE (2) + 4*DUPLICATE
# Ag mag real 4 SFD estimate of total column g-band extinction
# rp1 arc seconds real 4 Radius where cumulative G flux exceeds 0.1x this star
# r1 arc seconds real 4 Radius where cumulative G flux exceeds 1x this star
# r10 arc seconds real 4 Radius where cumulative G flux exceeds 10x this star
# g mag real 4 Pan-STARRS g_P1 magnitude
# dg mag real 4 Pan-STARRS g_P1 magnitude uncertainty
# gchi dimensionless real 4 chi^2/DOF for contributors to g
# gcontrib dimensionless int 4 Bitmap of contributing catalogs to g
# r mag real 4 Pan-STARRS r_P1 magnitude
# dr mag real 4 Pan-STARRS r_P1 magnitude uncertainty
# rchi dimensionless real 4 chi^2/DOF for contributors to r
# rcontrib dimensionless int 4 Bitmap of contributing catalogs to r
# i mag real 4 Pan-STARRS i_P1 magnitude
# di mag real 4 Pan-STARRS i_P1 magnitude uncertainty
# ichi dimensionless real 4 chi^2/DOF for contributors to i
# icontrib dimensionless int 4 Bitmap of contributing catalogs to i
# z mag real 4 Pan-STARRS z_P1 magnitude
# dz mag real 4 Pan-STARRS z_P1 magnitude uncertainty
# zchi dimensionless real 4 chi^2/DOF for contributors to z
# zcontrib dimensionless int 4 Bitmap of contributing catalogs to z
# nstat dimensionless int 4 Count of griz deweighted outliers
# J mag real 4 2MASS J magnitude
# dJ mag real 4 2MASS J magnitude uncertainty
# H mag real 4 2MASS H magnitude
# dH mag real 4 2MASS H magnitude uncertainty
# K mag real 4 2MASS K magnitude
# dK mag real 4 2MASS K magnitude uncertainty

# remove unused columns from results
results.remove_columns(['plx', 'dplx', 'dpmra', 'dpmdec', 'dGaia', 'dBP', 'dRP', 'Teff', 'AGaia', 'Ag',
'rp1', 'r1', 'r10', 'dg', 'gchi', 'gcontrib', 'dr', 'rchi', 'rcontrib',
'di', 'ichi', 'icontrib', 'dz', 'zchi', 'zcontrib', 'nstat', 'dJ', 'dH', 'dK'])

# rename columns to match older java-based implementation which had different naming conventions
# TODO: refactor dependent functions to expect the default namings and get rid of these renamings
# to start: comment out the renamings below and see where the flows pipeline breaks, then fix it
results = Table(results,
names=('starid', 'ra', 'decl', 'pm_ra', 'pm_dec', 'gaia_mag', 'gaia_bp_mag', 'gaia_rp_mag',
'gaia_variability', 'g_mag', 'r_mag', 'i_mag', 'z_mag', 'J_mag', 'H_mag', 'K_mag'))

if not results:
raise CasjobsError("Could not find anything on CasJobs")
Expand Down Expand Up @@ -298,7 +299,9 @@ def query_sdss(coo_centre, radius=24 * u.arcmin, dr=16, clean=True):
if isinstance(radius, (float, int)):
radius *= u.deg

#SDSS.MAX_CROSSID_RADIUS = radius + 1 * u.arcmin
# from astroquery version 0.4.7 onwards, radius is limited to 3 arcmins - use this value if necessary
radius = radius if radius < sdss.SDSS.MAX_CROSSID_RADIUS else sdss.SDSS.MAX_CROSSID_RADIUS

sdss.conf.skyserver_baseurl = sdss.conf.skyserver_baseurl.replace("http://","https://")
AT_sdss = sdss.SDSS.query_region(coo_centre, photoobj_fields=['type', 'clean', 'ra', 'dec', 'psfMag_u'], data_release=dr,
timeout=600, radius=radius)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ astroalign > 2.3
networkx
astroquery >= 0.4.6
tendrils >= 0.1.5
mastcasjobs >= 0.0.6
3 changes: 2 additions & 1 deletion run_catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def parse():
parser.add_argument('-d', '--debug', help='Print debug messages.', action='store_true')
parser.add_argument('-q', '--quiet', help='Only report warnings and errors.', action='store_true')
parser.add_argument('-t', '--target', type=str, help='Target to print catalog for.', nargs='?', default=None)
parser.add_argument('-c', '--commit', help='Commit downloaded catalogs to flows database.', action='store_true')
return parser.parse_args()

def set_logging_level(args):
Expand Down Expand Up @@ -43,7 +44,7 @@ def main():
# Get missing
for target in api.get_catalog_missing():
logger.info("Downloading catalog for target=%s...", target)
download_catalog(target) # @TODO: refactor to Tendrils
download_catalog(target, update_existing=args.commit) # @TODO: refactor to Tendrils

# download target catalog for printing
if args.target is not None:
Expand Down
3 changes: 2 additions & 1 deletion tests/test_catalogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import pytest
from astropy.coordinates import SkyCoord
from astropy.table import Table
from tendrils.utils import load_config

from flows import catalogs

Expand Down Expand Up @@ -46,7 +47,7 @@ def test_download_catalog(SETUP_CONFIG, ra: float = 256.727512, dec: float = 30.
# Check if CasJobs have been configured, and skip the entire test if it isn't.
# This has to be done like this, to avoid problems when config.ini doesn't exist.
try:
catalogs.configure_casjobs()
catalogs.casjobs_configured(load_config())
except catalogs.CasjobsError:
pytest.skip("CasJobs not configured")

Expand Down

0 comments on commit 5a2f4db

Please sign in to comment.