From 106d2bae7ec18b4d7c538ff4e0734e50327e64f0 Mon Sep 17 00:00:00 2001 From: Forrest <30576607+fspmarshall@users.noreply.github.com> Date: Thu, 19 Sep 2024 12:32:14 -0700 Subject: [PATCH] bulk audit event export api (#46399) --- api/client/client.go | 52 ++ .../go/teleport/auditlog/v1/auditlog.pb.go | 446 +++++++++++++++--- .../teleport/auditlog/v1/auditlog_grpc.pb.go | 136 ++++++ api/proto/teleport/auditlog/v1/auditlog.proto | 37 ++ lib/auth/auth_with_roles.go | 21 + lib/auth/grpcserver.go | 42 +- lib/events/api.go | 10 + lib/events/athena/athena.go | 12 + lib/events/athena/consumer.go | 9 +- lib/events/athena/integration_test.go | 28 ++ lib/events/athena/querier.go | 239 ++++++++++ lib/events/athena/test.go | 24 + lib/events/athena/types.go | 12 + lib/events/auditlog.go | 19 +- lib/events/discard.go | 10 + lib/events/dynamoevents/dynamoevents.go | 10 + lib/events/filelog.go | 10 + lib/events/firestoreevents/firestoreevents.go | 10 + lib/events/multilog.go | 59 +++ lib/events/pgevents/pgevents.go | 10 + lib/events/test/suite.go | 126 ++++- lib/events/writer.go | 10 + .../externalauditstorage/error_counter.go | 16 + tool/tctl/common/loadtest_command.go | 128 ++++- 24 files changed, 1409 insertions(+), 67 deletions(-) diff --git a/api/client/client.go b/api/client/client.go index efd1b04d7d11..3ccc0f6a723c 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -2425,6 +2425,58 @@ func (c *Client) searchUnstructuredEventsFallback(ctx context.Context, fromUTC, return items, next, nil } +// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes +// performance over ordering and filtering, and is intended for bulk export of events. +func (c *Client) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + // set up cancelable context so that Stream.Done can close the stream if the caller + // halts early. + ctx, cancel := context.WithCancel(ctx) + + events, err := c.grpc.ExportUnstructuredEvents(ctx, req) + if err != nil { + cancel() + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err)) + } + + return stream.Func[*auditlogpb.ExportEventUnstructured](func() (*auditlogpb.ExportEventUnstructured, error) { + event, err := events.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + // io.EOF signals that stream has completed successfully + return nil, io.EOF + } + return nil, trace.Wrap(err) + } + return event, nil + }, cancel) +} + +// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned +// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. +func (c *Client) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + // set up cancelable context so that Stream.Done can close the stream if the caller + // halts early. + ctx, cancel := context.WithCancel(ctx) + + chunks, err := c.grpc.GetEventExportChunks(ctx, req) + if err != nil { + cancel() + return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err)) + } + + return stream.Func[*auditlogpb.EventExportChunk](func() (*auditlogpb.EventExportChunk, error) { + chunk, err := chunks.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + // io.EOF signals that stream has completed successfully + return nil, io.EOF + } + return nil, trace.Wrap(err) + } + return chunk, nil + }, cancel) +} + // StreamUnstructuredSessionEvents streams audit events from a given session recording in an unstructured format. // This method is used by the Teleport event-handler plugin to receive events // from the auth server wihout having to support the Protobuf event schema. diff --git a/api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go b/api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go index cde283dc2db1..f3d9547c4485 100644 --- a/api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go +++ b/api/gen/proto/go/teleport/auditlog/v1/auditlog.pb.go @@ -308,6 +308,132 @@ func (x *EventsUnstructured) GetLastKey() string { return "" } +// ExportUnstructuredEventsRequest is a request with the needed data to export events. +type ExportUnstructuredEventsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // date is the target date from which to export events. note that only the UTC date of the + // timestamp value is used. use of a specific local timestamp may produce confusing results. + Date *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=date,proto3" json:"date,omitempty"` + // chunk is the chunk to export events from. + Chunk string `protobuf:"bytes,2,opt,name=chunk,proto3" json:"chunk,omitempty"` + // cursor is an optional mechanism to resume interrupted streams for a given chunk. + Cursor string `protobuf:"bytes,3,opt,name=cursor,proto3" json:"cursor,omitempty"` +} + +func (x *ExportUnstructuredEventsRequest) Reset() { + *x = ExportUnstructuredEventsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportUnstructuredEventsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportUnstructuredEventsRequest) ProtoMessage() {} + +func (x *ExportUnstructuredEventsRequest) ProtoReflect() protoreflect.Message { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportUnstructuredEventsRequest.ProtoReflect.Descriptor instead. +func (*ExportUnstructuredEventsRequest) Descriptor() ([]byte, []int) { + return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{3} +} + +func (x *ExportUnstructuredEventsRequest) GetDate() *timestamppb.Timestamp { + if x != nil { + return x.Date + } + return nil +} + +func (x *ExportUnstructuredEventsRequest) GetChunk() string { + if x != nil { + return x.Chunk + } + return "" +} + +func (x *ExportUnstructuredEventsRequest) GetCursor() string { + if x != nil { + return x.Cursor + } + return "" +} + +// ExportEventUnstructured is the stream item of the ExportUnstructuredEvents method. +type ExportEventUnstructured struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // event is the unstructured representation of the event payload. + Event *EventUnstructured `protobuf:"bytes,1,opt,name=event,proto3" json:"event,omitempty"` + // cursor is the cursor to resume the stream after this point. + Cursor string `protobuf:"bytes,2,opt,name=cursor,proto3" json:"cursor,omitempty"` +} + +func (x *ExportEventUnstructured) Reset() { + *x = ExportEventUnstructured{} + if protoimpl.UnsafeEnabled { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExportEventUnstructured) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExportEventUnstructured) ProtoMessage() {} + +func (x *ExportEventUnstructured) ProtoReflect() protoreflect.Message { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExportEventUnstructured.ProtoReflect.Descriptor instead. +func (*ExportEventUnstructured) Descriptor() ([]byte, []int) { + return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{4} +} + +func (x *ExportEventUnstructured) GetEvent() *EventUnstructured { + if x != nil { + return x.Event + } + return nil +} + +func (x *ExportEventUnstructured) GetCursor() string { + if x != nil { + return x.Cursor + } + return "" +} + // EventUnstructured represents a single events.AuditEvent in an unstructured format. type EventUnstructured struct { state protoimpl.MessageState @@ -331,7 +457,7 @@ type EventUnstructured struct { func (x *EventUnstructured) Reset() { *x = EventUnstructured{} if protoimpl.UnsafeEnabled { - mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[3] + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -344,7 +470,7 @@ func (x *EventUnstructured) String() string { func (*EventUnstructured) ProtoMessage() {} func (x *EventUnstructured) ProtoReflect() protoreflect.Message { - mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[3] + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -357,7 +483,7 @@ func (x *EventUnstructured) ProtoReflect() protoreflect.Message { // Deprecated: Use EventUnstructured.ProtoReflect.Descriptor instead. func (*EventUnstructured) Descriptor() ([]byte, []int) { - return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{3} + return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{5} } func (x *EventUnstructured) GetType() string { @@ -395,6 +521,104 @@ func (x *EventUnstructured) GetUnstructured() *structpb.Struct { return nil } +// GetEventExportChunksRequest is used to request the next set of event chunks to export. +type GetEventExportChunksRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // date is the date for which to list export shards. + Date *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=date,proto3" json:"date,omitempty"` +} + +func (x *GetEventExportChunksRequest) Reset() { + *x = GetEventExportChunksRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetEventExportChunksRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetEventExportChunksRequest) ProtoMessage() {} + +func (x *GetEventExportChunksRequest) ProtoReflect() protoreflect.Message { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetEventExportChunksRequest.ProtoReflect.Descriptor instead. +func (*GetEventExportChunksRequest) Descriptor() ([]byte, []int) { + return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{6} +} + +func (x *GetEventExportChunksRequest) GetDate() *timestamppb.Timestamp { + if x != nil { + return x.Date + } + return nil +} + +// EventExportChunk represents a chunk of events to export. +type EventExportChunk struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // chunk is the chunk to export. + Chunk string `protobuf:"bytes,1,opt,name=chunk,proto3" json:"chunk,omitempty"` +} + +func (x *EventExportChunk) Reset() { + *x = EventExportChunk{} + if protoimpl.UnsafeEnabled { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EventExportChunk) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EventExportChunk) ProtoMessage() {} + +func (x *EventExportChunk) ProtoReflect() protoreflect.Message { + mi := &file_teleport_auditlog_v1_auditlog_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EventExportChunk.ProtoReflect.Descriptor instead. +func (*EventExportChunk) Descriptor() ([]byte, []int) { + return file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP(), []int{7} +} + +func (x *EventExportChunk) GetChunk() string { + if x != nil { + return x.Chunk + } + return "" +} + var File_teleport_auditlog_v1_auditlog_proto protoreflect.FileDescriptor var file_teleport_auditlog_v1_auditlog_proto_rawDesc = []byte{ @@ -438,46 +662,85 @@ var file_teleport_auditlog_v1_auditlog_proto_rawDesc = []byte{ 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x52, 0x05, 0x69, 0x74, 0x65, 0x6d, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x6c, 0x61, 0x73, 0x74, 0x4b, 0x65, 0x79, 0x22, 0xba, 0x01, 0x0a, - 0x11, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, - 0x65, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, 0x3b, 0x0a, 0x0c, - 0x75, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x0c, 0x75, 0x6e, 0x73, - 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x2a, 0x3e, 0x0a, 0x05, 0x4f, 0x72, 0x64, - 0x65, 0x72, 0x12, 0x20, 0x0a, 0x1c, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x53, 0x43, - 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, - 0x45, 0x44, 0x10, 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x5f, 0x41, 0x53, - 0x43, 0x45, 0x4e, 0x44, 0x49, 0x4e, 0x47, 0x10, 0x01, 0x32, 0x95, 0x02, 0x0a, 0x0f, 0x41, 0x75, - 0x64, 0x69, 0x74, 0x4c, 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x8a, 0x01, - 0x0a, 0x1f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, - 0x75, 0x72, 0x65, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x73, 0x12, 0x3c, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, - 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x55, - 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, - 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, - 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x30, 0x01, 0x12, 0x75, 0x0a, 0x15, 0x47, 0x65, - 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, - 0x6e, 0x74, 0x73, 0x12, 0x32, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, - 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x55, 0x6e, - 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, + 0x01, 0x28, 0x09, 0x52, 0x07, 0x6c, 0x61, 0x73, 0x74, 0x4b, 0x65, 0x79, 0x22, 0x7f, 0x0a, 0x1f, + 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, + 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x64, 0x61, 0x74, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x22, 0x70, 0x0a, + 0x17, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, + 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x12, 0x3d, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, - 0x76, 0x65, 0x6e, 0x74, 0x73, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, - 0x64, 0x42, 0x54, 0x5a, 0x52, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, - 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x2f, 0x74, 0x65, - 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, - 0x2f, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2f, 0x76, 0x31, 0x3b, 0x61, 0x75, 0x64, - 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, + 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, + 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x22, + 0xba, 0x01, 0x0a, 0x11, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, + 0x74, 0x75, 0x72, 0x65, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x52, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x69, 0x6e, 0x64, + 0x65, 0x78, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x69, 0x6e, 0x64, 0x65, 0x78, 0x12, + 0x3b, 0x0a, 0x0c, 0x75, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x18, + 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x0c, + 0x75, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x22, 0x4d, 0x0a, 0x1b, + 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x43, 0x68, + 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x04, 0x64, + 0x61, 0x74, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x04, 0x64, 0x61, 0x74, 0x65, 0x22, 0x28, 0x0a, 0x10, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, + 0x14, 0x0a, 0x05, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x2a, 0x3e, 0x0a, 0x05, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x20, + 0x0a, 0x1c, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x5f, 0x44, 0x45, 0x53, 0x43, 0x45, 0x4e, 0x44, 0x49, + 0x4e, 0x47, 0x5f, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, + 0x12, 0x13, 0x0a, 0x0f, 0x4f, 0x52, 0x44, 0x45, 0x52, 0x5f, 0x41, 0x53, 0x43, 0x45, 0x4e, 0x44, + 0x49, 0x4e, 0x47, 0x10, 0x01, 0x32, 0x8f, 0x04, 0x0a, 0x0f, 0x41, 0x75, 0x64, 0x69, 0x74, 0x4c, + 0x6f, 0x67, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x8a, 0x01, 0x0a, 0x1f, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x3c, 0x2e, + 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x55, 0x6e, 0x73, 0x74, 0x72, + 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, 0x2e, 0x74, 0x65, + 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, + 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, + 0x75, 0x72, 0x65, 0x64, 0x30, 0x01, 0x12, 0x75, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x55, 0x6e, 0x73, + 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, + 0x32, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, + 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x47, 0x65, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, + 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x28, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, + 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x73, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, 0x12, 0x82, 0x01, + 0x0a, 0x18, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, + 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x35, 0x2e, 0x74, 0x65, 0x6c, + 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, + 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, + 0x75, 0x72, 0x65, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x2d, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, + 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x55, 0x6e, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x75, 0x72, 0x65, 0x64, + 0x30, 0x01, 0x12, 0x73, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x45, 0x78, + 0x70, 0x6f, 0x72, 0x74, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x12, 0x31, 0x2e, 0x74, 0x65, 0x6c, + 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2e, 0x76, + 0x31, 0x2e, 0x47, 0x65, 0x74, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, + 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x26, 0x2e, + 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2e, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, + 0x67, 0x2e, 0x76, 0x31, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, + 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x30, 0x01, 0x42, 0x54, 0x5a, 0x52, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x72, 0x61, 0x76, 0x69, 0x74, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x67, 0x6f, 0x2f, 0x74, 0x65, + 0x6c, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x2f, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x2f, + 0x76, 0x31, 0x3b, 0x61, 0x75, 0x64, 0x69, 0x74, 0x6c, 0x6f, 0x67, 0x76, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -493,32 +756,43 @@ func file_teleport_auditlog_v1_auditlog_proto_rawDescGZIP() []byte { } var file_teleport_auditlog_v1_auditlog_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_teleport_auditlog_v1_auditlog_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_teleport_auditlog_v1_auditlog_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_teleport_auditlog_v1_auditlog_proto_goTypes = []interface{}{ (Order)(0), // 0: teleport.auditlog.v1.Order (*StreamUnstructuredSessionEventsRequest)(nil), // 1: teleport.auditlog.v1.StreamUnstructuredSessionEventsRequest (*GetUnstructuredEventsRequest)(nil), // 2: teleport.auditlog.v1.GetUnstructuredEventsRequest (*EventsUnstructured)(nil), // 3: teleport.auditlog.v1.EventsUnstructured - (*EventUnstructured)(nil), // 4: teleport.auditlog.v1.EventUnstructured - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*structpb.Struct)(nil), // 6: google.protobuf.Struct + (*ExportUnstructuredEventsRequest)(nil), // 4: teleport.auditlog.v1.ExportUnstructuredEventsRequest + (*ExportEventUnstructured)(nil), // 5: teleport.auditlog.v1.ExportEventUnstructured + (*EventUnstructured)(nil), // 6: teleport.auditlog.v1.EventUnstructured + (*GetEventExportChunksRequest)(nil), // 7: teleport.auditlog.v1.GetEventExportChunksRequest + (*EventExportChunk)(nil), // 8: teleport.auditlog.v1.EventExportChunk + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*structpb.Struct)(nil), // 10: google.protobuf.Struct } var file_teleport_auditlog_v1_auditlog_proto_depIdxs = []int32{ - 5, // 0: teleport.auditlog.v1.GetUnstructuredEventsRequest.start_date:type_name -> google.protobuf.Timestamp - 5, // 1: teleport.auditlog.v1.GetUnstructuredEventsRequest.end_date:type_name -> google.protobuf.Timestamp - 0, // 2: teleport.auditlog.v1.GetUnstructuredEventsRequest.order:type_name -> teleport.auditlog.v1.Order - 4, // 3: teleport.auditlog.v1.EventsUnstructured.items:type_name -> teleport.auditlog.v1.EventUnstructured - 5, // 4: teleport.auditlog.v1.EventUnstructured.time:type_name -> google.protobuf.Timestamp - 6, // 5: teleport.auditlog.v1.EventUnstructured.unstructured:type_name -> google.protobuf.Struct - 1, // 6: teleport.auditlog.v1.AuditLogService.StreamUnstructuredSessionEvents:input_type -> teleport.auditlog.v1.StreamUnstructuredSessionEventsRequest - 2, // 7: teleport.auditlog.v1.AuditLogService.GetUnstructuredEvents:input_type -> teleport.auditlog.v1.GetUnstructuredEventsRequest - 4, // 8: teleport.auditlog.v1.AuditLogService.StreamUnstructuredSessionEvents:output_type -> teleport.auditlog.v1.EventUnstructured - 3, // 9: teleport.auditlog.v1.AuditLogService.GetUnstructuredEvents:output_type -> teleport.auditlog.v1.EventsUnstructured - 8, // [8:10] is the sub-list for method output_type - 6, // [6:8] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 9, // 0: teleport.auditlog.v1.GetUnstructuredEventsRequest.start_date:type_name -> google.protobuf.Timestamp + 9, // 1: teleport.auditlog.v1.GetUnstructuredEventsRequest.end_date:type_name -> google.protobuf.Timestamp + 0, // 2: teleport.auditlog.v1.GetUnstructuredEventsRequest.order:type_name -> teleport.auditlog.v1.Order + 6, // 3: teleport.auditlog.v1.EventsUnstructured.items:type_name -> teleport.auditlog.v1.EventUnstructured + 9, // 4: teleport.auditlog.v1.ExportUnstructuredEventsRequest.date:type_name -> google.protobuf.Timestamp + 6, // 5: teleport.auditlog.v1.ExportEventUnstructured.event:type_name -> teleport.auditlog.v1.EventUnstructured + 9, // 6: teleport.auditlog.v1.EventUnstructured.time:type_name -> google.protobuf.Timestamp + 10, // 7: teleport.auditlog.v1.EventUnstructured.unstructured:type_name -> google.protobuf.Struct + 9, // 8: teleport.auditlog.v1.GetEventExportChunksRequest.date:type_name -> google.protobuf.Timestamp + 1, // 9: teleport.auditlog.v1.AuditLogService.StreamUnstructuredSessionEvents:input_type -> teleport.auditlog.v1.StreamUnstructuredSessionEventsRequest + 2, // 10: teleport.auditlog.v1.AuditLogService.GetUnstructuredEvents:input_type -> teleport.auditlog.v1.GetUnstructuredEventsRequest + 4, // 11: teleport.auditlog.v1.AuditLogService.ExportUnstructuredEvents:input_type -> teleport.auditlog.v1.ExportUnstructuredEventsRequest + 7, // 12: teleport.auditlog.v1.AuditLogService.GetEventExportChunks:input_type -> teleport.auditlog.v1.GetEventExportChunksRequest + 6, // 13: teleport.auditlog.v1.AuditLogService.StreamUnstructuredSessionEvents:output_type -> teleport.auditlog.v1.EventUnstructured + 3, // 14: teleport.auditlog.v1.AuditLogService.GetUnstructuredEvents:output_type -> teleport.auditlog.v1.EventsUnstructured + 5, // 15: teleport.auditlog.v1.AuditLogService.ExportUnstructuredEvents:output_type -> teleport.auditlog.v1.ExportEventUnstructured + 8, // 16: teleport.auditlog.v1.AuditLogService.GetEventExportChunks:output_type -> teleport.auditlog.v1.EventExportChunk + 13, // [13:17] is the sub-list for method output_type + 9, // [9:13] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_teleport_auditlog_v1_auditlog_proto_init() } @@ -564,6 +838,30 @@ func file_teleport_auditlog_v1_auditlog_proto_init() { } } file_teleport_auditlog_v1_auditlog_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExportUnstructuredEventsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_teleport_auditlog_v1_auditlog_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExportEventUnstructured); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_teleport_auditlog_v1_auditlog_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*EventUnstructured); i { case 0: return &v.state @@ -575,6 +873,30 @@ func file_teleport_auditlog_v1_auditlog_proto_init() { return nil } } + file_teleport_auditlog_v1_auditlog_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetEventExportChunksRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_teleport_auditlog_v1_auditlog_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*EventExportChunk); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -582,7 +904,7 @@ func file_teleport_auditlog_v1_auditlog_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_teleport_auditlog_v1_auditlog_proto_rawDesc, NumEnums: 1, - NumMessages: 4, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go b/api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go index d20b71d55761..a57d707a5f2c 100644 --- a/api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go +++ b/api/gen/proto/go/teleport/auditlog/v1/auditlog_grpc.pb.go @@ -35,6 +35,8 @@ const _ = grpc.SupportPackageIsVersion7 const ( AuditLogService_StreamUnstructuredSessionEvents_FullMethodName = "/teleport.auditlog.v1.AuditLogService/StreamUnstructuredSessionEvents" AuditLogService_GetUnstructuredEvents_FullMethodName = "/teleport.auditlog.v1.AuditLogService/GetUnstructuredEvents" + AuditLogService_ExportUnstructuredEvents_FullMethodName = "/teleport.auditlog.v1.AuditLogService/ExportUnstructuredEvents" + AuditLogService_GetEventExportChunks_FullMethodName = "/teleport.auditlog.v1.AuditLogService/GetEventExportChunks" ) // AuditLogServiceClient is the client API for AuditLogService service. @@ -47,6 +49,12 @@ type AuditLogServiceClient interface { // GetUnstructuredEvents gets events from the audit log in an unstructured format. // This endpoint is used by the event handler to retrieve the events as JSON. GetUnstructuredEvents(ctx context.Context, in *GetUnstructuredEventsRequest, opts ...grpc.CallOption) (*EventsUnstructured, error) + // ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes + // performance over ordering and filtering, and is intended for bulk export of events. + ExportUnstructuredEvents(ctx context.Context, in *ExportUnstructuredEventsRequest, opts ...grpc.CallOption) (AuditLogService_ExportUnstructuredEventsClient, error) + // GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned + // list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. + GetEventExportChunks(ctx context.Context, in *GetEventExportChunksRequest, opts ...grpc.CallOption) (AuditLogService_GetEventExportChunksClient, error) } type auditLogServiceClient struct { @@ -98,6 +106,70 @@ func (c *auditLogServiceClient) GetUnstructuredEvents(ctx context.Context, in *G return out, nil } +func (c *auditLogServiceClient) ExportUnstructuredEvents(ctx context.Context, in *ExportUnstructuredEventsRequest, opts ...grpc.CallOption) (AuditLogService_ExportUnstructuredEventsClient, error) { + stream, err := c.cc.NewStream(ctx, &AuditLogService_ServiceDesc.Streams[1], AuditLogService_ExportUnstructuredEvents_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &auditLogServiceExportUnstructuredEventsClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type AuditLogService_ExportUnstructuredEventsClient interface { + Recv() (*ExportEventUnstructured, error) + grpc.ClientStream +} + +type auditLogServiceExportUnstructuredEventsClient struct { + grpc.ClientStream +} + +func (x *auditLogServiceExportUnstructuredEventsClient) Recv() (*ExportEventUnstructured, error) { + m := new(ExportEventUnstructured) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *auditLogServiceClient) GetEventExportChunks(ctx context.Context, in *GetEventExportChunksRequest, opts ...grpc.CallOption) (AuditLogService_GetEventExportChunksClient, error) { + stream, err := c.cc.NewStream(ctx, &AuditLogService_ServiceDesc.Streams[2], AuditLogService_GetEventExportChunks_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &auditLogServiceGetEventExportChunksClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type AuditLogService_GetEventExportChunksClient interface { + Recv() (*EventExportChunk, error) + grpc.ClientStream +} + +type auditLogServiceGetEventExportChunksClient struct { + grpc.ClientStream +} + +func (x *auditLogServiceGetEventExportChunksClient) Recv() (*EventExportChunk, error) { + m := new(EventExportChunk) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // AuditLogServiceServer is the server API for AuditLogService service. // All implementations must embed UnimplementedAuditLogServiceServer // for forward compatibility @@ -108,6 +180,12 @@ type AuditLogServiceServer interface { // GetUnstructuredEvents gets events from the audit log in an unstructured format. // This endpoint is used by the event handler to retrieve the events as JSON. GetUnstructuredEvents(context.Context, *GetUnstructuredEventsRequest) (*EventsUnstructured, error) + // ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes + // performance over ordering and filtering, and is intended for bulk export of events. + ExportUnstructuredEvents(*ExportUnstructuredEventsRequest, AuditLogService_ExportUnstructuredEventsServer) error + // GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned + // list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. + GetEventExportChunks(*GetEventExportChunksRequest, AuditLogService_GetEventExportChunksServer) error mustEmbedUnimplementedAuditLogServiceServer() } @@ -121,6 +199,12 @@ func (UnimplementedAuditLogServiceServer) StreamUnstructuredSessionEvents(*Strea func (UnimplementedAuditLogServiceServer) GetUnstructuredEvents(context.Context, *GetUnstructuredEventsRequest) (*EventsUnstructured, error) { return nil, status.Errorf(codes.Unimplemented, "method GetUnstructuredEvents not implemented") } +func (UnimplementedAuditLogServiceServer) ExportUnstructuredEvents(*ExportUnstructuredEventsRequest, AuditLogService_ExportUnstructuredEventsServer) error { + return status.Errorf(codes.Unimplemented, "method ExportUnstructuredEvents not implemented") +} +func (UnimplementedAuditLogServiceServer) GetEventExportChunks(*GetEventExportChunksRequest, AuditLogService_GetEventExportChunksServer) error { + return status.Errorf(codes.Unimplemented, "method GetEventExportChunks not implemented") +} func (UnimplementedAuditLogServiceServer) mustEmbedUnimplementedAuditLogServiceServer() {} // UnsafeAuditLogServiceServer may be embedded to opt out of forward compatibility for this service. @@ -173,6 +257,48 @@ func _AuditLogService_GetUnstructuredEvents_Handler(srv interface{}, ctx context return interceptor(ctx, in, info, handler) } +func _AuditLogService_ExportUnstructuredEvents_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ExportUnstructuredEventsRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AuditLogServiceServer).ExportUnstructuredEvents(m, &auditLogServiceExportUnstructuredEventsServer{stream}) +} + +type AuditLogService_ExportUnstructuredEventsServer interface { + Send(*ExportEventUnstructured) error + grpc.ServerStream +} + +type auditLogServiceExportUnstructuredEventsServer struct { + grpc.ServerStream +} + +func (x *auditLogServiceExportUnstructuredEventsServer) Send(m *ExportEventUnstructured) error { + return x.ServerStream.SendMsg(m) +} + +func _AuditLogService_GetEventExportChunks_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetEventExportChunksRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(AuditLogServiceServer).GetEventExportChunks(m, &auditLogServiceGetEventExportChunksServer{stream}) +} + +type AuditLogService_GetEventExportChunksServer interface { + Send(*EventExportChunk) error + grpc.ServerStream +} + +type auditLogServiceGetEventExportChunksServer struct { + grpc.ServerStream +} + +func (x *auditLogServiceGetEventExportChunksServer) Send(m *EventExportChunk) error { + return x.ServerStream.SendMsg(m) +} + // AuditLogService_ServiceDesc is the grpc.ServiceDesc for AuditLogService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -191,6 +317,16 @@ var AuditLogService_ServiceDesc = grpc.ServiceDesc{ Handler: _AuditLogService_StreamUnstructuredSessionEvents_Handler, ServerStreams: true, }, + { + StreamName: "ExportUnstructuredEvents", + Handler: _AuditLogService_ExportUnstructuredEvents_Handler, + ServerStreams: true, + }, + { + StreamName: "GetEventExportChunks", + Handler: _AuditLogService_GetEventExportChunks_Handler, + ServerStreams: true, + }, }, Metadata: "teleport/auditlog/v1/auditlog.proto", } diff --git a/api/proto/teleport/auditlog/v1/auditlog.proto b/api/proto/teleport/auditlog/v1/auditlog.proto index f218c40ce242..adb1a32fa06b 100644 --- a/api/proto/teleport/auditlog/v1/auditlog.proto +++ b/api/proto/teleport/auditlog/v1/auditlog.proto @@ -29,6 +29,12 @@ service AuditLogService { // GetUnstructuredEvents gets events from the audit log in an unstructured format. // This endpoint is used by the event handler to retrieve the events as JSON. rpc GetUnstructuredEvents(GetUnstructuredEventsRequest) returns (EventsUnstructured); + // ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes + // performance over ordering and filtering, and is intended for bulk export of events. + rpc ExportUnstructuredEvents(ExportUnstructuredEventsRequest) returns (stream ExportEventUnstructured); + // GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned + // list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. + rpc GetEventExportChunks(GetEventExportChunksRequest) returns (stream EventExportChunk); } // StreamUnstructuredSessionEventsRequest is a request containing data needed to fetch a session recording. @@ -77,6 +83,25 @@ message EventsUnstructured { string last_key = 2; } +// ExportUnstructuredEventsRequest is a request with the needed data to export events. +message ExportUnstructuredEventsRequest { + // date is the target date from which to export events. note that only the UTC date of the + // timestamp value is used. use of a specific local timestamp may produce confusing results. + google.protobuf.Timestamp date = 1; + // chunk is the chunk to export events from. + string chunk = 2; + // cursor is an optional mechanism to resume interrupted streams for a given chunk. + string cursor = 3; +} + +// ExportEventUnstructured is the stream item of the ExportUnstructuredEvents method. +message ExportEventUnstructured { + // event is the unstructured representation of the event payload. + EventUnstructured event = 1; + // cursor is the cursor to resume the stream after this point. + string cursor = 2; +} + // EventUnstructured represents a single events.AuditEvent in an unstructured format. message EventUnstructured { // type is the type of the event. @@ -92,3 +117,15 @@ message EventUnstructured { // unstructured is the unstructured representation of the event payload. google.protobuf.Struct unstructured = 5; } + +// GetEventExportChunksRequest is used to request the next set of event chunks to export. +message GetEventExportChunksRequest { + // date is the date for which to list export shards. + google.protobuf.Timestamp date = 1; +} + +// EventExportChunk represents a chunk of events to export. +message EventExportChunk { + // chunk is the chunk to export. + string chunk = 1; +} diff --git a/lib/auth/auth_with_roles.go b/lib/auth/auth_with_roles.go index c18ecd71cc0e..97e7b6695fd9 100644 --- a/lib/auth/auth_with_roles.go +++ b/lib/auth/auth_with_roles.go @@ -40,6 +40,7 @@ import ( "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/gen/proto/go/assist/v1" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" trustpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/trust/v1" userpreferencespb "github.com/gravitational/teleport/api/gen/proto/go/userpreferences/v1" @@ -5762,6 +5763,26 @@ func (a *ServerWithRoles) SearchEvents(ctx context.Context, req events.SearchEve return outEvents, lastKey, nil } +// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes +// performance over ordering and filtering, and is intended for bulk export of events. +func (a *ServerWithRoles) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err)) + } + + return a.alog.ExportUnstructuredEvents(ctx, req) +} + +// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned +// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. +func (a *ServerWithRoles) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + if err := a.action(apidefaults.Namespace, types.KindEvent, types.VerbList); err != nil { + return stream.Fail[*auditlogpb.EventExportChunk](trace.Wrap(err)) + } + + return a.alog.GetEventExportChunks(ctx, req) +} + // SearchSessionEvents allows searching session audit events with pagination support. func (a *ServerWithRoles) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) (outEvents []apievents.AuditEvent, lastKey string, err error) { if req.Cond != nil { diff --git a/lib/auth/grpcserver.go b/lib/auth/grpcserver.go index c6d074d714e6..a2884398e62c 100644 --- a/lib/auth/grpcserver.go +++ b/lib/auth/grpcserver.go @@ -46,6 +46,7 @@ import ( authpb "github.com/gravitational/teleport/api/client/proto" "github.com/gravitational/teleport/api/constants" assistv1pb "github.com/gravitational/teleport/api/gen/proto/go/assist/v1" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" auditlogv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" autoupdatev1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" clusterconfigv1pb "github.com/gravitational/teleport/api/gen/proto/go/teleport/clusterconfig/v1" @@ -3855,7 +3856,6 @@ func (g *GRPCServer) StreamSessionEvents(req *authpb.StreamSessionEventsRequest, } c, e := auth.ServerWithRoles.StreamSessionEvents(stream.Context(), session.ID(req.SessionID), int64(req.StartIndex)) - for { select { case event, more := <-c: @@ -5819,6 +5819,46 @@ func (g *GRPCServer) GetUnstructuredEvents(ctx context.Context, req *auditlogv1p }, nil } +// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes +// performance over ordering and filtering, and is intended for bulk export of events. +func (g *GRPCServer) ExportUnstructuredEvents(req *auditlogpb.ExportUnstructuredEventsRequest, stream auditlogpb.AuditLogService_ExportUnstructuredEventsServer) error { + auth, err := g.authenticate(stream.Context()) + if err != nil { + return trace.Wrap(err) + } + + events := auth.ServerWithRoles.ExportUnstructuredEvents(stream.Context(), req) + + for events.Next() { + if err := stream.Send(events.Item()); err != nil { + events.Done() + return trace.Wrap(err) + } + } + + return trace.Wrap(events.Done()) +} + +// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned +// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. +func (g *GRPCServer) GetEventExportChunks(req *auditlogpb.GetEventExportChunksRequest, stream auditlogpb.AuditLogService_GetEventExportChunksServer) error { + auth, err := g.authenticate(stream.Context()) + if err != nil { + return trace.Wrap(err) + } + + chunks := auth.ServerWithRoles.GetEventExportChunks(stream.Context(), req) + + for chunks.Next() { + if err := stream.Send(chunks.Item()); err != nil { + chunks.Done() + return trace.Wrap(err) + } + } + + return trace.Wrap(chunks.Done()) +} + // StreamUnstructuredSessionEvents streams all events from a given session recording as an unstructured format. func (g *GRPCServer) StreamUnstructuredSessionEvents(req *auditlogv1pb.StreamUnstructuredSessionEventsRequest, stream auditlogv1pb.AuditLogService_StreamUnstructuredSessionEventsServer) error { auth, err := g.authenticate(stream.Context()) diff --git a/lib/events/api.go b/lib/events/api.go index 97df8ec9d331..8d22b3d09022 100644 --- a/lib/events/api.go +++ b/lib/events/api.go @@ -25,6 +25,8 @@ import ( "github.com/gravitational/trace" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/session" @@ -984,6 +986,14 @@ type AuditLogger interface { // // This function may never return more than 1 MiB of event data. SearchSessionEvents(ctx context.Context, req SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) + + // ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes + // performance over ordering and filtering, and is intended for bulk export of events. + ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] + + // GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned + // list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. + GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] } // EventFields instance is attached to every logged event diff --git a/lib/events/athena/athena.go b/lib/events/athena/athena.go index b292abab05d5..493cc09d9063 100644 --- a/lib/events/athena/athena.go +++ b/lib/events/athena/athena.go @@ -33,6 +33,8 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" "github.com/gravitational/teleport" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/events" @@ -466,6 +468,8 @@ func New(ctx context.Context, cfg Config) (*Log, error) { database: cfg.Database, workgroup: cfg.Workgroup, queryResultsS3: cfg.QueryResultsS3, + locationS3Prefix: cfg.locationS3Prefix, + locationS3Bucket: cfg.locationS3Bucket, getQueryResultsInterval: cfg.GetQueryResultsInterval, disableQueryCostOptimization: cfg.DisableSearchCostOptimization, awsCfg: cfg.StorerQuerierAWSConfig, @@ -504,6 +508,14 @@ func (l *Log) SearchEvents(ctx context.Context, req events.SearchEventsRequest) return l.querier.SearchEvents(ctx, req) } +func (l *Log) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return l.querier.ExportUnstructuredEvents(ctx, req) +} + +func (l *Log) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return l.querier.GetEventExportChunks(ctx, req) +} + func (l *Log) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) { return l.querier.SearchSessionEvents(ctx, req) } diff --git a/lib/events/athena/consumer.go b/lib/events/athena/consumer.go index 3ed28559ca81..2332667f4b21 100644 --- a/lib/events/athena/consumer.go +++ b/lib/events/athena/consumer.go @@ -157,7 +157,14 @@ func newConsumer(cfg Config, cancelFn context.CancelFunc) (*consumer, error) { sqsDeleter: sqsClient, queueURL: cfg.QueueURL, perDateFileParquetWriter: func(ctx context.Context, date string) (io.WriteCloser, error) { - key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, uuid.NewString()) + // use uuidv7 to give approximate time order to files. this isn't strictly necessary + // but it assists in bulk event export roughly progressing in time order through a given + // day which is what folks tend to expect. + id, err := uuid.NewV7() + if err != nil { + return nil, trace.Wrap(err) + } + key := fmt.Sprintf("%s/%s/%s.parquet", cfg.locationS3Prefix, date, id.String()) fw, err := awsutils.NewS3V2FileWriter(ctx, storerS3Client, cfg.locationS3Bucket, key, nil /* uploader options */, func(poi *s3.PutObjectInput) { // ChecksumAlgorithm is required for putting objects when object lock is enabled. poi.ChecksumAlgorithm = s3Types.ChecksumAlgorithmSha256 diff --git a/lib/events/athena/integration_test.go b/lib/events/athena/integration_test.go index 9b6c6fa46d4e..8ba7d712088e 100644 --- a/lib/events/athena/integration_test.go +++ b/lib/events/athena/integration_test.go @@ -89,6 +89,34 @@ func testIntegrationAthenaSessionEventsCRUD(t *testing.T, bypassSNS bool) { eventsSuite.SessionEventsCRUD(t) } +func TestIntegrationAthenaEventExport(t *testing.T) { + t.Run("sns", func(t *testing.T) { + const bypassSNSFalse = false + testIntegrationAthenaEventExport(t, bypassSNSFalse) + }) + t.Run("sqs", func(t *testing.T) { + const bypassSNSTrue = true + testIntegrationAthenaEventExport(t, bypassSNSTrue) + }) +} + +func testIntegrationAthenaEventExport(t *testing.T, bypassSNS bool) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + ac := SetupAthenaContext(t, ctx, AthenaContextConfig{BypassSNS: bypassSNS}) + auditLogger := &EventuallyConsistentAuditLogger{ + Inner: ac.log, + // Additional 5s is used to compensate for uploading parquet on s3. + QueryDelay: ac.batcherInterval + 5*time.Second, + } + eventsSuite := test.EventsSuite{ + Log: auditLogger, + Clock: ac.clock, + } + + eventsSuite.EventExport(t) +} + func TestIntegrationAthenaEventPagination(t *testing.T) { t.Run("sns", func(t *testing.T) { const bypassSNSFalse = false diff --git a/lib/events/athena/querier.go b/lib/events/athena/querier.go index 2ef22e424ab9..5889c8c73108 100644 --- a/lib/events/athena/querier.go +++ b/lib/events/athena/querier.go @@ -15,10 +15,13 @@ package athena import ( + "bytes" "context" "encoding/base64" "encoding/binary" + "errors" "fmt" + "io" "strconv" "strings" "time" @@ -26,15 +29,20 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/athena" athenaTypes "github.com/aws/aws-sdk-go-v2/service/athena/types" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/dustin/go-humanize" "github.com/google/uuid" "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/segmentio/parquet-go" log "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/attribute" oteltrace "go.opentelemetry.io/otel/trace" "github.com/gravitational/teleport" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/defaults" @@ -59,6 +67,7 @@ type querier struct { querierConfig athenaClient athenaClient + s3Getter s3Getter } type athenaClient interface { @@ -67,11 +76,18 @@ type athenaClient interface { GetQueryResults(ctx context.Context, params *athena.GetQueryResultsInput, optFns ...func(*athena.Options)) (*athena.GetQueryResultsOutput, error) } +type s3Getter interface { + GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + ListObjectsV2(ctx context.Context, params *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) +} + type querierConfig struct { tablename string database string workgroup string queryResultsS3 string + locationS3Bucket string + locationS3Prefix string getQueryResultsInterval time.Duration // getQueryResultsInitialDelay allows to set custom getQueryResultsInitialDelay. // If not provided, default will be used. @@ -130,6 +146,7 @@ func newQuerier(cfg querierConfig) (*querier, error) { } return &querier{ athenaClient: athena.NewFromConfig(*cfg.awsCfg), + s3Getter: s3.NewFromConfig(*cfg.awsCfg), querierConfig: cfg, }, nil } @@ -203,6 +220,228 @@ func (q *querier) SearchEvents(ctx context.Context, req events.SearchEventsReque return events, keyset, trace.Wrap(err) } +// ExportUnstructuredEvents exports events from a given event chunk returned by GetEventExportChunks. This API prioritizes +// performance over ordering and filtering, and is intended for bulk export of events. +func (q *querier) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + startTime := req.Date.AsTime() + if startTime.IsZero() { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.BadParameter("missing required parameter 'date'")) + } + + if req.Chunk == "" { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.BadParameter("missing required parameter 'chunk'")) + } + + date := startTime.Format(time.DateOnly) + + var cursor athenaExportCursor + + if req.Cursor != "" { + if err := cursor.Decode(req.Cursor); err != nil { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.Wrap(err)) + } + } + + events := q.streamEventsFromChunk(ctx, date, req.Chunk) + + events = stream.Skip(events, int(cursor.pos)) + + return stream.FilterMap(events, func(e eventParquet) (*auditlogpb.ExportEventUnstructured, bool) { + cursor.pos++ + event, err := auditEventFromParquet(e) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "date": date, + "chunk": req.Chunk, + "pos": cursor.pos, + }).Warn("skipping export of audit event due to failed decoding") + return nil, false + } + + unstructuredEvent, err := apievents.ToUnstructured(event) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "date": date, + "chunk": req.Chunk, + "pos": cursor.pos, + }).Warn("skipping export of audit event due to failed conversion to unstructured event") + return nil, false + } + + return &auditlogpb.ExportEventUnstructured{ + Event: unstructuredEvent, + Cursor: cursor.Encode(), + }, true + }) +} + +// athenaExportCursors follow the format a1:. +type athenaExportCursor struct { + pos int64 +} + +func (c *athenaExportCursor) Encode() string { + return fmt.Sprintf("a1:%d", c.pos) +} + +func (c *athenaExportCursor) Decode(key string) error { + parts := strings.Split(key, ":") + if len(parts) != 2 { + return trace.BadParameter("invalid key format") + } + if parts[0] != "a1" { + return trace.BadParameter("unsupported cursor format (expected a1, got %q)", parts[0]) + } + pos, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return trace.Wrap(err) + } + c.pos = pos + return nil +} + +// GetEventExportChunks returns a stream of event chunks that can be exported via ExportUnstructuredEvents. The returned +// list isn't ordered and polling for new chunks requires re-consuming the entire stream from the beginning. +func (q *querier) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + dt := req.Date.AsTime() + if dt.IsZero() { + return stream.Fail[*auditlogpb.EventExportChunk](trace.BadParameter("missing required parameter 'date'")) + } + + date := dt.Format(time.DateOnly) + + prefix := fmt.Sprintf("%s/%s/", q.locationS3Prefix, date) + + var continuationToken *string + firstPage := true + + return stream.PageFunc(func() ([]*auditlogpb.EventExportChunk, error) { + if !firstPage && continuationToken == nil { + // no more pages available. + return nil, io.EOF + } + + firstPage = false + + rsp, err := q.s3Getter.ListObjectsV2(ctx, &s3.ListObjectsV2Input{ + Bucket: aws.String(q.locationS3Bucket), + Prefix: aws.String(prefix), + ContinuationToken: continuationToken, + }) + if err != nil { + var nsk *s3types.NoSuchKey + if continuationToken == nil && errors.As(err, &nsk) { + log.WithFields(log.Fields{ + "date": date, + "error": err, + }).Debug("no event chunks found for date") + // no pages available + return nil, io.EOF + } + log.WithFields(log.Fields{ + "error": err, + "date": date, + }).Error("failed to list event chunk objects in S3") + return nil, trace.Wrap(err) + } + + continuationToken = rsp.NextContinuationToken + + chunks := make([]*auditlogpb.EventExportChunk, 0, len(rsp.Contents)) + + for _, obj := range rsp.Contents { + fullKey := aws.ToString(obj.Key) + + if !strings.HasSuffix(fullKey, ".parquet") { + log.WithFields(log.Fields{ + "key": fullKey, + "date": date, + }).Debug("skipping non-parquet s3 file") + continue + } + + chunkID := strings.TrimSuffix(strings.TrimPrefix(fullKey, prefix), ".parquet") + if chunkID == "" { + log.WithFields(log.Fields{ + "key": fullKey, + "date": date, + }).Warn("skipping empty parquet file name") + continue + } + + chunks = append(chunks, &auditlogpb.EventExportChunk{ + Chunk: chunkID, + }) + } + + return chunks, nil + }) +} + +func (q *querier) streamEventsFromChunk(ctx context.Context, date, chunk string) stream.Stream[eventParquet] { + data, err := q.readEventChunk(ctx, date, chunk) + if err != nil { + return stream.Fail[eventParquet](err) + } + + reader := parquet.NewGenericReader[eventParquet](bytes.NewReader(data)) + + closer := func() { + reader.Close() + } + + return stream.Func(func() (eventParquet, error) { + // conventional wisdom says that we should use a larger persistent buffer here + // but in loadtesting this API was abserved having almost twice the throughput + // with a single element local buf variable instead. + var buf [1]eventParquet + _, err := reader.Read(buf[:]) + if err != nil { + if errors.Is(err, io.EOF) { + return eventParquet{}, io.EOF + } + return eventParquet{}, trace.Wrap(err) + } + return buf[0], nil + }, closer) +} + +func (q *querier) readEventChunk(ctx context.Context, date, chunk string) ([]byte, error) { + getObjectInput := &s3.GetObjectInput{ + Bucket: aws.String(q.locationS3Bucket), + Key: aws.String(fmt.Sprintf("%s/%s/%s.parquet", q.locationS3Prefix, date, chunk)), + } + getObjectOutput, err := q.s3Getter.GetObject(ctx, getObjectInput) + if err != nil { + var nsk *s3types.NoSuchKey + if errors.As(err, &nsk) { + log.WithFields(log.Fields{ + "date": date, + "chunk": chunk, + "error": err, + }).Debug("event chunk not found") + return nil, trace.NotFound("event chunk %q not found", chunk) + } + log.WithFields(log.Fields{ + "error": err, + "date": date, + }).Error("failed to get event chunk") + return nil, trace.Wrap(err) + } + + defer getObjectOutput.Body.Close() + + // ideally we'd start streaming events immediately without waiting for the read to + // complete. in practice thats tricky since the parquet reader wants methods that aren't + // typically available on lazy readers. we may be able to eek out a bit more performance by + // implementing a custom wrapper that lazily loads all bytes into an unlimited size buffer + // so that we can support methods like Seek and ReadAt, which aren't available on buffered + // readers with fixed sizes. + return io.ReadAll(getObjectOutput.Body) +} + func (q *querier) canOptimizePaginatedSearchCosts(ctx context.Context, startKey *keyset, from, to time.Time) bool { return !q.disableQueryCostOptimization && startKey != nil && to.Sub(from) > 24*time.Hour } diff --git a/lib/events/athena/test.go b/lib/events/athena/test.go index ea2eec9e455f..d92bbdbd2b60 100644 --- a/lib/events/athena/test.go +++ b/lib/events/athena/test.go @@ -43,6 +43,8 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/backend/memory" "github.com/gravitational/teleport/lib/events" @@ -146,6 +148,28 @@ func (e *EventuallyConsistentAuditLogger) SearchEvents(ctx context.Context, req return e.Inner.SearchEvents(ctx, req) } +func (e *EventuallyConsistentAuditLogger) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + e.mu.Lock() + defer e.mu.Unlock() + if e.emitWasAfterLastDelay { + time.Sleep(e.QueryDelay) + // clear emit delay + e.emitWasAfterLastDelay = false + } + return e.Inner.ExportUnstructuredEvents(ctx, req) +} + +func (e *EventuallyConsistentAuditLogger) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + e.mu.Lock() + defer e.mu.Unlock() + if e.emitWasAfterLastDelay { + time.Sleep(e.QueryDelay) + // clear emit delay + e.emitWasAfterLastDelay = false + } + return e.Inner.GetEventExportChunks(ctx, req) +} + func (e *EventuallyConsistentAuditLogger) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) { e.mu.Lock() defer e.mu.Unlock() diff --git a/lib/events/athena/types.go b/lib/events/athena/types.go index 4620af116f90..57046d09e542 100644 --- a/lib/events/athena/types.go +++ b/lib/events/athena/types.go @@ -48,3 +48,15 @@ func auditEventToParquet(event apievents.AuditEvent) (*eventParquet, error) { EventData: string(jsonBlob), }, nil } + +func auditEventFromParquet(event eventParquet) (apievents.AuditEvent, error) { + var fields events.EventFields + if err := utils.FastUnmarshal([]byte(event.EventData), &fields); err != nil { + return nil, trace.Wrap(err, "failed to unmarshal event, %s", event.EventData) + } + e, err := events.FromEventFields(fields) + if err != nil { + return nil, trace.Wrap(err) + } + return e, nil +} diff --git a/lib/events/auditlog.go b/lib/events/auditlog.go index 585d464a32ec..25f017eb48d8 100644 --- a/lib/events/auditlog.go +++ b/lib/events/auditlog.go @@ -40,6 +40,8 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/internal/context121" @@ -954,6 +956,22 @@ func (l *AuditLog) SearchSessionEvents(ctx context.Context, req SearchSessionEve return l.localLog.SearchSessionEvents(ctx, req) } +func (l *AuditLog) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + l.log.Debugf("ExportUnstructuredEvents(%v, %v, %v)", req.Date, req.Chunk, req.Cursor) + if l.ExternalLog != nil { + return l.ExternalLog.ExportUnstructuredEvents(ctx, req) + } + return l.localLog.ExportUnstructuredEvents(ctx, req) +} + +func (l *AuditLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + l.log.Debugf("GetEventExportChunks(%v)", req.Date) + if l.ExternalLog != nil { + return l.ExternalLog.GetEventExportChunks(ctx, req) + } + return l.localLog.GetEventExportChunks(ctx, req) +} + // StreamSessionEvents implements [SessionStreamer]. func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID, startIndex int64) (chan apievents.AuditEvent, chan error) { l.log.WithField("session_id", string(sessionID)).Debug("StreamSessionEvents()") @@ -998,7 +1016,6 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID go func() { defer rawSession.Close() - // this shouldn't be necessary as the position should be already 0 (Download // takes an io.WriterAt), but it's better to be safe than sorry if _, err := rawSession.Seek(0, io.SeekStart); err != nil { diff --git a/lib/events/discard.go b/lib/events/discard.go index a94744331014..f8c268aa83e8 100644 --- a/lib/events/discard.go +++ b/lib/events/discard.go @@ -23,6 +23,8 @@ import ( "github.com/gravitational/trace" log "github.com/sirupsen/logrus" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/session" ) @@ -56,6 +58,14 @@ func (d *DiscardAuditLog) SearchSessionEvents(ctx context.Context, req SearchSes return make([]apievents.AuditEvent, 0), "", nil } +func (d *DiscardAuditLog) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Empty[*auditlogpb.ExportEventUnstructured]() +} + +func (d *DiscardAuditLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Empty[*auditlogpb.EventExportChunk]() +} + func (d *DiscardAuditLog) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { return nil } diff --git a/lib/events/dynamoevents/dynamoevents.go b/lib/events/dynamoevents/dynamoevents.go index efb4c6e5a883..2ae36b85fa18 100644 --- a/lib/events/dynamoevents/dynamoevents.go +++ b/lib/events/dynamoevents/dynamoevents.go @@ -45,6 +45,8 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/backend/dynamo" @@ -584,6 +586,14 @@ func (l *Log) searchEventsWithFilter(ctx context.Context, fromUTC, toUTC time.Ti return eventArr, lastKey, nil } +func (l *Log) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotImplemented("dynamoevents backend does not support streaming export")) +} + +func (l *Log) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Fail[*auditlogpb.EventExportChunk](trace.NotImplemented("dynamoevents backend does not support streaming export")) +} + // ByTimeAndIndex sorts events by time // and if there are several session events with the same session by event index. type byTimeAndIndex []apievents.AuditEvent diff --git a/lib/events/filelog.go b/lib/events/filelog.go index 1b6dd2100f7e..1921a3e25eb1 100644 --- a/lib/events/filelog.go +++ b/lib/events/filelog.go @@ -35,6 +35,8 @@ import ( log "github.com/sirupsen/logrus" "github.com/gravitational/teleport" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/defaults" @@ -355,6 +357,14 @@ func (l *FileLog) SearchSessionEvents(ctx context.Context, req SearchSessionEven return events, lastKey, trace.Wrap(err) } +func (l *FileLog) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotImplemented("FileLog does not implement ExportUnstructuredEvents")) +} + +func (l *FileLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Fail[*auditlogpb.EventExportChunk](trace.NotImplemented("FileLog does not implement GetEventExportChunks")) +} + type searchEventsFilter struct { eventTypes []string condition utils.FieldsCondition diff --git a/lib/events/firestoreevents/firestoreevents.go b/lib/events/firestoreevents/firestoreevents.go index 477c75997acf..8caea7ac46e7 100644 --- a/lib/events/firestoreevents/firestoreevents.go +++ b/lib/events/firestoreevents/firestoreevents.go @@ -36,6 +36,8 @@ import ( "github.com/gravitational/teleport" apidefaults "github.com/gravitational/teleport/api/defaults" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" apiutils "github.com/gravitational/teleport/api/utils" @@ -556,6 +558,14 @@ func (l *Log) SearchSessionEvents(ctx context.Context, req events.SearchSessionE ) } +func (l *Log) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotImplemented("firestoreevents backend does not support streaming export")) +} + +func (l *Log) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Fail[*auditlogpb.EventExportChunk](trace.NotImplemented("firestoreevents backend does not support streaming export")) +} + type searchEventsFilter struct { eventTypes []string condition utils.FieldsCondition diff --git a/lib/events/multilog.go b/lib/events/multilog.go index f7f22792b43e..72b281223e3e 100644 --- a/lib/events/multilog.go +++ b/lib/events/multilog.go @@ -18,9 +18,12 @@ package events import ( "context" + "io" "github.com/gravitational/trace" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" ) @@ -75,6 +78,62 @@ func (m *MultiLog) SearchEvents(ctx context.Context, req SearchEventsRequest) (e return events, lastKey, err } +func (m *MultiLog) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + var foundImplemented bool + var pos int + // model our iteration through sub-loggers as a stream of streams that terminates after the first stream + // is reached that results in something other than a not implemented error, then flatten the stream of streams + // to produce the effect of creating a single stream of events originating solely from the first logger that + // implements the ExportUnstructuredEvents method. + return stream.Flatten(stream.Func(func() (stream.Stream[*auditlogpb.ExportEventUnstructured], error) { + if foundImplemented { + // an implementing stream has already been found and consumed. + return nil, io.EOF + } + if pos >= len(m.loggers) { + // we've reached the end of the list of loggers and none of them implement ExportUnstructuredEvents + return nil, trace.NotImplemented("no loggers implement ExportUnstructuredEvents") + } + log := m.loggers[pos] + pos++ + return stream.MapErr(log.ExportUnstructuredEvents(ctx, req), func(err error) error { + if trace.IsNotImplemented(err) { + return nil + } + foundImplemented = true + return err + }), nil + })) +} + +func (m *MultiLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + var foundImplemented bool + var pos int + // model our iteration through sub-loggers as a stream of streams that terminates after the first stream + // is reached that results in something other than a not implemented error, then flatten the stream of streams + // to produce the effect of creating a single stream of chunks originating solely from the first logger that + // implements the GetEventExportChunks method. + return stream.Flatten(stream.Func(func() (stream.Stream[*auditlogpb.EventExportChunk], error) { + if foundImplemented { + // an implementing stream has already been found and consumed. + return nil, io.EOF + } + if pos >= len(m.loggers) { + // we've reached the end of the list of loggers and none of them implement GetEventExportChunks + return nil, trace.NotImplemented("no loggers implement GetEventExportChunks") + } + log := m.loggers[pos] + pos++ + return stream.MapErr(log.GetEventExportChunks(ctx, req), func(err error) error { + if trace.IsNotImplemented(err) { + return nil + } + foundImplemented = true + return err + }), nil + })) +} + // SearchSessionEvents is a flexible way to find session events. // Only session.end and windows.desktop.session.end events are returned by this function. // This is used to find completed sessions. diff --git a/lib/events/pgevents/pgevents.go b/lib/events/pgevents/pgevents.go index 1c2e00a77735..4ef7bc7e24f3 100644 --- a/lib/events/pgevents/pgevents.go +++ b/lib/events/pgevents/pgevents.go @@ -29,6 +29,8 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/sirupsen/logrus" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" pgcommon "github.com/gravitational/teleport/lib/backend/pgbk/common" @@ -506,6 +508,14 @@ func (l *Log) SearchEvents(ctx context.Context, req events.SearchEventsRequest) return l.searchEvents(ctx, req.From, req.To, req.EventTypes, emptyCond, emptySessionID, req.Limit, req.Order, req.StartKey) } +func (l *Log) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotImplemented("pgevents backend does not support streaming export")) +} + +func (l *Log) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Fail[*auditlogpb.EventExportChunk](trace.NotImplemented("pgevents backend does not support streaming export")) +} + // SearchSessionEvents implements [events.AuditLogger]. func (l *Log) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) { sessionEndTypes := []string{events.SessionEndEvent, events.WindowsDesktopSessionEndEvent} diff --git a/lib/events/test/suite.go b/lib/events/test/suite.go index fa561b408bd7..53ae7745c23d 100644 --- a/lib/events/test/suite.go +++ b/lib/events/test/suite.go @@ -29,7 +29,9 @@ import ( "github.com/jonboulle/clockwork" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/timestamppb" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/api/utils/retryutils" @@ -85,6 +87,128 @@ type EventsSuite struct { SearchSessionEvensBySessionIDTimeout time.Duration } +func (s *EventsSuite) EventExport(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + baseTime := time.Now().UTC() + + // initial state should contain no chunks + chunks := s.Log.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(baseTime), + }) + + require.False(t, chunks.Next()) + require.NoError(t, chunks.Done()) + + names := []string{"bob", "jack", "daisy", "evan"} + + // create an initial set of events that should all end up in the same chunk + for i, name := range names { + err := s.Log.EmitAuditEvent(context.Background(), &apievents.UserLogin{ + Method: events.LoginMethodSAML, + Status: apievents.Status{Success: true}, + UserMetadata: apievents.UserMetadata{User: name}, + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Type: events.UserLoginEvent, + Time: baseTime.Add(time.Duration(i)), + }, + }) + require.NoError(t, err) + } + + // wait for the events to be processed + require.EventuallyWithT(t, func(t *assert.CollectT) { + chunks := s.Log.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(baseTime), + }) + + var chunkCount, eventCount int + + for chunks.Next() { + chunkCount++ + + events := s.Log.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ + Date: timestamppb.New(baseTime), + Chunk: chunks.Item().Chunk, + }) + + for events.Next() { + eventCount++ + } + assert.NoError(t, events.Done()) + } + + assert.NoError(t, chunks.Done()) + + assert.Equal(t, 1, chunkCount) + assert.Equal(t, 4, eventCount) + }, 30*time.Second, 500*time.Millisecond) + + // add more events that should end up in a new chunk + for i, name := range names { + err := s.Log.EmitAuditEvent(context.Background(), &apievents.UserLogin{ + Method: events.LoginMethodSAML, + Status: apievents.Status{Success: true}, + UserMetadata: apievents.UserMetadata{User: name}, + Metadata: apievents.Metadata{ + ID: uuid.NewString(), + Type: events.UserLoginEvent, + Time: baseTime.Add(time.Duration(i + 4)), + }, + }) + require.NoError(t, err) + } + + // wait for the events to be processed + require.EventuallyWithT(t, func(t *assert.CollectT) { + chunks := s.Log.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(baseTime), + }) + + var chunkCount, eventCount int + + for chunks.Next() { + chunkCount++ + + events := s.Log.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ + Date: timestamppb.New(baseTime), + Chunk: chunks.Item().Chunk, + }) + + for events.Next() { + eventCount++ + } + assert.NoError(t, events.Done()) + } + + assert.NoError(t, chunks.Done()) + + assert.Equal(t, 2, chunkCount) + assert.Equal(t, 8, eventCount) + }, 30*time.Second, 500*time.Millisecond) + + // generate a random chunk and verify that it is not found + events := s.Log.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ + Date: timestamppb.New(baseTime), + Chunk: uuid.New().String(), + }) + + require.False(t, events.Next()) + + fixtures.AssertNotFound(t, events.Done()) + + // try a different day and verify that no chunks are found + chunks = s.Log.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(baseTime.AddDate(0, 0, 1)), + }) + + require.False(t, chunks.Next()) + + require.NoError(t, chunks.Done()) +} + // EventPagination covers event search pagination. func (s *EventsSuite) EventPagination(t *testing.T) { // This serves no special purpose except to make querying easier. @@ -124,7 +248,7 @@ func (s *EventsSuite) EventPagination(t *testing.T) { assert.NoError(t, err) assert.Len(t, arr, 4) assert.Empty(t, checkpoint) - }, 10*time.Second, 500*time.Millisecond) + }, 30*time.Second, 500*time.Millisecond) for _, name := range names { arr, checkpoint, err = s.Log.SearchEvents(ctx, events.SearchEventsRequest{ diff --git a/lib/events/writer.go b/lib/events/writer.go index 66c9f1588006..07f852620e28 100644 --- a/lib/events/writer.go +++ b/lib/events/writer.go @@ -23,6 +23,8 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/utils" ) @@ -61,6 +63,14 @@ func (w *WriterLog) SearchEvents(ctx context.Context, req SearchEventsRequest) ( return nil, "", trace.NotImplemented(writerCannotRead) } +func (w *WriterLog) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotImplemented(writerCannotRead)) +} + +func (w *WriterLog) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.Fail[*auditlogpb.EventExportChunk](trace.NotImplemented(writerCannotRead)) +} + // SearchSessionEvents is a flexible way to find session events. // Only session.end and windows.desktop.session.end events are returned by this function. // This is used to find completed sessions. diff --git a/lib/integrations/externalauditstorage/error_counter.go b/lib/integrations/externalauditstorage/error_counter.go index a92387214c46..3cec198b9c1e 100644 --- a/lib/integrations/externalauditstorage/error_counter.go +++ b/lib/integrations/externalauditstorage/error_counter.go @@ -27,6 +27,8 @@ import ( "github.com/jonboulle/clockwork" "github.com/sirupsen/logrus" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" + "github.com/gravitational/teleport/api/internalutils/stream" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/lib/events" @@ -311,6 +313,20 @@ func (c *ErrorCountingLogger) SearchEvents(ctx context.Context, req events.Searc return events, key, err } +func (c *ErrorCountingLogger) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] { + return stream.MapErr(c.wrapped.ExportUnstructuredEvents(ctx, req), func(err error) error { + c.searches.observe(err) + return err + }) +} + +func (c *ErrorCountingLogger) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] { + return stream.MapErr(c.wrapped.GetEventExportChunks(ctx, req), func(err error) error { + c.searches.observe(err) + return err + }) +} + // SearchSessionEvents calls [c.wrapped.SearchSessionEvents] and counts the error or // success. func (c *ErrorCountingLogger) SearchSessionEvents(ctx context.Context, req events.SearchSessionEventsRequest) ([]apievents.AuditEvent, string, error) { diff --git a/tool/tctl/common/loadtest_command.go b/tool/tctl/common/loadtest_command.go index bdca232b665b..2aa72c816dee 100644 --- a/tool/tctl/common/loadtest_command.go +++ b/tool/tctl/common/loadtest_command.go @@ -31,7 +31,9 @@ import ( "github.com/google/uuid" "github.com/gravitational/trace" log "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/timestamppb" + auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/cache" @@ -45,7 +47,8 @@ type LoadtestCommand struct { nodeHeartbeats *kingpin.CmdClause - watch *kingpin.CmdClause + watch *kingpin.CmdClause + auditEvents *kingpin.CmdClause count int churn int @@ -55,6 +58,9 @@ type LoadtestCommand struct { concurrency int kind string + + date string + cursor string } // Initialize allows LoadtestCommand to plug itself into the CLI parser @@ -74,6 +80,10 @@ func (c *LoadtestCommand) Initialize(app *kingpin.Application, config *servicecf c.watch = loadtest.Command("watch", "Monitor event stream").Hidden() c.watch.Flag("kind", "Resource kind(s) to watch, e.g. --kind=node,user,role").StringVar(&c.kind) + + c.auditEvents = loadtest.Command("export-audit-events", "Bulk export audit events").Hidden() + c.auditEvents.Flag("date", "Date to dump events for").StringVar(&c.date) + c.auditEvents.Flag("cursor", "Cursor to start from").StringVar(&c.cursor) } // TryRun takes the CLI command as an argument (like "loadtest node-heartbeats") and executes it. @@ -83,6 +93,8 @@ func (c *LoadtestCommand) TryRun(ctx context.Context, cmd string, client *authcl err = c.NodeHeartbeats(ctx, client) case c.watch.FullCommand(): err = c.Watch(ctx, client) + case c.auditEvents.FullCommand(): + err = c.AuditEvents(ctx, client) default: return false, nil } @@ -289,3 +301,117 @@ func printEvent(ekind string, rsc types.Resource) { fmt.Printf("%s: %s/%s\n", ekind, rsc.GetKind(), rsc.GetName()) } } + +func (c *LoadtestCommand) AuditEvents(ctx context.Context, client *authclient.Client) error { + date := time.Now() + if c.date != "" { + var err error + date, err = time.Parse("2006-01-02", c.date) + if err != nil { + return trace.Wrap(err) + } + } + + outch := make(chan *auditlogpb.ExportEventUnstructured, 1024) + defer close(outch) + + go func() { + for event := range outch { + s, err := utils.FastMarshal(event.Event.Unstructured) + if err != nil { + panic(err) + } + fmt.Println(string(s)) + } + }() + + chunksProcessed := make(map[string]struct{}) + +Outer: + for { + chunks := client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{ + Date: timestamppb.New(date), + }) + + Chunks: + for chunks.Next() { + if _, ok := chunksProcessed[chunks.Item().Chunk]; ok { + log.WithFields(log.Fields{ + "date": date.Format(time.DateOnly), + "chunk": chunks.Item().Chunk, + }).Info("skipping already processed chunk") + continue Chunks + } + + var cursor string + ProcessChunk: + for { + + eventStream := client.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{ + Date: timestamppb.New(date), + Chunk: chunks.Item().Chunk, + Cursor: cursor, + }) + + Events: + for eventStream.Next() { + cursor = eventStream.Item().Cursor + select { + case outch <- eventStream.Item(): + continue Events + default: + log.Warn("backpressure in event stream") + } + + select { + case outch <- eventStream.Item(): + case <-ctx.Done(): + return nil + } + } + + if err := eventStream.Done(); err != nil { + log.WithFields(log.Fields{ + "date": date.Format(time.DateOnly), + "chunk": chunks.Item().Chunk, + "error": err, + }).Error("event stream failed, will attempt to reestablish") + continue ProcessChunk + } + + chunksProcessed[chunks.Item().Chunk] = struct{}{} + break ProcessChunk + } + } + + if err := chunks.Done(); err != nil { + log.WithFields(log.Fields{ + "date": date.Format(time.DateOnly), + "error": err, + }).Error("event chunk stream failed, will attempt to reestablish") + continue Outer + } + + nextDate := date.AddDate(0, 0, 1) + if nextDate.After(time.Now()) { + delay := utils.SeventhJitter(time.Second * 7) + log.WithFields(log.Fields{ + "date": date.Format(time.DateOnly), + "delay": delay, + }).Info("finished processing known event chunks for current date, will re-poll after delay") + select { + case <-time.After(delay): + case <-ctx.Done(): + return nil + } + continue Outer + } + + log.WithFields(log.Fields{ + "date": date.Format(time.DateOnly), + "next": nextDate.Format(time.DateOnly), + }).Info("finished processing known event chunks for historical date, moving to next") + date = nextDate + clear(chunksProcessed) + } +}