Skip to content

Commit

Permalink
Add alert lifecycle observer
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <lodovice@amazon.com>
  • Loading branch information
emanlodovice authored and alanprot committed Dec 10, 2024
1 parent bff3f7b commit 881f002
Show file tree
Hide file tree
Showing 11 changed files with 514 additions and 64 deletions.
36 changes: 36 additions & 0 deletions alertobserver/alertobserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"github.com/prometheus/alertmanager/types"
)

// Names of the alert life cycle events passed as the event argument of
// LifeCycleObserver.Observe.
const (
	// EventAlertReceived is reported by the v2 POST alerts handler once the
	// posted alerts were validated and stored.
	EventAlertReceived string = "received"
	// EventAlertRejected is reported by the v2 POST alerts handler for alerts
	// that failed validation or could not be stored.
	EventAlertRejected string = "rejected"
	// EventAlertAddedToAggrGroup presumably marks an alert that was added to
	// an aggregation group (emitter not visible here; confirm in dispatch).
	EventAlertAddedToAggrGroup string = "addedAggrGroup"
	// EventAlertFailedAddToAggrGroup presumably marks an alert that could not
	// be added to an aggregation group (confirm in dispatch).
	EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
	// EventAlertPipelineStart presumably marks alerts entering the
	// notification pipeline (confirm in notify).
	EventAlertPipelineStart string = "pipelineStart"
	// EventAlertPipelinePassStage is reported per pipeline stage; observers
	// such as FakeLifeCycleObserver read the stage name from meta["stageName"].
	EventAlertPipelinePassStage string = "pipelinePassStage"
	// EventAlertMuted presumably marks alerts that were muted (confirm in notify).
	EventAlertMuted string = "muted"
	// EventAlertSent presumably marks alerts whose notification was sent
	// (confirm in notify).
	EventAlertSent string = "sent"
	// EventAlertSendFailed presumably marks alerts whose notification failed
	// to send (confirm in notify).
	EventAlertSendFailed string = "sendFailed"
)

// AlertEventMeta carries free-form, event-specific details that accompany an
// observed event, keyed by name. Callers in this change use keys such as "msg"
// (an error message) and "stageName" (a pipeline stage name).
type AlertEventMeta map[string]interface{}

// LifeCycleObserver is a hook that is notified about alert life cycle events.
// Call sites in this change skip the notification when the observer is nil.
type LifeCycleObserver interface {
// Observe is called with the event name (e.g. one of the Event* constants),
// the alerts the event applies to, and event-specific metadata.
Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
}
46 changes: 46 additions & 0 deletions alertobserver/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"sync"

"github.com/prometheus/alertmanager/types"
)

// FakeLifeCycleObserver is a LifeCycleObserver for tests that records every
// observed event in memory so assertions can inspect it afterwards.
type FakeLifeCycleObserver struct {
// AlertsPerEvent holds the observed alerts keyed by event name, for every
// event other than EventAlertPipelinePassStage.
AlertsPerEvent map[string][]*types.Alert
// PipelineStageAlerts holds the alerts of EventAlertPipelinePassStage
// events keyed by the "stageName" entry of the event metadata.
PipelineStageAlerts map[string][]*types.Alert
// MetaPerEvent holds the metadata of every observed event keyed by event name.
MetaPerEvent map[string][]AlertEventMeta
// Mtx is held by Observe while it updates the maps above.
Mtx sync.RWMutex
}

// Observe records the alerts and metadata of an event so that tests can
// inspect them afterwards.
//
// Alerts of EventAlertPipelinePassStage events are appended to
// PipelineStageAlerts under the string found in meta["stageName"]. Alerts of
// every other event, and of pipeline-stage events whose metadata carries no
// string stage name, are appended to AlertsPerEvent under the event name.
// The metadata is always appended to MetaPerEvent under the event name.
func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
	o.Mtx.Lock()
	defer o.Mtx.Unlock()

	// Allocate the maps lazily: writing to a nil map panics, and the fake may
	// be created as a zero value instead of via NewFakeLifeCycleObserver.
	if o.AlertsPerEvent == nil {
		o.AlertsPerEvent = map[string][]*types.Alert{}
	}
	if o.PipelineStageAlerts == nil {
		o.PipelineStageAlerts = map[string][]*types.Alert{}
	}
	if o.MetaPerEvent == nil {
		o.MetaPerEvent = map[string][]AlertEventMeta{}
	}

	// Use the comma-ok form so a missing or non-string stage name does not
	// panic inside the code under test; such events are recorded by event name.
	stage, hasStage := meta["stageName"].(string)
	if event == EventAlertPipelinePassStage && hasStage {
		o.PipelineStageAlerts[stage] = append(o.PipelineStageAlerts[stage], alerts...)
	} else {
		o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
	}
	o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
}

