Skip to content

Commit

Permalink
Raise timeout to 190 seconds and use more descriptive variable name
Browse files Browse the repository at this point in the history
Signed-off-by: Adam.Dybbroe <a000680@c21856.ad.smhi.se>
  • Loading branch information
Adam.Dybbroe committed Jun 19, 2023
1 parent 049fb27 commit be3c85a
Showing 1 changed file with 30 additions and 33 deletions.
63 changes: 30 additions & 33 deletions cspp_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def __init__(self, ncpus, level1_home, publish_topic):
self.platform_name = 'unknown' # Ex.: Suomi-NPP
self.fullswath = False
self.cspp_results = []
self.glist = []
self.granules = []
self.working_dir = None
self.pass_start_time = None
self.result_files = []
Expand All @@ -310,7 +310,7 @@ def initialise(self):
"""Initialise the processor."""
self.fullswath = False
self.cspp_results = []
self.glist = []
self.granules = []
self.working_dir = None
self.pass_start_time = None
self.platform_name = 'unknown'
Expand All @@ -325,16 +325,16 @@ def run(self, msg, publisher, viirs_sdr_call, viirs_sdr_options,
else:
LOG.debug("Message is None!")

LOG.debug("glist: " + str(self.glist))
if msg is None and self.glist and len(self.glist) > 2:
LOG.debug("granules: " + str(self.granules))
if msg is None and self.granules and len(self.granules) > 2:
# The swath is assumed to be finished now
LOG.debug("The swath is assumed to be finished now")
del self.glist[0]
keeper = self.glist[1]
LOG.info("Start CSPP: RDR files = " + str(self.glist))
del self.granules[0]
keeper = self.granules[1]
LOG.info("Start CSPP: RDR files = " + str(self.granules))
self.cspp_results.append(
self.pool.apply_async(self.spawn_cspp,
args=(keeper, self.glist, publisher,
args=(keeper, self.granules, publisher,
viirs_sdr_call,
viirs_sdr_options),
kwds={"granule_time_tolerance":
Expand All @@ -349,16 +349,14 @@ def run(self, msg, publisher, viirs_sdr_call, viirs_sdr_options,
LOG.info("Not a VIIRS scene. Continue...")
return True
elif msg is None:
LOG.debug("Message is None. glist = %s", str(self.glist))
LOG.debug("Message is None. granules = %s", str(self.granules))
return True

LOG.debug("")
LOG.debug("\tMessage:")
LOG.debug(str(msg))
urlobj = urlparse(msg.data['uri'])
LOG.info("Sat and Instrument: %s %s",
str(msg.data['platform_name']),
str(msg.data['sensor']))
LOG.info("Sat and Instrument: %s %s", str(msg.data['platform_name']), str(msg.data['sensor']))

self.platform_name = str(msg.data['platform_name'])
self.message_data = msg.data
Expand Down Expand Up @@ -402,28 +400,27 @@ def run(self, msg, publisher, viirs_sdr_call, viirs_sdr_options,
self.orbit_number = orbnum
LOG.info("Orbit number = " + str(self.orbit_number))

self.glist.append(rdr_filename)
self.granules.append(rdr_filename)

if len(self.glist) > 4:
raise RuntimeError("Invalid number of granules to "
"process!!!")
if len(self.glist) == 4:
if len(self.granules) > 4:
raise RuntimeError("Invalid number of granules to process!!!")
if len(self.granules) == 4:
LOG.info("4 granules. Skip the first from the list...")
del self.glist[0]
if len(self.glist) == 3:
del self.granules[0]
if len(self.granules) == 3:
LOG.info("3 granules. Keep the middle one...")
keeper = self.glist[1]
if len(self.glist) == 2:
keeper = self.granules[1]
if len(self.granules) == 2:
LOG.info("2 granules. Keep the first one...")
keeper = self.glist[0]
if len(self.glist) == 1:
keeper = self.granules[0]
if len(self.granules) == 1:
# Check start and end time and check if the RDR file
# contains several granules (a full local swath):
tdiff = end_time - start_time
if tdiff.seconds > 4 * 60:
LOG.info("RDR file contains 3 or more granules. " +
"We assume it is a full local swath!")
keeper = self.glist[0]
keeper = self.granules[0]
self.fullswath = True
else:
LOG.info("Only one granule. This is not enough for CSPP" +
Expand Down Expand Up @@ -458,11 +455,11 @@ def run(self, msg, publisher, viirs_sdr_call, viirs_sdr_options,
self.working_dir)

LOG.info("Before call to spawn_cspp. Argument list = " +
str([keeper] + self.glist))
str([keeper] + self.granules))
LOG.info("Start time: %s", start_time.strftime('%Y-%m-%d %H:%M:%S'))
self.cspp_results.append(
self.pool.apply_async(self.spawn_cspp,
args=(keeper, self.glist, publisher,
args=(keeper, self.granules, publisher,
viirs_sdr_call,
viirs_sdr_options),
kwds={"granule_time_tolerance": granule_time_tolerance}))
Expand All @@ -473,12 +470,12 @@ def run(self, msg, publisher, viirs_sdr_call, viirs_sdr_options,

return True

def spawn_cspp(self, current_granule, glist, publisher,
def spawn_cspp(self, current_granule, granules, publisher,
viirs_sdr_call, viirs_sdr_options, **kwargs):
"""Spawn a CSPP run on the set of RDR files given."""
LOG.info("current_granule = " + str(current_granule))
LOG.info("glist = " + str(glist))
if current_granule in glist and len(glist) == 1:
LOG.info("granules = " + str(granules))
if current_granule in granules and len(granules) == 1:
LOG.info("Current granule is identical to the 'list of granules'" +
" No sdr result files will be skipped")

Expand All @@ -490,9 +487,9 @@ def spawn_cspp(self, current_granule, glist, publisher,

working_dir = create_tmp_workdir(self.working_dir)

LOG.info("Start CSPP: RDR files = " + str(glist))
LOG.info("Start CSPP: RDR files = " + str(granules))
LOG.info("Run from working dir = %s", working_dir)
run_cspp(working_dir, viirs_sdr_call, viirs_sdr_options, glist)
run_cspp(working_dir, viirs_sdr_call, viirs_sdr_options, granules)
LOG.info("CSPP SDR processing finished...")
# Assume everything has gone well!

Expand Down Expand Up @@ -569,7 +566,7 @@ def publish_sdr(self, publisher, result_files):
'polar',
'direct_readout')),
"dataset", to_send).encode()
LOG.debug("sending: " + str(msg))
LOG.debug("Sending message: " + str(msg))
publisher.send(msg)
LOG.debug("After having published/sent message.")

Expand Down Expand Up @@ -646,7 +643,7 @@ def npp_rolling_runner(
while True:
LOG.debug("Re-initialise the viirs processor instance.")
viirs_proc.initialise()
for msg in subscr.recv(timeout=90):
for msg in subscr.recv(timeout=190):
status = viirs_proc.run(
msg, publisher,
viirs_sdr_call, viirs_sdr_options,
Expand Down

0 comments on commit be3c85a

Please sign in to comment.