Skip to content

Commit

Permalink
feat(server): Add tracing to EventIngester (#416)
Browse files Browse the repository at this point in the history
* feat(server): Add tracing to EventIngester

* feat(server): Add tracing to EventIngester

* fix: linting

* Bump beacon

* Bump beacon
  • Loading branch information
samcm authored Dec 2, 2024
1 parent dedce7f commit 01360be
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/chuckpreslar/emission v0.0.0-20170206194824-a7ddd980baf9
github.com/creasty/defaults v1.7.0
github.com/ethereum/go-ethereum v1.14.10
github.com/ethpandaops/beacon v0.42.0
github.com/ethpandaops/beacon v0.45.0
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756
github.com/ethpandaops/ethwallclock v0.3.0
github.com/ferranbt/fastssz v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ github.com/ethereum/go-ethereum v1.14.10 h1:kC24WjYeRjDy86LVo6MfF5Xs7nnUu+XG4Aja
github.com/ethereum/go-ethereum v1.14.10/go.mod h1:+l/fr42Mma+xBnhefL/+z11/hcmJ2egl+ScIVPjhc7E=
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9 h1:8NfxH2iXvJ60YRB8ChToFTUzl8awsc3cJ8CbLjGIl/A=
github.com/ethereum/go-verkle v0.1.1-0.20240829091221-dffa7562dbe9/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk=
github.com/ethpandaops/beacon v0.42.0 h1:5a3ld5wuAgX+N5KxEPuNfxDhdeiBG4gXlTAgCm0AuSE=
github.com/ethpandaops/beacon v0.42.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw=
github.com/ethpandaops/beacon v0.45.0 h1:6nMdKYFFNIDhX4hfmNx4Q4OeNRgR2BGNREVgqO+Q33s=
github.com/ethpandaops/beacon v0.45.0/go.mod h1:hKfalJGsF4BuWPwcGCX/4fdQR31zDJVaTLWwrkfNTzw=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756 h1:8JWjrRfP14m0oxOk03m11n/xgdY5ceyUf/ZxYdOs5gE=
github.com/ethpandaops/ethcore v0.0.0-20240422023000-2a5727b18756/go.mod h1:ZvKqL6CKxiraefdXPHeJurV2pDD/f2HF2uklDVdrry8=
github.com/ethpandaops/ethwallclock v0.3.0 h1:xF5fwtBf+bHFHZKBnwiPFEuelW3sMM7SD3ZNFq1lJY4=
Expand Down
8 changes: 3 additions & 5 deletions pkg/cannon/cannon.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,6 @@ func (c *Cannon) startDeriverWhenReady(ctx context.Context, d deriver.EventDeriv
continue
}

slot := c.beacon.Node().Wallclock().Slots().Current()

fork, err := spec.ForkEpochs.GetByName(d.ActivationFork())
if err != nil {
c.log.WithError(err).Errorf("unknown activation fork: %s", d.ActivationFork())
Expand All @@ -713,10 +711,10 @@ func (c *Cannon) startDeriverWhenReady(ctx context.Context, d deriver.EventDeriv
continue
}

if !fork.Active(phase0.Slot(slot.Number()), spec.SlotsPerEpoch) {
// Sleep until the next epochl and then retrty
currentEpoch := c.beacon.Metadata().Wallclock().Epochs().Current()
currentEpoch := c.beacon.Metadata().Wallclock().Epochs().Current()

if !fork.Active(phase0.Epoch(currentEpoch.Number())) {
// Sleep until the next epochl and then retrty
activationForkEpoch := c.beacon.Node().Wallclock().Epochs().FromNumber(uint64(fork.Epoch))

sleepFor := time.Until(activationForkEpoch.TimeWindow().End())
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/service/event-ingester/auth/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ func TestAuthorization_MultipleFilters(t *testing.T) {
t.Fatalf("Failed to get user and group: %v", err)
}

got, err := authorization.FilterEvents(user, group, tt.input)
got, err := authorization.FilterEvents(context.Background(), user, group, tt.input)
if err != nil {
t.Fatalf("Failed to filter events: %v", err)
}
Expand Down Expand Up @@ -597,7 +597,7 @@ func TestAuthorization_NoFilter(t *testing.T) {
t.Fatalf("Failed to get user and group: %v", err)
}

got, err := authorization.FilterEvents(user, group, tt.input)
got, err := authorization.FilterEvents(context.Background(), user, group, tt.input)
if err != nil {
t.Fatalf("Failed to filter events: %v", err)
}
Expand Down Expand Up @@ -721,7 +721,7 @@ func TestAuthorization_FilterAndRedactEvents(t *testing.T) {
t.Fatalf("Failed to get user and group: %v", err)
}

got, err := authorization.FilterAndRedactEvents(user, group, tt.input)
got, err := authorization.FilterAndRedactEvents(context.Background(), user, group, tt.input)
if err != nil {
t.Fatalf("Failed to filter events: %v", err)
}
Expand Down
48 changes: 40 additions & 8 deletions pkg/server/service/event-ingester/auth/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (
"fmt"
"strings"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type AuthorizationConfig struct {
Expand Down Expand Up @@ -147,43 +150,72 @@ func (a *Authorization) GetUserAndGroup(username string) (*User, *Group, error)
return nil, nil, fmt.Errorf("user %s not found", username)
}

func (a *Authorization) FilterEvents(user *User, group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
func (a *Authorization) FilterEvents(ctx context.Context, user *User, group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"Auth/Authorization.FilterEvents",
trace.WithAttributes(
attribute.Int64("events", int64(len(events))),
attribute.String("user", user.Username()),
attribute.String("group", group.Name()),
),
)
defer span.End()

if !a.enabled {
return events, nil
}

// Filter events for the user first since they're the most restrictive
filteredUserEvents, err := user.ApplyFilter(events)
filteredUserEvents, err := user.ApplyFilter(ctx, events)
if err != nil {
return nil, fmt.Errorf("failed to filter events for user %s: %w", user.Username(), err)
}

// Then filter events for the group
filteredGroupEvents, err := group.ApplyFilter(filteredUserEvents)
filteredGroupEvents, err := group.ApplyFilter(ctx, filteredUserEvents)
if err != nil {
return nil, fmt.Errorf("failed to filter events for group %s: %w", group.Name(), err)
}

return filteredGroupEvents, nil
}

func (a *Authorization) RedactEvents(group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
redactedEvents, err := group.ApplyRedacter(events)
func (a *Authorization) RedactEvents(ctx context.Context, group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"Auth/Authorization.RedactEvents",
trace.WithAttributes(
attribute.Int64("events", int64(len(events))),
attribute.String("group", group.Name()),
),
)
defer span.End()

redactedEvents, err := group.ApplyRedacter(ctx, events)
if err != nil {
return nil, fmt.Errorf("failed to redact events for group %s: %w", group.Name(), err)
}

return redactedEvents, nil
}

func (a *Authorization) FilterAndRedactEvents(user *User, group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
func (a *Authorization) FilterAndRedactEvents(ctx context.Context, user *User, group *Group, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"Authorization.FilterAndRedactEvents",
trace.WithAttributes(
attribute.Int64("events", int64(len(events))),
attribute.String("user", user.Username()),
attribute.String("group", group.Name()),
),
)
defer span.End()

// Filter first to save on processing
filteredEvents, err := a.FilterEvents(user, group, events)
filteredEvents, err := a.FilterEvents(ctx, user, group, events)
if err != nil {
return nil, fmt.Errorf("failed to filter events: %w", err)
}

redactedEvents, err := a.RedactEvents(group, filteredEvents)
redactedEvents, err := a.RedactEvents(ctx, group, filteredEvents)
if err != nil {
return nil, fmt.Errorf("failed to redact events: %w", err)
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/server/service/event-ingester/auth/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type GroupsConfig map[string]GroupConfig
Expand Down Expand Up @@ -163,11 +166,17 @@ func (g *Group) EventFilter() xatu.EventFilter {
return g.eventFilter
}

func (g *Group) ApplyFilter(events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
func (g *Group) ApplyFilter(ctx context.Context, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
if g.eventFilter == nil {
return events, nil
}

_, span := observability.Tracer().Start(ctx,
"Auth/Group.ApplyFilter",
trace.WithAttributes(attribute.Int64("events", int64(len(events)))),
)
defer span.End()

filteredEvents := make([]*xatu.DecoratedEvent, 0)

for _, event := range events {
Expand All @@ -186,7 +195,13 @@ func (g *Group) ApplyFilter(events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEve
return filteredEvents, nil
}

func (g *Group) ApplyRedacter(events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
func (g *Group) ApplyRedacter(ctx context.Context, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
_, span := observability.Tracer().Start(ctx,
"Auth/Group.ApplyRedacter",
trace.WithAttributes(attribute.Int64("events", int64(len(events)))),
)
defer span.End()

if g.redacter == nil {
return events, nil
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/server/service/event-ingester/auth/user.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package auth

import (
"context"
"fmt"
"strings"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

type UserConfig struct {
Expand Down Expand Up @@ -84,11 +88,17 @@ func (u *Users) GetUser(username string) (*User, bool) {
return user, ok
}

func (u *User) ApplyFilter(events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
func (u *User) ApplyFilter(ctx context.Context, events []*xatu.DecoratedEvent) ([]*xatu.DecoratedEvent, error) {
if u.eventFilter == nil {
return events, nil
}

_, span := observability.Tracer().Start(ctx,
"User.ApplyFilter",
trace.WithAttributes(attribute.Int64("events", int64(len(events)))),
)
defer span.End()

filteredEvents := make([]*xatu.DecoratedEvent, 0)

for _, event := range events {
Expand Down
53 changes: 46 additions & 7 deletions pkg/server/service/event-ingester/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@ import (
"strings"
"time"

"github.com/ethpandaops/xatu/pkg/observability"
"github.com/ethpandaops/xatu/pkg/proto/xatu"
"github.com/ethpandaops/xatu/pkg/server/geoip"
"github.com/ethpandaops/xatu/pkg/server/service/event-ingester/auth"
eventHandler "github.com/ethpandaops/xatu/pkg/server/service/event-ingester/event"
"github.com/ethpandaops/xatu/pkg/server/store"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
ocodes "go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -44,6 +48,11 @@ func NewHandler(log logrus.FieldLogger, clockDrift *time.Duration, geoipProvider

//nolint:gocyclo // Needs refactor
func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, user *auth.User, group *auth.Group) ([]*xatu.DecoratedEvent, error) {
ctx, span := observability.Tracer().Start(ctx,
"EventIngester.Events",
)
defer span.End()

groupName := "unknown"
if group != nil {
groupName = group.Name()
Expand All @@ -63,7 +72,7 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use

// Redact the events. Redacting is done before and after the event is processed to ensure that the field is not leaked by processing such as geoip lookups.
if group != nil {
redactedEvents, err := group.ApplyRedacter(events)
redactedEvents, err := group.ApplyRedacter(ctx, events)
if err != nil {
return nil, fmt.Errorf("failed to apply group redacter: %w", err)
}
Expand Down Expand Up @@ -142,6 +151,11 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use
}
}

ctx, span = observability.Tracer().Start(ctx,
"EventIngester.routeEvents",
trace.WithAttributes(attribute.Int64("events", int64(len(events)))),
)

handlerFilteredEvents := make([]*xatu.DecoratedEvent, 0)
// Route the events to the correct handler
for _, event := range events {
Expand All @@ -157,12 +171,18 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use

e, err := h.eventRouter.Route(eventHandler.Type(eventName), event)
if err != nil {
span.SetStatus(ocodes.Error, err.Error())
span.End()

h.log.WithError(err).WithField("event", eventName).Warn("failed to create event handler")

return nil, fmt.Errorf("failed to create event for %s event handler: %w ", eventName, err)
}

if err := e.Validate(ctx); err != nil {
span.SetStatus(ocodes.Error, err.Error())
span.End()

h.log.WithError(err).WithField("event", eventName).Warn("failed to validate event")

return nil, fmt.Errorf("%s event failed validation: %w", eventName, err)
Expand Down Expand Up @@ -204,11 +224,14 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use
handlerFilteredEvents = append(handlerFilteredEvents, event)
}

// End the routeEvents span
span.End()

filteredEvents = handlerFilteredEvents

// Redact the events again
if group != nil {
redactedEvents, err := group.ApplyRedacter(filteredEvents)
redactedEvents, err := group.ApplyRedacter(ctx, filteredEvents)
if err != nil {
return nil, fmt.Errorf("failed to apply group redacter: %w", err)
}
Expand All @@ -219,28 +242,44 @@ func (h *Handler) Events(ctx context.Context, events []*xatu.DecoratedEvent, use
return filteredEvents, nil
}

func (h *Handler) filterEvents(_ context.Context, events []*xatu.DecoratedEvent, user *auth.User, group *auth.Group) ([]*xatu.DecoratedEvent, error) {
func (h *Handler) filterEvents(ctx context.Context, events []*xatu.DecoratedEvent, user *auth.User, group *auth.Group) ([]*xatu.DecoratedEvent, error) {
_, span := observability.Tracer().Start(ctx,
"EventIngester.filterEvents",
trace.WithAttributes(attribute.Int64("before_filtering", int64(len(events)))),
)
defer span.End()

filteredEvents := events

// Apply the user filter
if user != nil {
ev, err := user.ApplyFilter(filteredEvents)
ev, err := user.ApplyFilter(ctx, filteredEvents)
if err != nil {
return nil, fmt.Errorf("failed to apply user filter: %w", err)
errMsg := errors.New("failed to apply user filter")

span.SetStatus(ocodes.Error, errMsg.Error())

return nil, fmt.Errorf("%s: %w", errMsg.Error(), err)
}

filteredEvents = ev
}

// Apply the group filter
if group != nil {
ev, err := group.ApplyFilter(filteredEvents)
ev, err := group.ApplyFilter(ctx, filteredEvents)
if err != nil {
return nil, fmt.Errorf("failed to apply group filter: %w", err)
errMsg := errors.New("failed to apply group filter")

span.SetStatus(ocodes.Error, errMsg.Error())

return nil, fmt.Errorf("%s: %w", errMsg.Error(), err)
}

filteredEvents = ev
}

span.SetAttributes(attribute.Int64("after_filtering", int64(len(filteredEvents))))

return filteredEvents, nil
}
Loading

0 comments on commit 01360be

Please sign in to comment.