// NewFakeLifeCycleObserver returns a FakeLifeCycleObserver whose recording
// maps are allocated and empty, ready to be passed wherever a
// LifeCycleObserver is expected.
func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
	fake := new(FakeLifeCycleObserver)
	fake.AlertsPerEvent = make(map[string][]*types.Alert)
	fake.PipelineStageAlerts = make(map[string][]*types.Alert)
	fake.MetaPerEvent = make(map[string][]AlertEventMeta)
	return fake
}
5 changes: 5 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"

"github.com/prometheus/alertmanager/alertobserver"
apiv2 "github.com/prometheus/alertmanager/api/v2"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
Expand Down Expand Up @@ -81,6 +82,9 @@ type Options struct {
GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos
// APICallback defines the callback that each API call runs before it returns.
APICallback callback.Callback
// AlertLCObserver is used to add hooks to the different alert life cycle events.
// If nil then no observer methods will be invoked in the life cycle events.
AlertLCObserver alertobserver.LifeCycleObserver
}

func (o Options) validate() error {
Expand Down Expand Up @@ -127,6 +131,7 @@ func New(opts Options) (*API, error) {
opts.Peer,
log.With(l, "version", "v2"),
opts.Registry,
opts.AlertLCObserver,
)
if err != nil {
return nil, err
Expand Down
19 changes: 17 additions & 2 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
"github.com/prometheus/alertmanager/util/callback"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/api/v2/restapi"
Expand Down Expand Up @@ -77,8 +78,9 @@ type API struct {
route *dispatch.Route
setAlertStatus setAlertStatusFn

logger log.Logger
m *metrics.Alerts
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.LifeCycleObserver

Handler http.Handler
}
Expand All @@ -101,6 +103,7 @@ func NewAPI(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.LifeCycleObserver,
) (*API, error) {
if apiCallback == nil {
apiCallback = callback.NoopAPICallback{}
Expand All @@ -116,6 +119,7 @@ func NewAPI(
logger: l,
m: metrics.NewAlerts(r),
uptime: time.Now(),
alertLCObserver: o,
}

// Load embedded swagger file.
Expand Down Expand Up @@ -403,19 +407,30 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}

if validationErrs.Len() > 0 {
level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error())
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

return alert_ops.NewPostAlertsOK()
}
Expand Down
65 changes: 65 additions & 0 deletions api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"bytes"
"encoding/json"
"fmt"

Check failure on line 19 in api/v2/api_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed with -local github.com/prometheus/alertmanager (goimports)
"github.com/prometheus/client_golang/prometheus"

Check failure on line 20 in api/v2/api_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)

"io"
"net/http"
"net/http/httptest"
Expand All @@ -29,6 +31,8 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert"
alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup"
alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
Expand Down Expand Up @@ -1123,6 +1127,67 @@ func TestListAlertInfosHandler(t *testing.T) {
}
}

