Skip to content

Commit

Permalink
fix: Remove sensor poke failure event (#520)
Browse files Browse the repository at this point in the history
* fix: remove_sensor_poke_failure_events

* fix: updated proton commit hash

* fix: updated proton commit hash

* fix: proton commit hash update
  • Loading branch information
Mryashbhardwaj authored Aug 12, 2022
1 parent 6309c5c commit f7ebafa
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 271 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/odpf/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "f90fd714cfc65894b4eb73642ae098e30081b672"
PROTON_COMMIT := "16c082a44a3775d5114ccee368e91b7313721f39"

.PHONY: build test test-ci generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint

Expand Down
473 changes: 245 additions & 228 deletions api/proto/odpf/optimus/core/v1beta1/job_spec.pb.go

Large diffs are not rendered by default.

232 changes: 232 additions & 0 deletions api/proto/odpf/optimus/core/v1beta1/status.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,14 @@
"TYPE_TASK_SUCCESS",
"TYPE_TASK_START",
"TYPE_TASK_FAIL",
"TYPE_SENSOR_POKE",
"TYPE_SENSOR_RETRY",
"TYPE_SENSOR_SUCCESS",
"TYPE_SENSOR_START",
"TYPE_SENSOR_FAIL"
"TYPE_SENSOR_FAIL",
"TYPE_HOOK_START",
"TYPE_HOOK_RETRY",
"TYPE_HOOK_FAIL",
"TYPE_HOOK_SUCCESS"
],
"default": "TYPE_UNSPECIFIED"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,14 @@
"TYPE_TASK_SUCCESS",
"TYPE_TASK_START",
"TYPE_TASK_FAIL",
"TYPE_SENSOR_POKE",
"TYPE_SENSOR_RETRY",
"TYPE_SENSOR_SUCCESS",
"TYPE_SENSOR_START",
"TYPE_SENSOR_FAIL"
"TYPE_SENSOR_FAIL",
"TYPE_HOOK_START",
"TYPE_HOOK_RETRY",
"TYPE_HOOK_FAIL",
"TYPE_HOOK_SUCCESS"
],
"default": "TYPE_UNSPECIFIED"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,14 @@
"TYPE_TASK_SUCCESS",
"TYPE_TASK_START",
"TYPE_TASK_FAIL",
"TYPE_SENSOR_POKE",
"TYPE_SENSOR_RETRY",
"TYPE_SENSOR_SUCCESS",
"TYPE_SENSOR_START",
"TYPE_SENSOR_FAIL"
"TYPE_SENSOR_FAIL",
"TYPE_HOOK_START",
"TYPE_HOOK_RETRY",
"TYPE_HOOK_FAIL",
"TYPE_HOOK_SUCCESS"
],
"default": "TYPE_UNSPECIFIED"
},
Expand Down
4 changes: 2 additions & 2 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ plugins:
# proto file should be.
# This is necessary while importing a proto file foo/a.proto from another
# directory, e.g. bar/b.proto
opt: paths=source_relative,Modpf/optimus/core/v1beta1/runtime.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/project.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/namespace.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/job_spec.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/job_run.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1
opt: paths=source_relative,Modpf/optimus/core/v1beta1/runtime.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/project.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/namespace.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/job_spec.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/job_run.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/status.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1,Modpf/optimus/core/v1beta1/resource.proto=github.com/odpf/optimus/api/proto/odpf/optimus/core/v1beta1
- name: go-grpc
out: api/proto
opt: paths=source_relative,require_unimplemented_servers=true
- name: grpc-gateway
out: api/proto
opt: paths=source_relative
- name: openapiv2
out: api/third_party/openapi
out: api/third_party/openapi
56 changes: 24 additions & 32 deletions ext/scheduler/airflow2/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,44 +253,36 @@ def __init__(
self._optimus_client = OptimusAPIClient(optimus_hostname)

def poke(self, context):
log_start_event(context, EVENT_NAMES.get("SENSOR_START_EVENT"))
schedule_time = context['next_execution_date']

try:
log_start_event(context, EVENT_NAMES.get("SENSOR_START_EVENT"))
schedule_time = context['next_execution_date']

# parse relevant metadata from the job metadata to build the task window
# TODO this needs to be updated to use optimus get job spec
try:
upstream_schedule = self.get_schedule_interval(schedule_time)
except Exception as e:
self.log.warning("error while fetching upstream schedule :: {}".format(e))
context[SCHEDULER_ERR_MSG] = "error while fetching upstream schedule :: {}".format(e)
log_failure_event(context)
return False
upstream_schedule = self.get_schedule_interval(schedule_time)
except Exception as e:
self.log.warning("error while fetching upstream schedule :: {}".format(e))
context[SCHEDULER_ERR_MSG] = "error while fetching upstream schedule :: {}".format(e)
return False

last_upstream_schedule_time, _ = self.get_last_upstream_times(
schedule_time, upstream_schedule)
last_upstream_schedule_time, _ = self.get_last_upstream_times(
schedule_time, upstream_schedule)

# get schedule window
task_window = JobSpecTaskWindow(self.window_size, 0, "m", self._optimus_client)
schedule_time_window_start, schedule_time_window_end = task_window.get_schedule_window(
last_upstream_schedule_time.strftime(TIMESTAMP_FORMAT),upstream_schedule)
# get schedule window
task_window = JobSpecTaskWindow(self.window_size, 0, "m", self._optimus_client)
schedule_time_window_start, schedule_time_window_end = task_window.get_schedule_window(
last_upstream_schedule_time.strftime(TIMESTAMP_FORMAT),upstream_schedule)


self.log.info("waiting for upstream runs between: {} - {} schedule times of airflow dag run".format(
schedule_time_window_start, schedule_time_window_end))
self.log.info("waiting for upstream runs between: {} - {} schedule times of airflow dag run".format(
schedule_time_window_start, schedule_time_window_end))

# a = 0/0
if not self._are_all_job_runs_successful(schedule_time_window_start, schedule_time_window_end):
self.log.warning("unable to find enough successful executions for upstream '{}' in "
"'{}' dated between {} and {}(inclusive), rescheduling sensor".
format(self.optimus_job, self.optimus_project, schedule_time_window_start,
schedule_time_window_end))
log_failure_event(context)
return False
return True
except Exception as e :
context[SCHEDULER_ERR_MSG] = "error while sensor poke :: {}".format(e)
log_failure_event(context)
# a = 0/0
if not self._are_all_job_runs_successful(schedule_time_window_start, schedule_time_window_end):
self.log.warning("unable to find enough successful executions for upstream '{}' in "
"'{}' dated between {} and {}(inclusive), rescheduling sensor".
format(self.optimus_job, self.optimus_project, schedule_time_window_start,
schedule_time_window_end))
return False
return True

def get_last_upstream_times(self, schedule_time_of_current_job, upstream_schedule_interval):
second_ahead_of_schedule_time = schedule_time_of_current_job + timedelta(seconds=1)
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ require (
gorm.io/gorm v1.21.16
)

require go.uber.org/multierr v1.7.0

require (
cloud.google.com/go v0.94.0 // indirect
cloud.google.com/go/storage v1.16.1 // indirect
Expand Down

0 comments on commit f7ebafa

Please sign in to comment.