Skip to content

Commit

Permalink
Merge branch 'branch/v15' into bot/backport-46775-branch/v15
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenGravy authored Sep 28, 2024
2 parents ea7063e + 9a298ef commit 765c637
Show file tree
Hide file tree
Showing 68 changed files with 5,800 additions and 2,370 deletions.
52 changes: 52 additions & 0 deletions api/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2732,6 +2732,58 @@ func (c *Client) SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC ti
return response.Items, response.LastKey, nil
}

// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
func (c *Client) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

events, err := c.grpc.ExportUnstructuredEvents(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err))
}

return stream.Func[*auditlogpb.ExportEventUnstructured](func() (*auditlogpb.ExportEventUnstructured, error) {
event, err := events.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return event, nil
}, cancel)
}

// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
func (c *Client) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
// set up cancelable context so that Stream.Done can close the stream if the caller
// halts early.
ctx, cancel := context.WithCancel(ctx)

chunks, err := c.grpc.GetEventExportChunks(ctx, req)
if err != nil {
cancel()
return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err))
}

return stream.Func[*auditlogpb.EventExportChunk](func() (*auditlogpb.EventExportChunk, error) {
chunk, err := chunks.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
// io.EOF signals that stream has completed successfully
return nil, io.EOF
}
return nil, trace.Wrap(err)
}
return chunk, nil
}, cancel)
}

// StreamUnstructuredSessionEvents streams audit events from a given session recording in an unstructured format.
// This method is used by the Teleport event-handler plugin to receive events
// from the auth server wihout having to support the Protobuf event schema.
Expand Down
446 changes: 384 additions & 62 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go

Large diffs are not rendered by default.

136 changes: 136 additions & 0 deletions api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 37 additions & 0 deletions api/proto/teleport/auditlog/v1/auditlog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ service AuditLogService {
// GetUnstructuredEvents gets events from the audit log in an unstructured format.
// This endpoint is used by the event handler to retrieve the events as JSON.
rpc GetUnstructuredEvents(GetUnstructuredEventsRequest) returns (EventsUnstructured);
// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes
// performance over ordering and filtering, and is intended for bulk export of events.
rpc ExportUnstructuredEvents(ExportUnstructuredEventsRequest) returns (stream ExportEventUnstructured);
// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned
// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning.
rpc GetEventExportChunks(GetEventExportChunksRequest) returns (stream EventExportChunk);
}

// StreamUnstructuredSessionEventsRequest is a request containing data needed to fetch a session recording.
Expand Down Expand Up @@ -77,6 +83,25 @@ message EventsUnstructured {
string last_key = 2;
}

// ExportUnstructuredEventsRequest is a request with the needed data to export events.
message ExportUnstructuredEventsRequest {
// date is the target date from which to export events. note that only the UTC date of the
// timestamp value is used. use of a specific local timestamp may produce confusing results.
google.protobuf.Timestamp date = 1;
// chunk is the chunk to export events from.
string chunk = 2;
// cursor is an optional mechanism to resume interrupted streams for a given chunk.
string cursor = 3;
}

// ExportEventUnstructured is the stream item of the ExportUnstructuredEvents method.
message ExportEventUnstructured {
// event is the unstructured representation of the event payload.
EventUnstructured event = 1;
// cursor is the cursor to resume the stream after this point.
string cursor = 2;
}

// EventUnstructured represents a single events.AuditEvent in an unstructured format.
message EventUnstructured {
// type is the type of the event.
Expand All @@ -92,3 +117,15 @@ message EventUnstructured {
// unstructured is the unstructured representation of the event payload.
google.protobuf.Struct unstructured = 5;
}

// GetEventExportChunksRequest is used to request the next set of event chunks to export.
message GetEventExportChunksRequest {
// date is the date for which to list export shards.
google.protobuf.Timestamp date = 1;
}

// EventExportChunk represents a chunk of events to export.
message EventExportChunk {
// chunk is the chunk to export.
string chunk = 1;
}
Loading

0 comments on commit 765c637

Please sign in to comment.