Merge pull request #2486 from ASFHyP3/fix-negative-processing-times
Fix negative processing times
jtherrmann authored Nov 13, 2024
2 parents 340c17b + 3aaf60d commit c00533a
Showing 6 changed files with 21 additions and 104 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -4,9 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [8.0.1]
## [9.0.0]

### Changed
- All failed jobs now have a `processing_times` value of `null`.

### Fixed
- Resolve a regression introduced by the previous release (v8.0.0) in which a processing step could report a negative processing time if the underlying AWS Batch job had a failed attempt that did not include a `StartedAt` field. Fixes <https://github.com/ASFHyP3/hyp3/issues/2485>
- Upgrade from Flask v2.2.5 to v3.0.3. Fixes <https://github.com/ASFHyP3/hyp3/issues/2491>
- Specify our custom JSON encoder by subclassing `flask.json.provider.JSONProvider`. See <https://github.com/pallets/flask/pull/4692>

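The regression fixed here is easiest to see with concrete numbers. Before this change, the Batch submit-job state collected `$.Attempts[*].StartedAt` and `$.Attempts[*].StoppedAt` into two parallel lists; if a failed attempt carried no `StartedAt`, the lists had different lengths, and the old `zip`-based pairing in `check_processing_time` could match a stop time with the start time of a different attempt, yielding a negative duration. The sketch below is a hypothetical reconstruction of that failure mode (the misalignment mechanism is inferred from the old code, not quoted from the issue), using the attempt timestamps from a test case this commit removes:

```python
# Hypothetical reconstruction of the old failure mode; not code from the repository.
attempts = [
    {'StartedAt': 500, 'StoppedAt': 1000},     # failed attempt
    {'StoppedAt': 8700},                       # failed attempt with no StartedAt
    {'StartedAt': 12000, 'StoppedAt': 15200},  # final, successful attempt
]

# The old ResultSelector projected the attempts into two parallel lists,
# silently skipping the missing StartedAt...
start = [a['StartedAt'] for a in attempts if 'StartedAt' in a]  # [500, 12000]
stop = [a['StoppedAt'] for a in attempts]                       # [1000, 8700, 15200]

# ...and the old lambda zipped them back together, so the final pair mixes
# attempt 3's start with attempt 2's stop: (8700 - 12000) / 1000 == -3.3
pairs = list(zip(start, stop))
old_result = (pairs[-1][1] - pairs[-1][0]) / 1000
assert old_result == -3.3

# The new code reads the job's top-level StartedAt/StoppedAt, which describe
# the final attempt, so the duration is (15200 - 12000) / 1000 == 3.2 seconds.
new_result = (15200 - 12000) / 1000
assert new_result == 3.2
```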
7 changes: 4 additions & 3 deletions apps/api/src/hyp3_api/api-spec/openapi-spec.yml.j2
@@ -409,8 +409,9 @@ components:
processing_times:
description: >
List of run times for the job's processing steps in the order that they were executed.
An empty list represents a failure to calculate processing times.
This field is null for failed jobs and non-null for successful jobs.
type: array
nullable: true
items:
oneOf:
- type: array
@@ -422,10 +423,10 @@

processing_time_in_seconds:
description: >
Run time in seconds for a processing step's final attempt (regardless of whether it succeeded).
A value of zero indicates that there were no attempts.
Run time in seconds for a processing step's final attempt.
type: number
minimum: 0
exclusiveMinimum: true
example: 50

securitySchemes:
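Taken together, the spec changes mean API consumers now see one of two shapes for `processing_times`. A minimal sketch of both cases follows; the `SUCCEEDED` status value and the concrete numbers are illustrative assumptions, not taken from this diff:

```python
# Illustrative job records as a client might receive them; values are made up.
succeeded_job = {
    'status_code': 'SUCCEEDED',  # assumed success status value for illustration
    # One entry per processing step, in execution order; a nested list is used
    # for steps that run multiple Batch jobs. Every value is strictly positive
    # now that the spec pairs `minimum: 0` with `exclusiveMinimum: true`.
    'processing_times': [50, [30.5, 45.2]],
}

failed_job = {
    'status_code': 'FAILED',
    # Failed jobs now always report null rather than an empty list.
    'processing_times': None,
}
```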
15 changes: 1 addition & 14 deletions apps/check-processing-time/src/check_processing_time.py
@@ -1,24 +1,11 @@
import json
from typing import Union


def get_time_from_attempts(attempts: list[dict]) -> float:
if len(attempts) == 0:
return 0
attempts.sort(key=lambda attempt: attempt['StoppedAt'])
final_attempt = attempts[-1]
return (final_attempt['StoppedAt'] - final_attempt['StartedAt']) / 1000


def get_time_from_result(result: Union[list, dict]) -> Union[list, float]:
if isinstance(result, list):
return [get_time_from_result(item) for item in result]

if 'start' in result:
attempts = [{'StartedAt': start, 'StoppedAt': stop} for start, stop in zip(result['start'], result['stop'])]
return get_time_from_attempts(attempts)

return get_time_from_attempts(json.loads(result['Cause'])['Attempts'])
return (result['StoppedAt'] - result['StartedAt']) / 1000


def lambda_handler(event, _) -> list[Union[list, float]]:
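The simplified helper can be exercised directly with the payload shapes the state machine now produces; the timestamps below are the same epoch-millisecond values used by the updated `test_lambda_handler` fixture:

```python
# Quick sanity check of the simplified helper; not part of the repository's tests.
from check_processing_time import get_time_from_result

# A single step: one dict with the final attempt's timestamps.
assert get_time_from_result({'StartedAt': 3000, 'StoppedAt': 8700}) == 5.7

# A step that fanned out into multiple Batch jobs: a list of such dicts.
assert get_time_from_result(
    [{'StartedAt': 3000, 'StoppedAt': 8900}, {'StartedAt': 4000, 'StoppedAt': 4200}]
) == [5.9, 0.2]
```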
4 changes: 2 additions & 2 deletions apps/render_cf.py
@@ -103,8 +103,8 @@ def get_batch_submit_job_state(job_spec: dict, step: dict, filter_batch_params=F
},
},
'ResultSelector': {
'start.$': '$.Attempts[*].StartedAt',
'stop.$': '$.Attempts[*].StoppedAt',
'StartedAt.$': '$.StartedAt',
'StoppedAt.$': '$.StoppedAt',
},
'Retry': [
{
3 changes: 1 addition & 2 deletions apps/step-function.json.j2
@@ -4,7 +4,6 @@
"SET_DEFAULT_RESULTS": {
"Type": "Pass",
"Result": {
"processing_times": [],
"get_files": {
"logs": [],
"expiration_time": null
@@ -208,7 +207,7 @@
"status_code": "FAILED",
"logs.$": "$.results.get_files.logs",
"expiration_time.$": "$.results.get_files.expiration_time",
"processing_times.$": "$.results.processing_times"
"processing_times": null
},
"Retry": [
{
90 changes: 8 additions & 82 deletions tests/test_check_processing_time.py
@@ -1,99 +1,25 @@
import check_processing_time


def test_single_attempt():
attempts = [{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 2800}]
assert check_processing_time.get_time_from_attempts(attempts) == 2.3


def test_multiple_attempts():
attempts = [
{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
{'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700}
]
assert check_processing_time.get_time_from_attempts(attempts) == 5.7


def test_unsorted_attempts():
# I'm not sure if the lambda would ever be given unsorted attempts, but it seems worth testing that it doesn't
# depend on the list to be sorted.
attempts = [
{'Container': {}, 'StartedAt': 3000, 'StatusReason': '', 'StoppedAt': 8700},
{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000}
]
assert check_processing_time.get_time_from_attempts(attempts) == 5.7


def test_missing_start_time():
# There are some cases in which at least one of the attempts may not have a StartedAt time.
# https://github.com/ASFHyP3/hyp3/issues/936
attempts = [
{'Container': {}, 'StartedAt': 500, 'StatusReason': '', 'StoppedAt': 1000},
{'Container': {}, 'StatusReason': '', 'StoppedAt': 8700},
{'Container': {}, 'StartedAt': 12000, 'StatusReason': '', 'StoppedAt': 15200}
]
assert check_processing_time.get_time_from_attempts(attempts) == 3.2


def test_no_attempts():
assert check_processing_time.get_time_from_attempts([]) == 0


def test_get_time_from_result():
result = {
'start': [500, 3000],
'stop': [1000, 8700],
}
assert check_processing_time.get_time_from_result(result) == 5.7


def test_get_time_from_result_list():
result = [
{
'start': [500, 3000],
'stop': [1000, 8900],
},
{
'start': [500, 4000],
'stop': [3000, 4200],
},
]
assert check_processing_time.get_time_from_result(result) == [5.9, 0.2]


def test_get_time_from_result_failed():
result = {
'Error': 'States.TaskFailed',
'Cause': '{"Attempts": ['
'{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
'{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
'{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
}
assert check_processing_time.get_time_from_result(result) == 6.4


def test_lambda_handler():
event = {
'processing_results': {
'step_0': {
'start': [500, 3000],
'stop': [1000, 8700],
'StartedAt': 3000,
'StoppedAt': 8700,
},
'step_1': {
'Error': 'States.TaskFailed',
'Cause': '{"Attempts": ['
'{"Container": {}, "StartedAt": 500, "StatusReason": "", "StoppedAt": 1000}, '
'{"Container": {}, "StartedAt": 1500, "StatusReason": "", "StoppedAt": 2000}, '
'{"Container": {}, "StartedAt": 3000, "StatusReason": "", "StoppedAt": 9400}]}'
'StartedAt': 3000,
'StoppedAt': 9400,
},
'step_2': [
{
'start': [500, 3000],
'stop': [1000, 8900],
'StartedAt': 3000,
'StoppedAt': 8900,
},
{
'start': [500, 4000],
'stop': [3000, 4200],
'StartedAt': 4000,
'StoppedAt': 4200,
},
]
}
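The assertion at the end of `test_lambda_handler` falls outside the displayed hunk, but the expected values can be worked out by hand from the fixture above, assuming the handler returns one entry per step in order:

```python
# Computed here from the fixture's timestamps (milliseconds -> seconds);
# this is not the literal assertion from the test file, which is not shown above.
expected = [
    (8700 - 3000) / 1000,                          # step_0 -> 5.7
    (9400 - 3000) / 1000,                          # step_1 (failed step) -> 6.4
    [(8900 - 3000) / 1000, (4200 - 4000) / 1000],  # step_2 -> [5.9, 0.2]
]
assert expected == [5.7, 6.4, [5.9, 0.2]]
```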