// TestPostAlertHandler posts a single alert with various start/end times and
// checks two things per case:
//
//  1. with no life cycle observer configured, the handler responds with the
//     expected status code;
//  2. with a fake observer configured, the posted alert is reported as
//     received (status 200) or rejected (any other status).
func TestPostAlertHandler(t *testing.T) {
	now := time.Now()
	for i, tc := range []struct {
		start, end time.Time
		code       int
	}{
		{time.Time{}, time.Time{}, http.StatusOK},
		{now, time.Time{}, http.StatusOK},
		{time.Time{}, now.Add(-time.Second), http.StatusOK},
		{time.Time{}, now, http.StatusOK},
		{time.Time{}, now.Add(time.Second), http.StatusOK},
		{now.Add(-2 * time.Second), now.Add(-time.Second), http.StatusOK},
		{now.Add(time.Second), now.Add(2 * time.Second), http.StatusOK},
		// Start after end: the alert fails validation.
		{now.Add(time.Second), now, http.StatusBadRequest},
	} {
		alerts, alertsBytes := createAlert(t, tc.start, tc.end)
		api := API{
			uptime: time.Now(),
			alerts: newFakeAlerts([]*types.Alert{}),
			logger: log.NewNopLogger(),
			m:      metrics.NewAlerts(prometheus.NewRegistry()),
		}
		api.Update(&config.Config{
			Global: &config.GlobalConfig{
				ResolveTimeout: model.Duration(5),
			},
			Route: &config.Route{},
		}, nil)

		// First pass: no observer set, only the HTTP status is checked.
		r, err := http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes))
		require.NoError(t, err)

		w := httptest.NewRecorder()
		p := runtime.TextProducer()
		responder := api.postAlertsHandler(alert_ops.PostAlertsParams{
			HTTPRequest: r,
			Alerts:      alerts,
		})
		responder.WriteResponse(w, p)
		resp := w.Result()
		body, err := io.ReadAll(resp.Body)
		require.NoError(t, err)
		require.NoError(t, resp.Body.Close())

		require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body)))

		// Second pass: same request with the fake observer attached.
		observer := alertobserver.NewFakeLifeCycleObserver()
		api.alertLCObserver = observer
		r, err = http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes))
		require.NoError(t, err)
		api.postAlertsHandler(alert_ops.PostAlertsParams{
			HTTPRequest: r,
			Alerts:      alerts,
		})

		wantEvent := alertobserver.EventAlertReceived
		if tc.code != http.StatusOK {
			wantEvent = alertobserver.EventAlertRejected
		}
		amAlert := OpenAPIAlertsToAlerts(alerts)
		observed := observer.AlertsPerEvent[wantEvent]
		// Fail with a readable message instead of panicking on observed[0]
		// when the handler did not report the expected event.
		require.NotEmpty(t, observed, "test case: %d, no alerts observed for event %q", i, wantEvent)
		require.Equal(t, amAlert[0].Fingerprint(), observed[0].Fingerprint(), "test case: %d", i)
	}
}

type limitNumberOfAlertsReturnedCallback struct {
limit int
}
Expand Down
20 changes: 20 additions & 0 deletions api/v2/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,23 @@ func newGetAlertStatus(f *fakeAlerts) func(model.Fingerprint) types.AlertStatus
return status
}
}

// createAlert builds a PostableAlerts payload holding a single alert with
// fixed labels, annotations and generator URL and the given start and end
// times. It returns the payload together with its JSON encoding and fails the
// test if the payload cannot be marshalled.
func createAlert(t *testing.T, start, ends time.Time) (open_api_models.PostableAlerts, []byte) {
	// Attribute assertion failures to the calling test, not to this helper.
	t.Helper()

	alerts := open_api_models.PostableAlerts{
		&open_api_models.PostableAlert{
			StartsAt:    strfmt.DateTime(start),
			EndsAt:      strfmt.DateTime(ends),
			Annotations: open_api_models.LabelSet{"annotation1": "some text"},
			Alert: open_api_models.Alert{
				Labels:       open_api_models.LabelSet{"label1": "test1"},
				GeneratorURL: "http://localhost:3000",
			},
		},
	}
	b, err := json.Marshal(alerts)
	require.NoError(t, err)
	return alerts, b
}
3 changes: 2 additions & 1 deletion cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ func run() int {
intervener,
notificationLog,
pipelinePeer,
nil,
)

configuredReceivers.Set(float64(len(activeReceivers)))
Expand All @@ -448,7 +449,7 @@ func run() int {
silencer.Mutes(labels)
})

disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics)
disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics, nil)
routes.Walk(func(r *dispatch.Route) {
if r.RouteOpts.RepeatInterval > *retention {
level.Warn(configLogger).Log(
Expand Down
Loading

0 comments on commit 881f002

Please sign in to comment.