From b8591f185345e149b8d10b4b00f8e8a41d74f0f9 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 29 Sep 2020 10:17:35 -0400 Subject: [PATCH 1/5] [dbnode][query] Add ability to set and retrieve AggregationOptions via namespace APis --- .../generated/proto/namespace/namespace.pb.go | 805 ++++++++++++++++-- .../generated/proto/namespace/namespace.proto | 35 + src/dbnode/namespace/aggregation.go | 90 ++ src/dbnode/namespace/aggregation_test.go | 59 ++ src/dbnode/namespace/convert.go | 59 +- src/dbnode/namespace/convert_test.go | 59 +- src/dbnode/namespace/namespace_mock.go | 93 ++ src/dbnode/namespace/options.go | 19 +- src/dbnode/namespace/types.go | 47 + .../api/v1/handler/database/create_test.go | 6 + src/query/api/v1/handler/namespace/add.go | 7 +- .../api/v1/handler/namespace/add_test.go | 1 + src/query/api/v1/handler/namespace/common.go | 36 + .../api/v1/handler/namespace/common_test.go | 41 + .../api/v1/handler/namespace/get_test.go | 2 + src/query/api/v1/handler/namespace/update.go | 31 +- .../api/v1/handler/namespace/update_test.go | 24 + 17 files changed, 1350 insertions(+), 64 deletions(-) create mode 100644 src/dbnode/namespace/aggregation.go create mode 100644 src/dbnode/namespace/aggregation_test.go diff --git a/src/dbnode/generated/proto/namespace/namespace.pb.go b/src/dbnode/generated/proto/namespace/namespace.pb.go index 8cb1d8aa68..4344675ded 100644 --- a/src/dbnode/generated/proto/namespace/namespace.pb.go +++ b/src/dbnode/generated/proto/namespace/namespace.pb.go @@ -32,6 +32,10 @@ RetentionOptions IndexOptions NamespaceOptions + AggregationOptions + Aggregation + AggregatedAttributes + DownsampleOptions Registry NamespaceRuntimeOptions SchemaOptions @@ -159,6 +163,7 @@ type NamespaceOptions struct { ColdWritesEnabled bool `protobuf:"varint,10,opt,name=coldWritesEnabled,proto3" json:"coldWritesEnabled,omitempty"` RuntimeOptions *NamespaceRuntimeOptions `protobuf:"bytes,11,opt,name=runtimeOptions" json:"runtimeOptions,omitempty"` CacheBlocksOnRetrieve *google_protobuf.BoolValue `protobuf:"bytes,12,opt,name=cacheBlocksOnRetrieve" json:"cacheBlocksOnRetrieve,omitempty"` + AggregationOptions *AggregationOptions `protobuf:"bytes,13,opt,name=aggregationOptions" json:"aggregationOptions,omitempty"` } func (m *NamespaceOptions) Reset() { *m = NamespaceOptions{} } @@ -250,6 +255,107 @@ func (m *NamespaceOptions) GetCacheBlocksOnRetrieve() *google_protobuf.BoolValue return nil } +func (m *NamespaceOptions) GetAggregationOptions() *AggregationOptions { + if m != nil { + return m.AggregationOptions + } + return nil +} + +// AggregationOptions is a set of options for aggregating data +// within the namespace. +type AggregationOptions struct { + // aggregations is a repeated field to support the ability to send aggregated data + // to a namespace also receiving unaggregated data. In this case, the namespace will + // have one Aggregation with aggregated set to false and another with aggregated set to true. + Aggregations []*Aggregation `protobuf:"bytes,1,rep,name=aggregations" json:"aggregations,omitempty"` +} + +func (m *AggregationOptions) Reset() { *m = AggregationOptions{} } +func (m *AggregationOptions) String() string { return proto.CompactTextString(m) } +func (*AggregationOptions) ProtoMessage() {} +func (*AggregationOptions) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{3} } + +func (m *AggregationOptions) GetAggregations() []*Aggregation { + if m != nil { + return m.Aggregations + } + return nil +} + +// Aggregation describes data points within the namespace. +type Aggregation struct { + // aggregated is true if data points are aggregated, false otherwise. + Aggregated bool `protobuf:"varint,1,opt,name=aggregated,proto3" json:"aggregated,omitempty"` + // attributes specifies details for how to aggregate data when aggregated is set to true. + // This field is ignored when aggregated is false. + Attributes *AggregatedAttributes `protobuf:"bytes,2,opt,name=attributes" json:"attributes,omitempty"` +} + +func (m *Aggregation) Reset() { *m = Aggregation{} } +func (m *Aggregation) String() string { return proto.CompactTextString(m) } +func (*Aggregation) ProtoMessage() {} +func (*Aggregation) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{4} } + +func (m *Aggregation) GetAggregated() bool { + if m != nil { + return m.Aggregated + } + return false +} + +func (m *Aggregation) GetAttributes() *AggregatedAttributes { + if m != nil { + return m.Attributes + } + return nil +} + +// AggregatedAttributes describe how to aggregate data. +type AggregatedAttributes struct { + // resolutionNanos is the time range to aggregate data across. + ResolutionNanos int64 `protobuf:"varint,1,opt,name=resolutionNanos,proto3" json:"resolutionNanos,omitempty"` + DownsampleOptions *DownsampleOptions `protobuf:"bytes,2,opt,name=downsampleOptions" json:"downsampleOptions,omitempty"` +} + +func (m *AggregatedAttributes) Reset() { *m = AggregatedAttributes{} } +func (m *AggregatedAttributes) String() string { return proto.CompactTextString(m) } +func (*AggregatedAttributes) ProtoMessage() {} +func (*AggregatedAttributes) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{5} } + +func (m *AggregatedAttributes) GetResolutionNanos() int64 { + if m != nil { + return m.ResolutionNanos + } + return 0 +} + +func (m *AggregatedAttributes) GetDownsampleOptions() *DownsampleOptions { + if m != nil { + return m.DownsampleOptions + } + return nil +} + +// DownsampleOptions is a set of options related to downsampling data. +type DownsampleOptions struct { + // all indicates whether to send data points to this namespace. If false, + // data points must be sent via rollup/recording rules. Defaults to true. + All bool `protobuf:"varint,1,opt,name=all,proto3" json:"all,omitempty"` +} + +func (m *DownsampleOptions) Reset() { *m = DownsampleOptions{} } +func (m *DownsampleOptions) String() string { return proto.CompactTextString(m) } +func (*DownsampleOptions) ProtoMessage() {} +func (*DownsampleOptions) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{6} } + +func (m *DownsampleOptions) GetAll() bool { + if m != nil { + return m.All + } + return false +} + type Registry struct { Namespaces map[string]*NamespaceOptions `protobuf:"bytes,1,rep,name=namespaces" json:"namespaces,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` } @@ -257,7 +363,7 @@ type Registry struct { func (m *Registry) Reset() { *m = Registry{} } func (m *Registry) String() string { return proto.CompactTextString(m) } func (*Registry) ProtoMessage() {} -func (*Registry) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{3} } +func (*Registry) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{7} } func (m *Registry) GetNamespaces() map[string]*NamespaceOptions { if m != nil { @@ -274,7 +380,7 @@ type NamespaceRuntimeOptions struct { func (m *NamespaceRuntimeOptions) Reset() { *m = NamespaceRuntimeOptions{} } func (m *NamespaceRuntimeOptions) String() string { return proto.CompactTextString(m) } func (*NamespaceRuntimeOptions) ProtoMessage() {} -func (*NamespaceRuntimeOptions) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{4} } +func (*NamespaceRuntimeOptions) Descriptor() ([]byte, []int) { return fileDescriptorNamespace, []int{8} } func (m *NamespaceRuntimeOptions) GetWriteIndexingPerCPUConcurrency() *google_protobuf.DoubleValue { if m != nil { @@ -294,6 +400,10 @@ func init() { proto.RegisterType((*RetentionOptions)(nil), "namespace.RetentionOptions") proto.RegisterType((*IndexOptions)(nil), "namespace.IndexOptions") proto.RegisterType((*NamespaceOptions)(nil), "namespace.NamespaceOptions") + proto.RegisterType((*AggregationOptions)(nil), "namespace.AggregationOptions") + proto.RegisterType((*Aggregation)(nil), "namespace.Aggregation") + proto.RegisterType((*AggregatedAttributes)(nil), "namespace.AggregatedAttributes") + proto.RegisterType((*DownsampleOptions)(nil), "namespace.DownsampleOptions") proto.RegisterType((*Registry)(nil), "namespace.Registry") proto.RegisterType((*NamespaceRuntimeOptions)(nil), "namespace.NamespaceRuntimeOptions") } @@ -523,6 +633,145 @@ func (m *NamespaceOptions) MarshalTo(dAtA []byte) (int, error) { } i += n5 } + if m.AggregationOptions != nil { + dAtA[i] = 0x6a + i++ + i = encodeVarintNamespace(dAtA, i, uint64(m.AggregationOptions.Size())) + n6, err := m.AggregationOptions.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n6 + } + return i, nil +} + +func (m *AggregationOptions) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AggregationOptions) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Aggregations) > 0 { + for _, msg := range m.Aggregations { + dAtA[i] = 0xa + i++ + i = encodeVarintNamespace(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *Aggregation) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Aggregation) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Aggregated { + dAtA[i] = 0x8 + i++ + if m.Aggregated { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + if m.Attributes != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintNamespace(dAtA, i, uint64(m.Attributes.Size())) + n7, err := m.Attributes.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n7 + } + return i, nil +} + +func (m *AggregatedAttributes) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AggregatedAttributes) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.ResolutionNanos != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintNamespace(dAtA, i, uint64(m.ResolutionNanos)) + } + if m.DownsampleOptions != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintNamespace(dAtA, i, uint64(m.DownsampleOptions.Size())) + n8, err := m.DownsampleOptions.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n8 + } + return i, nil +} + +func (m *DownsampleOptions) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DownsampleOptions) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.All { + dAtA[i] = 0x8 + i++ + if m.All { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } return i, nil } @@ -561,11 +810,11 @@ func (m *Registry) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintNamespace(dAtA, i, uint64(v.Size())) - n6, err := v.MarshalTo(dAtA[i:]) + n9, err := v.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n9 } } } @@ -591,21 +840,21 @@ func (m *NamespaceRuntimeOptions) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintNamespace(dAtA, i, uint64(m.WriteIndexingPerCPUConcurrency.Size())) - n7, err := m.WriteIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) + n10, err := m.WriteIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n10 } if m.FlushIndexingPerCPUConcurrency != nil { dAtA[i] = 0x12 i++ i = encodeVarintNamespace(dAtA, i, uint64(m.FlushIndexingPerCPUConcurrency.Size())) - n8, err := m.FlushIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) + n11, err := m.FlushIndexingPerCPUConcurrency.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n11 } return i, nil } @@ -702,6 +951,57 @@ func (m *NamespaceOptions) Size() (n int) { l = m.CacheBlocksOnRetrieve.Size() n += 1 + l + sovNamespace(uint64(l)) } + if m.AggregationOptions != nil { + l = m.AggregationOptions.Size() + n += 1 + l + sovNamespace(uint64(l)) + } + return n +} + +func (m *AggregationOptions) Size() (n int) { + var l int + _ = l + if len(m.Aggregations) > 0 { + for _, e := range m.Aggregations { + l = e.Size() + n += 1 + l + sovNamespace(uint64(l)) + } + } + return n +} + +func (m *Aggregation) Size() (n int) { + var l int + _ = l + if m.Aggregated { + n += 2 + } + if m.Attributes != nil { + l = m.Attributes.Size() + n += 1 + l + sovNamespace(uint64(l)) + } + return n +} + +func (m *AggregatedAttributes) Size() (n int) { + var l int + _ = l + if m.ResolutionNanos != 0 { + n += 1 + sovNamespace(uint64(m.ResolutionNanos)) + } + if m.DownsampleOptions != nil { + l = m.DownsampleOptions.Size() + n += 1 + l + sovNamespace(uint64(l)) + } + return n +} + +func (m *DownsampleOptions) Size() (n int) { + var l int + _ = l + if m.All { + n += 2 + } return n } @@ -1358,6 +1658,395 @@ func (m *NamespaceOptions) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 13: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field AggregationOptions", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNamespace + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.AggregationOptions == nil { + m.AggregationOptions = &AggregationOptions{} + } + if err := m.AggregationOptions.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNamespace(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNamespace + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AggregationOptions) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AggregationOptions: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AggregationOptions: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Aggregations", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNamespace + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Aggregations = append(m.Aggregations, &Aggregation{}) + if err := m.Aggregations[len(m.Aggregations)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNamespace(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNamespace + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Aggregation) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Aggregation: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Aggregation: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Aggregated", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Aggregated = bool(v != 0) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Attributes", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNamespace + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Attributes == nil { + m.Attributes = &AggregatedAttributes{} + } + if err := m.Attributes.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNamespace(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNamespace + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AggregatedAttributes) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AggregatedAttributes: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AggregatedAttributes: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ResolutionNanos", wireType) + } + m.ResolutionNanos = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ResolutionNanos |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DownsampleOptions", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthNamespace + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.DownsampleOptions == nil { + m.DownsampleOptions = &DownsampleOptions{} + } + if err := m.DownsampleOptions.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipNamespace(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthNamespace + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DownsampleOptions) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DownsampleOptions: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DownsampleOptions: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field All", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowNamespace + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.All = bool(v != 0) default: iNdEx = preIndex skippy, err := skipNamespace(dAtA[iNdEx:]) @@ -1778,50 +2467,58 @@ func init() { } var fileDescriptorNamespace = []byte{ - // 715 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xdd, 0x6a, 0xdb, 0x4a, - 0x10, 0x3e, 0xb2, 0xf3, 0xe3, 0xac, 0x9d, 0xc4, 0x67, 0x39, 0x87, 0x18, 0x9f, 0x83, 0x09, 0x6e, - 0x29, 0xa6, 0x14, 0x8b, 0x3a, 0x37, 0xa5, 0x85, 0x40, 0xe2, 0xa4, 0xa1, 0xa5, 0x24, 0x66, 0xd3, - 0x1f, 0xc8, 0xdd, 0x4a, 0x1a, 0xcb, 0x22, 0xd2, 0xae, 0xd8, 0x5d, 0x25, 0x71, 0x9f, 0xa1, 0x17, - 0xbd, 0xee, 0x2b, 0xf4, 0x45, 0x7a, 0xd9, 0x47, 0x28, 0x29, 0x7d, 0x8f, 0xa2, 0xdd, 0xca, 0x91, - 0xe4, 0x24, 0x0d, 0xbd, 0x31, 0xeb, 0x6f, 0xbe, 0x99, 0x6f, 0x34, 0xdf, 0xec, 0xa2, 0x03, 0x3f, - 0x50, 0x93, 0xc4, 0xe9, 0xbb, 0x3c, 0xb2, 0xa3, 0x2d, 0xcf, 0xb1, 0xa3, 0x2d, 0x5b, 0x0a, 0xd7, - 0xf6, 0x1c, 0xc6, 0x3d, 0xb0, 0x7d, 0x60, 0x20, 0xa8, 0x02, 0xcf, 0x8e, 0x05, 0x57, 0xdc, 0x66, - 0x34, 0x02, 0x19, 0x53, 0x17, 0xae, 0x4e, 0x7d, 0x1d, 0xc1, 0x2b, 0x33, 0xa0, 0xdd, 0xf1, 0x39, - 0xf7, 0x43, 0x30, 0x29, 0x4e, 0x32, 0xb6, 0xcf, 0x05, 0x8d, 0x63, 0x10, 0xd2, 0x50, 0xdb, 0x7b, - 0x7f, 0xaa, 0x29, 0xdd, 0x09, 0x44, 0xd4, 0x54, 0xe9, 0x7e, 0xa8, 0xa2, 0x26, 0x01, 0x05, 0x4c, - 0x05, 0x9c, 0x1d, 0xc5, 0xe9, 0xaf, 0xc4, 0x03, 0xf4, 0x8f, 0xc8, 0xb0, 0x11, 0x88, 0x80, 0x7b, - 0x87, 0x94, 0x71, 0xd9, 0xb2, 0x36, 0xad, 0x5e, 0x95, 0x5c, 0x1b, 0xc3, 0x0f, 0xd0, 0x9a, 0x13, - 0x72, 0xf7, 0xf4, 0x38, 0x78, 0x0f, 0x86, 0x5d, 0xd1, 0xec, 0x12, 0x8a, 0x1f, 0xa1, 0xbf, 0x9d, - 0x64, 0x3c, 0x06, 0xf1, 0x3c, 0x51, 0x89, 0xf8, 0x45, 0xad, 0x6a, 0xea, 0x7c, 0x00, 0xf7, 0xd0, - 0xba, 0x01, 0x47, 0x54, 0x2a, 0xc3, 0x5d, 0xd0, 0xdc, 0x32, 0xac, 0x99, 0xa9, 0xd2, 0x1e, 0x55, - 0x74, 0xff, 0x22, 0x0e, 0xc4, 0xb4, 0xb5, 0xb8, 0x69, 0xf5, 0x6a, 0xa4, 0x0c, 0xe3, 0x13, 0xd4, - 0x2b, 0x41, 0x3b, 0x63, 0x05, 0xe2, 0x90, 0xab, 0x1d, 0xd7, 0x05, 0x29, 0xf3, 0x5f, 0xbc, 0xa4, - 0xc5, 0xee, 0xcc, 0xc7, 0xdb, 0xa8, 0x3d, 0xd6, 0xed, 0x93, 0xeb, 0xe6, 0xb7, 0xac, 0xab, 0xdd, - 0xc2, 0xe8, 0x8e, 0x50, 0xe3, 0x05, 0xf3, 0xe0, 0x22, 0x73, 0xa2, 0x85, 0x96, 0x81, 0x51, 0x27, - 0x04, 0x4f, 0x0f, 0xbf, 0x46, 0xb2, 0xbf, 0x77, 0x9d, 0x77, 0xf7, 0xd3, 0x22, 0x6a, 0x1e, 0x66, - 0xde, 0x67, 0x65, 0x1f, 0xa2, 0xa6, 0xc3, 0xb9, 0x92, 0x4a, 0xd0, 0x78, 0xbf, 0x50, 0x7f, 0x0e, - 0xc7, 0x5d, 0xd4, 0x18, 0x87, 0x89, 0x9c, 0x64, 0xbc, 0x8a, 0xe6, 0x15, 0xb0, 0xd4, 0xd4, 0x73, - 0x11, 0x28, 0x90, 0xaf, 0xf9, 0x90, 0x47, 0x51, 0xa0, 0x5e, 0x71, 0x5f, 0x9b, 0x5a, 0x23, 0xf3, - 0x81, 0xb4, 0x75, 0x37, 0x04, 0xca, 0x92, 0x99, 0xf6, 0x82, 0xa6, 0x96, 0x50, 0x7c, 0x1f, 0xad, - 0x0a, 0x88, 0x69, 0x20, 0x32, 0x9a, 0x31, 0xb4, 0x08, 0xe2, 0x03, 0xd4, 0x14, 0xa5, 0x05, 0xd6, - 0xb6, 0xd5, 0x07, 0xff, 0xf5, 0xaf, 0xae, 0x57, 0x79, 0xc7, 0xc9, 0x5c, 0x52, 0xba, 0x41, 0x92, - 0xd1, 0x58, 0x4e, 0xb8, 0xca, 0x04, 0x97, 0xcd, 0x06, 0x95, 0x60, 0xfc, 0x0c, 0x35, 0x82, 0x9c, - 0x4b, 0xad, 0x9a, 0x96, 0xdb, 0xc8, 0xc9, 0xe5, 0x4d, 0x24, 0x05, 0x32, 0xde, 0x46, 0xab, 0xe6, - 0x06, 0x66, 0xd9, 0x2b, 0x3a, 0xbb, 0x95, 0xcb, 0x3e, 0xce, 0xc7, 0x49, 0x91, 0x9e, 0xce, 0xda, - 0xe5, 0xa1, 0xf7, 0x4e, 0x8f, 0x35, 0x6b, 0x14, 0x99, 0x59, 0xcf, 0x05, 0xf0, 0x4b, 0xb4, 0x26, - 0x12, 0xa6, 0x82, 0x28, 0xf3, 0xbe, 0x55, 0xd7, 0x72, 0xdd, 0x9c, 0xdc, 0x6c, 0x3d, 0x48, 0x81, - 0x49, 0x4a, 0x99, 0x78, 0x84, 0xfe, 0x75, 0xa9, 0x3b, 0x81, 0xdd, 0x74, 0xc3, 0xe4, 0x11, 0x23, - 0xa0, 0x44, 0x00, 0x67, 0xd0, 0x6a, 0xe8, 0x92, 0xed, 0xbe, 0x79, 0xb1, 0xfa, 0xd9, 0x8b, 0xd5, - 0xdf, 0xe5, 0x3c, 0x7c, 0x4b, 0xc3, 0x04, 0xc8, 0xf5, 0x89, 0xdd, 0xcf, 0x16, 0xaa, 0x11, 0xf0, - 0x03, 0xa9, 0xc4, 0x14, 0x0f, 0x11, 0x9a, 0xf5, 0x94, 0xbe, 0x35, 0xd5, 0x5e, 0x7d, 0x70, 0xaf, - 0x60, 0xa1, 0x21, 0x5e, 0xf5, 0x2b, 0xf7, 0x99, 0x12, 0x53, 0x92, 0x4b, 0x6b, 0x9f, 0xa0, 0xf5, - 0x52, 0x18, 0x37, 0x51, 0xf5, 0x14, 0xa6, 0x7a, 0xbf, 0x57, 0x48, 0x7a, 0xc4, 0x8f, 0xd1, 0xe2, - 0x59, 0xda, 0x96, 0xde, 0xe5, 0xe2, 0x9e, 0x94, 0xaf, 0x0a, 0x31, 0xcc, 0xa7, 0x95, 0x27, 0x56, - 0xf7, 0x87, 0x85, 0x36, 0x6e, 0x98, 0x15, 0xf6, 0x50, 0x47, 0x2f, 0xba, 0x36, 0x3e, 0x60, 0xfe, - 0x08, 0xc4, 0x70, 0xf4, 0x66, 0xc8, 0x99, 0x9b, 0x08, 0x01, 0xcc, 0x35, 0xfa, 0xf5, 0xc1, 0xff, - 0x73, 0x43, 0xda, 0xe3, 0x89, 0x13, 0x82, 0x19, 0xd3, 0x6f, 0x6a, 0xa4, 0x2a, 0xfa, 0xde, 0xdd, - 0xac, 0x52, 0xb9, 0x8b, 0xca, 0xed, 0x35, 0x76, 0x9b, 0x5f, 0x2e, 0x3b, 0xd6, 0xd7, 0xcb, 0x8e, - 0xf5, 0xed, 0xb2, 0x63, 0x7d, 0xfc, 0xde, 0xf9, 0xcb, 0x59, 0xd2, 0x75, 0xb6, 0x7e, 0x06, 0x00, - 0x00, 0xff, 0xff, 0xd5, 0x7d, 0x11, 0xd6, 0xe8, 0x06, 0x00, 0x00, + // 848 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xcd, 0x6e, 0xdb, 0x46, + 0x10, 0xc7, 0x4b, 0x2b, 0x8e, 0xe5, 0x91, 0x9c, 0xc8, 0x8b, 0xb4, 0x11, 0x94, 0x56, 0x0d, 0xd8, + 0x0f, 0x08, 0x45, 0x21, 0xa1, 0xf6, 0xa5, 0x48, 0x81, 0x14, 0xfe, 0x6a, 0xd0, 0xa0, 0x75, 0x84, + 0x4d, 0x3f, 0x80, 0xdc, 0x96, 0xe4, 0x88, 0x22, 0x42, 0xee, 0x12, 0xbb, 0xcb, 0x38, 0xee, 0x33, + 0xe4, 0xd0, 0xf7, 0xe8, 0x8b, 0xf4, 0xd8, 0x47, 0x28, 0x5c, 0xf4, 0xd0, 0xb7, 0x28, 0xb8, 0x6b, + 0xca, 0x2b, 0x92, 0x71, 0x8d, 0x5e, 0x04, 0x6a, 0xe6, 0x37, 0xf3, 0x9f, 0x9d, 0x99, 0x5d, 0x78, + 0x12, 0x27, 0x7a, 0x59, 0x04, 0xd3, 0x50, 0x64, 0xb3, 0x6c, 0x3f, 0x0a, 0x66, 0xd9, 0xfe, 0x4c, + 0xc9, 0x70, 0x16, 0x05, 0x5c, 0x44, 0x38, 0x8b, 0x91, 0xa3, 0x64, 0x1a, 0xa3, 0x59, 0x2e, 0x85, + 0x16, 0x33, 0xce, 0x32, 0x54, 0x39, 0x0b, 0xf1, 0xea, 0x6b, 0x6a, 0x3c, 0x64, 0x7b, 0x65, 0x18, + 0x8d, 0x63, 0x21, 0xe2, 0x14, 0x6d, 0x48, 0x50, 0x2c, 0x66, 0x67, 0x92, 0xe5, 0x39, 0x4a, 0x65, + 0xd1, 0xd1, 0xf1, 0xff, 0xd5, 0x54, 0xe1, 0x12, 0x33, 0x66, 0xb3, 0xf8, 0x6f, 0x3a, 0x30, 0xa0, + 0xa8, 0x91, 0xeb, 0x44, 0xf0, 0x67, 0x79, 0xf9, 0xab, 0xc8, 0x1e, 0xdc, 0x93, 0x95, 0x6d, 0x8e, + 0x32, 0x11, 0xd1, 0x29, 0xe3, 0x42, 0x0d, 0xbd, 0x87, 0xde, 0xa4, 0x43, 0x5b, 0x7d, 0xe4, 0x53, + 0xb8, 0x13, 0xa4, 0x22, 0x7c, 0xf9, 0x3c, 0xf9, 0x05, 0x2d, 0xbd, 0x61, 0xe8, 0x9a, 0x95, 0x7c, + 0x0e, 0xbb, 0x41, 0xb1, 0x58, 0xa0, 0xfc, 0xa6, 0xd0, 0x85, 0xbc, 0x44, 0x3b, 0x06, 0x6d, 0x3a, + 0xc8, 0x04, 0xee, 0x5a, 0xe3, 0x9c, 0x29, 0x6d, 0xd9, 0x5b, 0x86, 0xad, 0x9b, 0x0d, 0x59, 0x2a, + 0x1d, 0x33, 0xcd, 0x4e, 0x5e, 0xe7, 0x89, 0x3c, 0x1f, 0x6e, 0x3e, 0xf4, 0x26, 0x5d, 0x5a, 0x37, + 0x93, 0x17, 0x30, 0xa9, 0x99, 0x0e, 0x16, 0x1a, 0xe5, 0xa9, 0xd0, 0x07, 0x61, 0x88, 0x4a, 0xb9, + 0x27, 0xbe, 0x6d, 0xc4, 0x6e, 0xcc, 0x93, 0xc7, 0x30, 0x5a, 0x98, 0xf2, 0x69, 0x5b, 0xff, 0xb6, + 0x4c, 0xb6, 0x6b, 0x08, 0x7f, 0x0e, 0xfd, 0x6f, 0x79, 0x84, 0xaf, 0xab, 0x49, 0x0c, 0x61, 0x0b, + 0x39, 0x0b, 0x52, 0x8c, 0x4c, 0xf3, 0xbb, 0xb4, 0xfa, 0x7b, 0xd3, 0x7e, 0xfb, 0xff, 0x6c, 0xc2, + 0xe0, 0xb4, 0x9a, 0x7d, 0x95, 0xf6, 0x33, 0x18, 0x04, 0x42, 0x68, 0xa5, 0x25, 0xcb, 0x4f, 0xd6, + 0xf2, 0x37, 0xec, 0xc4, 0x87, 0xfe, 0x22, 0x2d, 0xd4, 0xb2, 0xe2, 0x36, 0x0c, 0xb7, 0x66, 0x2b, + 0x87, 0x7a, 0x26, 0x13, 0x8d, 0xea, 0x07, 0x71, 0x24, 0xb2, 0x2c, 0xd1, 0xdf, 0x89, 0xd8, 0x0c, + 0xb5, 0x4b, 0x9b, 0x8e, 0xb2, 0xf4, 0x30, 0x45, 0xc6, 0x8b, 0x95, 0xf6, 0x2d, 0x83, 0xd6, 0xac, + 0xe4, 0x63, 0xd8, 0x91, 0x98, 0xb3, 0x44, 0x56, 0x98, 0x1d, 0xe8, 0xba, 0x91, 0x3c, 0x81, 0x81, + 0xac, 0x2d, 0xb0, 0x19, 0x5b, 0x6f, 0xef, 0xc1, 0xf4, 0xea, 0x7a, 0xd5, 0x77, 0x9c, 0x36, 0x82, + 0xca, 0x0d, 0x52, 0x9c, 0xe5, 0x6a, 0x29, 0x74, 0x25, 0xb8, 0x65, 0x37, 0xa8, 0x66, 0x26, 0x5f, + 0x41, 0x3f, 0x71, 0xa6, 0x34, 0xec, 0x1a, 0xb9, 0xfb, 0x8e, 0x9c, 0x3b, 0x44, 0xba, 0x06, 0x93, + 0xc7, 0xb0, 0x63, 0x6f, 0x60, 0x15, 0xbd, 0x6d, 0xa2, 0x87, 0x4e, 0xf4, 0x73, 0xd7, 0x4f, 0xd7, + 0xf1, 0xb2, 0xd7, 0xa1, 0x48, 0xa3, 0x9f, 0x4d, 0x5b, 0xab, 0x42, 0xc1, 0xf6, 0xba, 0xe1, 0x20, + 0x4f, 0xe1, 0x8e, 0x2c, 0xb8, 0x4e, 0xb2, 0x6a, 0xf6, 0xc3, 0x9e, 0x91, 0xf3, 0x1d, 0xb9, 0xd5, + 0x7a, 0xd0, 0x35, 0x92, 0xd6, 0x22, 0xc9, 0x1c, 0xde, 0x0d, 0x59, 0xb8, 0xc4, 0xc3, 0x72, 0xc3, + 0xd4, 0x33, 0x4e, 0x51, 0xcb, 0x04, 0x5f, 0xe1, 0xb0, 0x6f, 0x52, 0x8e, 0xa6, 0xf6, 0xc5, 0x9a, + 0x56, 0x2f, 0xd6, 0xf4, 0x50, 0x88, 0xf4, 0x27, 0x96, 0x16, 0x48, 0xdb, 0x03, 0xc9, 0xf7, 0x40, + 0x58, 0x1c, 0x4b, 0x8c, 0x99, 0x3b, 0xbd, 0x1d, 0x93, 0xee, 0x03, 0xa7, 0xc2, 0x83, 0x06, 0x44, + 0x5b, 0x02, 0xfd, 0x39, 0x90, 0x26, 0x49, 0x1e, 0x41, 0xdf, 0x61, 0xcb, 0x57, 0xac, 0x33, 0xe9, + 0xed, 0xbd, 0xd7, 0x9e, 0x9e, 0xae, 0xb1, 0x3e, 0x87, 0x9e, 0xe3, 0x24, 0x63, 0x80, 0xca, 0xbd, + 0xba, 0x31, 0x8e, 0x85, 0x7c, 0x0d, 0xc0, 0xb4, 0x96, 0x49, 0x50, 0x68, 0xb4, 0x17, 0xb2, 0xb7, + 0xf7, 0x61, 0x8b, 0x10, 0x46, 0x07, 0x2b, 0x8c, 0x3a, 0x21, 0xfe, 0x1b, 0x0f, 0xee, 0xb5, 0x41, + 0xe5, 0x72, 0x4a, 0x54, 0x22, 0x2d, 0xca, 0x3a, 0xdc, 0xd7, 0xb8, 0x6e, 0x26, 0x4f, 0x61, 0x37, + 0x12, 0x67, 0x5c, 0xb1, 0x2c, 0x4f, 0x57, 0x43, 0xb7, 0xa5, 0xbc, 0xef, 0x94, 0x72, 0x5c, 0x67, + 0x68, 0x33, 0xcc, 0xff, 0x04, 0x76, 0x1b, 0x1c, 0x19, 0x40, 0x87, 0xa5, 0xe9, 0xe5, 0xe9, 0xcb, + 0x4f, 0xff, 0x37, 0x0f, 0xba, 0x14, 0xe3, 0x44, 0x69, 0x79, 0x4e, 0x8e, 0x00, 0x56, 0x2a, 0x55, + 0xb3, 0x3f, 0x5a, 0xbb, 0x89, 0x16, 0xbc, 0x5a, 0x3b, 0x75, 0xc2, 0xb5, 0x3c, 0xa7, 0x4e, 0xd8, + 0xe8, 0x05, 0xdc, 0xad, 0xb9, 0x4b, 0xd9, 0x97, 0x78, 0x6e, 0x64, 0xb7, 0x69, 0xf9, 0x49, 0xbe, + 0x80, 0xcd, 0x57, 0xe5, 0x76, 0x5d, 0x9e, 0xee, 0x41, 0xdb, 0x4a, 0x57, 0x87, 0xb3, 0xe4, 0xa3, + 0x8d, 0x2f, 0x3d, 0xff, 0x6f, 0x0f, 0xee, 0xbf, 0x65, 0xe5, 0x49, 0x04, 0x63, 0xf3, 0x5e, 0x99, + 0xfb, 0x9b, 0xf0, 0x78, 0x8e, 0xf2, 0x68, 0xfe, 0xe3, 0x91, 0xe0, 0x61, 0x21, 0x25, 0xf2, 0xd0, + 0xea, 0x97, 0x9d, 0xac, 0xef, 0xfa, 0xb1, 0x28, 0x82, 0x14, 0xed, 0xb6, 0xff, 0x47, 0x8e, 0x52, + 0xc5, 0x3c, 0x9f, 0x6f, 0x57, 0xd9, 0xb8, 0x89, 0xca, 0xf5, 0x39, 0x0e, 0x07, 0xbf, 0x5f, 0x8c, + 0xbd, 0x3f, 0x2e, 0xc6, 0xde, 0x9f, 0x17, 0x63, 0xef, 0xd7, 0xbf, 0xc6, 0xef, 0x04, 0xb7, 0x4d, + 0x9e, 0xfd, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xba, 0xc3, 0xf3, 0x56, 0xaf, 0x08, 0x00, 0x00, } diff --git a/src/dbnode/generated/proto/namespace/namespace.proto b/src/dbnode/generated/proto/namespace/namespace.proto index 45410b450a..0077717daa 100644 --- a/src/dbnode/generated/proto/namespace/namespace.proto +++ b/src/dbnode/generated/proto/namespace/namespace.proto @@ -33,6 +33,41 @@ message NamespaceOptions { bool coldWritesEnabled = 10; NamespaceRuntimeOptions runtimeOptions = 11; google.protobuf.BoolValue cacheBlocksOnRetrieve = 12; + AggregationOptions aggregationOptions = 13; +} + +// AggregationOptions is a set of options for aggregating data +// within the namespace. +message AggregationOptions { + // aggregations is a repeated field to support the ability to send aggregated data + // to a namespace also receiving unaggregated data. In this case, the namespace will + // have one Aggregation with aggregated set to false and another with aggregated set to true. + repeated Aggregation aggregations = 1; +} + +// Aggregation describes data points within the namespace. +message Aggregation { + // aggregated is true if data points are aggregated, false otherwise. + bool aggregated = 1; + + // attributes specifies how to aggregate data when aggregated is set to true. + // This field is ignored when aggregated is false and required when aggregated + // is true. + AggregatedAttributes attributes = 2; +} + +// AggregatedAttributes describe how to aggregate data. +message AggregatedAttributes { + // resolutionNanos is the time range to aggregate data across. + int64 resolutionNanos = 1; + DownsampleOptions downsampleOptions = 2; +} + +// DownsampleOptions is a set of options related to downsampling data. +message DownsampleOptions { + // all indicates whether to send data points to this namespace. If false, + // data points must be sent via rollup/recording rules. Defaults to true. + bool all = 1; } message Registry { diff --git a/src/dbnode/namespace/aggregation.go b/src/dbnode/namespace/aggregation.go new file mode 100644 index 0000000000..5c240a7aa8 --- /dev/null +++ b/src/dbnode/namespace/aggregation.go @@ -0,0 +1,90 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package namespace + +import ( + "fmt" + "time" +) + +type aggregationOptions struct { + aggregations []Aggregation +} + +// NewAggregationOptions creates new AggregationOptions. +func NewAggregationOptions() AggregationOptions { + return &aggregationOptions{} +} + +func (a *aggregationOptions) SetAggregations(value []Aggregation) AggregationOptions { + opts := *a + opts.aggregations = value + return &opts +} + +func (a *aggregationOptions) Aggregations() []Aggregation { + return a.aggregations +} + +func (a *aggregationOptions) Equal(rhs AggregationOptions) bool { + if len(a.aggregations) != len(rhs.Aggregations()) { + return false + } + + for i, agg := range rhs.Aggregations() { + if a.aggregations[i] != agg { + return false + } + } + + return true +} + +// NewUnaggregatedAggregation creates a new unaggregated Aggregation. +func NewUnaggregatedAggregation() Aggregation { + return Aggregation{ + Aggregated: false, + } +} + +// NewAggregatedAggregation creates a new aggregated Aggregation. +func NewAggregatedAggregation(attrs AggregatedAttributes) Aggregation { + return Aggregation{ + Aggregated: true, + Attributes: attrs, + } +} + +// NewAggregateAttributes creates new AggregatedAttributes. +func NewAggregatedAttributes(resolution time.Duration, downsampleOptions DownsampleOptions) (AggregatedAttributes, error) { + if resolution <= 0 { + return AggregatedAttributes{}, fmt.Errorf("invalid resolution %v. must be greater than 0", resolution) + } + return AggregatedAttributes{ + Resolution: resolution, + DownsampleOptions: downsampleOptions, + }, nil +} + +// NewDownsampleOptions creates new DownsampleOptions. +func NewDownsampleOptions(all bool) DownsampleOptions { + return DownsampleOptions{All: all} +} diff --git a/src/dbnode/namespace/aggregation_test.go b/src/dbnode/namespace/aggregation_test.go new file mode 100644 index 0000000000..eedd5d6140 --- /dev/null +++ b/src/dbnode/namespace/aggregation_test.go @@ -0,0 +1,59 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package namespace + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAggregationEqual(t *testing.T) { + attrs, err := NewAggregatedAttributes(5*time.Minute, NewDownsampleOptions(true)) + require.NoError(t, err) + + opts1 := NewAggregationOptions(). + SetAggregations([]Aggregation{ + NewUnaggregatedAggregation(), + NewAggregatedAggregation(attrs), + }) + opts2 := NewAggregationOptions(). + SetAggregations([]Aggregation{ + NewUnaggregatedAggregation(), + NewAggregatedAggregation(attrs), + }) + opts3 := NewAggregationOptions(). + SetAggregations([]Aggregation{ + NewUnaggregatedAggregation(), + }) + + require.Equal(t, opts1, opts2) + require.NotEqual(t, opts1, opts3) +} + +func TestAggregationAttributesValidation(t *testing.T) { + _, err := NewAggregatedAttributes(5*time.Minute, NewDownsampleOptions(true)) + require.NoError(t, err) + + _, err = NewAggregatedAttributes(-5*time.Minute, NewDownsampleOptions(true)) + require.Error(t, err) +} diff --git a/src/dbnode/namespace/convert.go b/src/dbnode/namespace/convert.go index 8746eed342..c969aed220 100644 --- a/src/dbnode/namespace/convert.go +++ b/src/dbnode/namespace/convert.go @@ -130,6 +130,11 @@ func ToMetadata( return nil, err } + aggOpts, err := ToAggregationOptions(opts.AggregationOptions) + if err != nil { + return nil, err + } + mOpts := NewOptions(). SetBootstrapEnabled(opts.BootstrapEnabled). SetFlushEnabled(opts.FlushEnabled). @@ -141,7 +146,8 @@ func ToMetadata( SetRetentionOptions(rOpts). SetIndexOptions(iOpts). SetColdWritesEnabled(opts.ColdWritesEnabled). - SetRuntimeOptions(runtimeOpts) + SetRuntimeOptions(runtimeOpts). + SetAggregationOptions(aggOpts) if opts.CacheBlocksOnRetrieve != nil { mOpts = mOpts.SetCacheBlocksOnRetrieve(opts.CacheBlocksOnRetrieve.Value) @@ -154,6 +160,38 @@ func ToMetadata( return NewMetadata(ident.StringID(id), mOpts) } +// ToAggregationOptions converts nsproto.AggregationOptions to AggregationOptions. +func ToAggregationOptions(opts *nsproto.AggregationOptions) (AggregationOptions, error) { + aggOpts := NewAggregationOptions() + if opts == nil || len(opts.Aggregations) == 0 { + return aggOpts, nil + } + aggregations := make([]Aggregation, 0, len(opts.Aggregations)) + for _, agg := range opts.Aggregations { + if agg.Aggregated { + if agg.Attributes == nil { + return nil, errors.New("must set Attributes when aggregated is true") + } + + var dsOpts DownsampleOptions + if agg.Attributes.DownsampleOptions == nil { + dsOpts = NewDownsampleOptions(true) + } else { + dsOpts = NewDownsampleOptions(agg.Attributes.DownsampleOptions.All) + } + + attrs, err := NewAggregatedAttributes(time.Duration(agg.Attributes.ResolutionNanos), dsOpts) + if err != nil { + return nil, err + } + aggregations = append(aggregations, NewAggregatedAggregation(attrs)) + } else { + aggregations = append(aggregations, NewUnaggregatedAggregation()) + } + } + return aggOpts.SetAggregations(aggregations), nil +} + // ToProto converts Map to nsproto.Registry func ToProto(m Map) *nsproto.Registry { reg := nsproto.Registry{ @@ -209,7 +247,26 @@ func OptionsToProto(opts Options) *nsproto.NamespaceOptions { ColdWritesEnabled: opts.ColdWritesEnabled(), RuntimeOptions: toRuntimeOptions(opts.RuntimeOptions()), CacheBlocksOnRetrieve: &protobuftypes.BoolValue{Value: opts.CacheBlocksOnRetrieve()}, + AggregationOptions: toProtoAggregationOptions(opts.AggregationOptions()), + } +} + +func toProtoAggregationOptions(aggOpts AggregationOptions) *nsproto.AggregationOptions { + if aggOpts == nil || len(aggOpts.Aggregations()) == 0 { + return nil + } + protoAggs := make([]*nsproto.Aggregation, 0, len(aggOpts.Aggregations())) + for _, agg := range aggOpts.Aggregations() { + protoAgg := nsproto.Aggregation{Aggregated: agg.Aggregated} + if agg.Aggregated { + protoAgg.Attributes = &nsproto.AggregatedAttributes{ + ResolutionNanos: agg.Attributes.Resolution.Nanoseconds(), + DownsampleOptions: &nsproto.DownsampleOptions{All: agg.Attributes.DownsampleOptions.All}, + } + } + protoAggs = append(protoAggs, &protoAgg) } + return &nsproto.AggregationOptions{Aggregations: protoAggs} } // toRuntimeOptions returns the corresponding RuntimeOptions proto. diff --git a/src/dbnode/namespace/convert_test.go b/src/dbnode/namespace/convert_test.go index f38bf2ef43..0df197424c 100644 --- a/src/dbnode/namespace/convert_test.go +++ b/src/dbnode/namespace/convert_test.go @@ -54,6 +54,12 @@ var ( BlockDataExpiryAfterNotAccessPeriodNanos: toNanos(30), // 30m } + validAggregationOpts = nsproto.AggregationOptions{ + Aggregations: []*nsproto.Aggregation{ + {Aggregated: false}, + }, + } + validNamespaceOpts = []nsproto.NamespaceOptions{ nsproto.NamespaceOptions{ BootstrapEnabled: true, @@ -72,8 +78,9 @@ var ( CleanupEnabled: true, RepairEnabled: true, // Explicitly not setting CacheBlocksOnRetrieve here to test defaulting to true when not set. - RetentionOptions: &validRetentionOpts, - IndexOptions: &validIndexOpts, + RetentionOptions: &validRetentionOpts, + IndexOptions: &validIndexOpts, + AggregationOptions: &validAggregationOpts, }, } @@ -104,6 +111,18 @@ var ( BlockDataExpiryAfterNotAccessPeriodNanos: toNanos(30), // 30m }, } + + invalidAggregationOpts = nsproto.AggregationOptions{ + Aggregations: []*nsproto.Aggregation{ + { + Aggregated: true, + Attributes: &nsproto.AggregatedAttributes{ + ResolutionNanos: -10, + DownsampleOptions: &nsproto.DownsampleOptions{All: true}, + }, + }, + }, + } ) func TestNamespaceToRetentionValid(t *testing.T) { @@ -274,6 +293,42 @@ func TestFromProtoSnapshotEnabled(t *testing.T) { require.Equal(t, !namespace.NewOptions().SnapshotEnabled(), md.Options().SnapshotEnabled()) } +func TestToAggregationOptions(t *testing.T) { + aggOpts, err := namespace.ToAggregationOptions(&validAggregationOpts) + require.NoError(t, err) + + require.Equal(t, 1, len(aggOpts.Aggregations())) + + aggregation := aggOpts.Aggregations()[0] + require.Equal(t, false, aggregation.Aggregated) + require.Equal(t, namespace.AggregatedAttributes{}, aggregation.Attributes) +} + +func TestToAggregationOptionsInvalid(t *testing.T) { + _, err := namespace.ToAggregationOptions(&invalidAggregationOpts) + require.Error(t, err) +} + +func TestAggregationOptsToProto(t *testing.T) { + aggOpts, err := namespace.ToAggregationOptions(&validAggregationOpts) + require.NoError(t, err) + + // make ns map + md1, err := namespace.NewMetadata(ident.StringID("ns1"), + namespace.NewOptions().SetAggregationOptions(aggOpts)) + require.NoError(t, err) + nsMap, err := namespace.NewMap([]namespace.Metadata{md1}) + require.NoError(t, err) + + // convert to nsproto map + reg := namespace.ToProto(nsMap) + require.Len(t, reg.Namespaces, 1) + + nsOpts := *reg.Namespaces["ns1"] + + require.Equal(t, validAggregationOpts, *nsOpts.AggregationOptions) +} + func assertEqualMetadata(t *testing.T, name string, expected nsproto.NamespaceOptions, observed namespace.Metadata) { require.Equal(t, name, observed.ID().String()) opts := observed.Options() diff --git a/src/dbnode/namespace/namespace_mock.go b/src/dbnode/namespace/namespace_mock.go index 0a9d45196e..ffaeb1623f 100644 --- a/src/dbnode/namespace/namespace_mock.go +++ b/src/dbnode/namespace/namespace_mock.go @@ -424,6 +424,34 @@ func (mr *MockOptionsMockRecorder) RuntimeOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RuntimeOptions", reflect.TypeOf((*MockOptions)(nil).RuntimeOptions)) } +// SetAggregationOptions mocks base method +func (m *MockOptions) SetAggregationOptions(value AggregationOptions) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetAggregationOptions", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetAggregationOptions indicates an expected call of SetAggregationOptions +func (mr *MockOptionsMockRecorder) SetAggregationOptions(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAggregationOptions", reflect.TypeOf((*MockOptions)(nil).SetAggregationOptions), value) +} + +// AggregationOptions mocks base method +func (m *MockOptions) AggregationOptions() AggregationOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregationOptions") + ret0, _ := ret[0].(AggregationOptions) + return ret0 +} + +// AggregationOptions indicates an expected call of AggregationOptions +func (mr *MockOptionsMockRecorder) AggregationOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregationOptions", reflect.TypeOf((*MockOptions)(nil).AggregationOptions)) +} + // MockIndexOptions is a mock of IndexOptions interface type MockIndexOptions struct { ctrl *gomock.Controller @@ -1333,3 +1361,68 @@ func (mr *MockNamespaceWatchMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockNamespaceWatch)(nil).Close)) } + +// MockAggregationOptions is a mock of AggregationOptions interface +type MockAggregationOptions struct { + ctrl *gomock.Controller + recorder *MockAggregationOptionsMockRecorder +} + +// MockAggregationOptionsMockRecorder is the mock recorder for MockAggregationOptions +type MockAggregationOptionsMockRecorder struct { + mock *MockAggregationOptions +} + +// NewMockAggregationOptions creates a new mock instance +func NewMockAggregationOptions(ctrl *gomock.Controller) *MockAggregationOptions { + mock := &MockAggregationOptions{ctrl: ctrl} + mock.recorder = &MockAggregationOptionsMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockAggregationOptions) EXPECT() *MockAggregationOptionsMockRecorder { + return m.recorder +} + +// Equal mocks base method +func (m *MockAggregationOptions) Equal(value AggregationOptions) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Equal", value) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Equal indicates an expected call of Equal +func (mr *MockAggregationOptionsMockRecorder) Equal(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Equal", reflect.TypeOf((*MockAggregationOptions)(nil).Equal), value) +} + +// SetAggregations mocks base method +func (m *MockAggregationOptions) SetAggregations(value []Aggregation) AggregationOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetAggregations", value) + ret0, _ := ret[0].(AggregationOptions) + return ret0 +} + +// SetAggregations indicates an expected call of SetAggregations +func (mr *MockAggregationOptionsMockRecorder) SetAggregations(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAggregations", reflect.TypeOf((*MockAggregationOptions)(nil).SetAggregations), value) +} + +// Aggregations mocks base method +func (m *MockAggregationOptions) Aggregations() []Aggregation { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Aggregations") + ret0, _ := ret[0].([]Aggregation) + return ret0 +} + +// Aggregations indicates an expected call of Aggregations +func (mr *MockAggregationOptionsMockRecorder) Aggregations() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Aggregations", reflect.TypeOf((*MockAggregationOptions)(nil).Aggregations)) +} diff --git a/src/dbnode/namespace/options.go b/src/dbnode/namespace/options.go index 7816381e6a..5c15d5d611 100644 --- a/src/dbnode/namespace/options.go +++ b/src/dbnode/namespace/options.go @@ -57,6 +57,7 @@ var ( errIndexBlockSizeTooLarge = errors.New("index block size needs to be <= namespace retention period") errIndexBlockSizeMustBeAMultipleOfDataBlockSize = errors.New("index block size must be a multiple of data block size") errNamespaceRuntimeOptionsNotSet = errors.New("namespace runtime options is not set") + errAggregationOptionsNotSet = errors.New("aggregation options is not set") ) type options struct { @@ -72,6 +73,7 @@ type options struct { indexOpts IndexOptions schemaHis SchemaHistory runtimeOpts RuntimeOptions + aggregationOpts AggregationOptions } // NewSchemaHistory returns an empty schema history. @@ -94,6 +96,7 @@ func NewOptions() Options { indexOpts: NewIndexOptions(), schemaHis: NewSchemaHistory(), runtimeOpts: NewRuntimeOptions(), + aggregationOpts: NewAggregationOptions(), } } @@ -122,6 +125,9 @@ func (o *options) Validate() error { if o.runtimeOpts == nil { return errNamespaceRuntimeOptionsNotSet } + if o.aggregationOpts == nil { + return errAggregationOptionsNotSet + } return nil } @@ -137,7 +143,8 @@ func (o *options) Equal(value Options) bool { o.retentionOpts.Equal(value.RetentionOptions()) && o.indexOpts.Equal(value.IndexOptions()) && o.schemaHis.Equal(value.SchemaHistory()) && - o.runtimeOpts.Equal(value.RuntimeOptions()) + o.runtimeOpts.Equal(value.RuntimeOptions()) && + o.aggregationOpts.Equal(value.AggregationOptions()) } func (o *options) SetBootstrapEnabled(value bool) Options { @@ -259,3 +266,13 @@ func (o *options) SetRuntimeOptions(value RuntimeOptions) Options { func (o *options) RuntimeOptions() RuntimeOptions { return o.runtimeOpts } + +func (o *options) SetAggregationOptions(value AggregationOptions) Options { + opts := *o + opts.aggregationOpts = value + return &opts +} + +func (o *options) AggregationOptions() AggregationOptions { + return o.aggregationOpts +} diff --git a/src/dbnode/namespace/types.go b/src/dbnode/namespace/types.go index ff5177c000..16584a924a 100644 --- a/src/dbnode/namespace/types.go +++ b/src/dbnode/namespace/types.go @@ -111,6 +111,12 @@ type Options interface { // RuntimeOptions returns the RuntimeOptions. RuntimeOptions() RuntimeOptions + + // SetAggregationOptions sets the aggregation-related options for this namespace. + SetAggregationOptions(value AggregationOptions) Options + + // AggregationOptions returns the aggregation-related options for this namespace. + AggregationOptions() AggregationOptions } // IndexOptions controls the indexing options for a namespace. @@ -293,3 +299,44 @@ type NamespaceWatch interface { // NamespaceUpdater is a namespace updater function. type NamespaceUpdater func(Map) error + +// AggregationOptions is a set of options for aggregating data +// within the namespace. +type AggregationOptions interface { + // Equal returns true if the provided value is equal to this one. + Equal(value AggregationOptions) bool + + // SetAggregations sets the aggregations for this namespace. + SetAggregations(value []Aggregation) AggregationOptions + + // Aggregations returns the aggregations for this namespace. + Aggregations() []Aggregation +} + +// Aggregation describes data points within the namespace. +type Aggregation struct { + // Aggregated is true if data points are aggregated, false otherwise. + Aggregated bool + + // Attributes specifies how to aggregate data when aggregated is set to true. + // This field is ignored when aggregated is false. + Attributes AggregatedAttributes +} + +// AggregationAttributes are attributes specifying how data points should be aggregated. +type AggregatedAttributes struct { + // Resolution is the time range to aggregate data across. + Resolution time.Duration + + // DownsampleOptions stores options around how data points are downsampled. + DownsampleOptions DownsampleOptions +} + +// DownsampleOptions is a set of options related to downsampling data. +type DownsampleOptions struct { + // All indicates whether to send data points to this namespace. + // If set to false, this namespace will not receive data points. In this + // case, data will need to be sent to the namespace via another mechanism + // (e.g. rollup/recording rules). + All bool +} diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index c2a9b41083..6d5f303854 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -151,6 +151,7 @@ func testLocalType(t *testing.T, providedType string, placementExists bool) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -315,6 +316,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -431,6 +433,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -553,6 +556,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -805,6 +809,7 @@ func testClusterTypeHosts(t *testing.T, placementExists bool) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -950,6 +955,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { "registry": { "namespaces": { "testNamespace": { + "aggregationOptions": null, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, diff --git a/src/query/api/v1/handler/namespace/add.go b/src/query/api/v1/handler/namespace/add.go index 43e0bd0d6c..cc98c38942 100644 --- a/src/query/api/v1/handler/namespace/add.go +++ b/src/query/api/v1/handler/namespace/add.go @@ -152,7 +152,12 @@ func (h *AddHandler) Add( } } - nsMap, err := namespace.NewMap(append(currentMetadata, md)) + newMDs := append(currentMetadata, md) + if err = validateNamespaceAggregationOptions(newMDs); err != nil { + return emptyReg, err + } + + nsMap, err := namespace.NewMap(newMDs) if err != nil { return emptyReg, err } diff --git a/src/query/api/v1/handler/namespace/add_test.go b/src/query/api/v1/handler/namespace/add_test.go index a58eee35a7..4ee3e6d65c 100644 --- a/src/query/api/v1/handler/namespace/add_test.go +++ b/src/query/api/v1/handler/namespace/add_test.go @@ -112,6 +112,7 @@ func TestNamespaceAddHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ + "aggregationOptions": nil, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index 388eb09334..f812e0d61b 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -25,6 +25,7 @@ import ( "fmt" "net/http" "path" + "time" clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/kv" @@ -155,3 +156,38 @@ func RegisterRoutes( applyMiddleware(NewSchemaResetHandler(client, instrumentOpts).ServeHTTP, defaults)) r.HandleFunc(M3DBSchemaURL, schemaResetHandler.ServeHTTP).Methods(DeleteHTTPMethod) } + +func validateNamespaceAggregationOptions(mds []namespace.Metadata) error { + resolutionRetentionMap := make(map[resolutionRetentionKey]bool, len(mds)) + + for _, md := range mds { + aggOpts := md.Options().AggregationOptions() + if aggOpts == nil || len(aggOpts.Aggregations()) == 0 { + continue + } + + retention := md.Options().RetentionOptions().RetentionPeriod() + for _, agg := range aggOpts.Aggregations() { + if agg.Aggregated { + key := resolutionRetentionKey{ + retention: retention, + resolution: agg.Attributes.Resolution, + } + + if resolutionRetentionMap[key] { + return fmt.Errorf("resolution and retention combination must be unique. "+ + "namespace with resolution=%v retention=%v already exists", key.resolution, key.retention) + } else { + resolutionRetentionMap[key] = true + } + } + } + } + + return nil +} + +type resolutionRetentionKey struct { + resolution time.Duration + retention time.Duration +} diff --git a/src/query/api/v1/handler/namespace/common_test.go b/src/query/api/v1/handler/namespace/common_test.go index b4649bf8d0..421826930b 100644 --- a/src/query/api/v1/handler/namespace/common_test.go +++ b/src/query/api/v1/handler/namespace/common_test.go @@ -24,9 +24,13 @@ import ( "errors" "fmt" "testing" + "time" "github.com/m3db/m3/src/cluster/kv" nsproto "github.com/m3db/m3/src/dbnode/generated/proto/namespace" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/x/ident" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -133,3 +137,40 @@ func TestMetadata(t *testing.T) { assert.Len(t, meta, 2) assert.NoError(t, err) } + +func TestValidateAggregationOptionsUniqueResolutionAndRetention(t *testing.T) { + // Validate that non-unique (resolution, retention) fails. + dsOpts := namespace.NewDownsampleOptions(true) + attrs, err := namespace.NewAggregatedAttributes(5*time.Minute, dsOpts) + require.NoError(t, err) + + agg := namespace.NewAggregatedAggregation(attrs) + aggOpts := namespace.NewAggregationOptions(). + SetAggregations([]namespace.Aggregation{agg}) + + nsOpts := namespace.NewOptions(). + SetAggregationOptions(aggOpts). + SetRetentionOptions(retention.NewOptions(). + SetRetentionPeriod(24 * time.Hour)) + + md1, err := namespace.NewMetadata(ident.StringID("ns1"), nsOpts) + require.NoError(t, err) + + md2, err := namespace.NewMetadata(ident.StringID("ns2"), nsOpts) + require.NoError(t, err) + + err = validateNamespaceAggregationOptions([]namespace.Metadata{md1, md2}) + require.Error(t, err) + + // Validate that unique (resolution, retention) is fine. + nsOpts2 := namespace.NewOptions(). + SetAggregationOptions(aggOpts). + SetRetentionOptions(retention.NewOptions(). + SetRetentionPeriod(48 * time.Hour)) + + md2, err = namespace.NewMetadata(ident.StringID("ns2"), nsOpts2) + require.NoError(t, err) + + err = validateNamespaceAggregationOptions([]namespace.Metadata{md1, md2}) + require.NoError(t, err) +} diff --git a/src/query/api/v1/handler/namespace/get_test.go b/src/query/api/v1/handler/namespace/get_test.go index f05a01fc48..9908feda5e 100644 --- a/src/query/api/v1/handler/namespace/get_test.go +++ b/src/query/api/v1/handler/namespace/get_test.go @@ -118,6 +118,7 @@ func TestNamespaceGetHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "test": xjson.Map{ + "aggregationOptions": nil, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": nil, "cleanupEnabled": false, @@ -199,6 +200,7 @@ func TestNamespaceGetHandlerWithDebug(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "test": xjson.Map{ + "aggregationOptions": nil, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": nil, "cleanupEnabled": false, diff --git a/src/query/api/v1/handler/namespace/update.go b/src/query/api/v1/handler/namespace/update.go index ed4417d08e..6283077f4d 100644 --- a/src/query/api/v1/handler/namespace/update.go +++ b/src/query/api/v1/handler/namespace/update.go @@ -49,17 +49,19 @@ var ( // UpdateHTTPMethod is the HTTP method used with this resource. UpdateHTTPMethod = http.MethodPut - fieldNameRetentionOptions = "RetentionOptions" - fieldNameRetetionPeriod = "RetentionPeriodNanos" - fieldNameRuntimeOptions = "RuntimeOptions" + fieldNameRetentionOptions = "RetentionOptions" + fieldNameRetetionPeriod = "RetentionPeriodNanos" + fieldNameRuntimeOptions = "RuntimeOptions" + fieldNameAggregationOptions = "AggregationOptions" errEmptyNamespaceName = errors.New("must specify namespace name") errEmptyNamespaceOptions = errors.New("update options cannot be empty") errNamespaceFieldImmutable = errors.New("namespace option field is immutable") allowedUpdateOptionsFields = map[string]struct{}{ - fieldNameRetentionOptions: struct{}{}, - fieldNameRuntimeOptions: struct{}{}, + fieldNameRetentionOptions: struct{}{}, + fieldNameRuntimeOptions: struct{}{}, + fieldNameAggregationOptions: struct{}{}, } ) @@ -241,6 +243,20 @@ func (h *UpdateHandler) Update( } } + if protoAggOpts := updateReq.Options.AggregationOptions; protoAggOpts != nil { + newAggOpts, err := namespace.ToAggregationOptions(protoAggOpts) + if err != nil { + return emptyReg, nil, fmt.Errorf("error constructing construction aggregationOptions: %w", err) + } + if !ns.Options().AggregationOptions().Equal(newAggOpts) { + opts := ns.Options().SetAggregationOptions(newAggOpts) + ns, err = namespace.NewMetadata(ns.ID(), opts) + if err != nil { + return emptyReg, nil, fmt.Errorf("error constructing new metadata: %w", err) + } + } + } + // Update the namespace in case an update occurred. newMetadata[updateReq.Name] = ns @@ -249,6 +265,11 @@ func (h *UpdateHandler) Update( for _, elem := range newMetadata { newMDs = append(newMDs, elem) } + + if err = validateNamespaceAggregationOptions(newMDs); err != nil { + return emptyReg, nil, err + } + nsMap, err := namespace.NewMap(newMDs) if err != nil { return emptyReg, nil, err diff --git a/src/query/api/v1/handler/namespace/update_test.go b/src/query/api/v1/handler/namespace/update_test.go index 15b014687c..fe169ed516 100644 --- a/src/query/api/v1/handler/namespace/update_test.go +++ b/src/query/api/v1/handler/namespace/update_test.go @@ -52,6 +52,16 @@ const ( }, "runtimeOptions": { "writeIndexingPerCPUConcurrency": 16 + }, + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "5m" + } + } + ] } } } @@ -141,6 +151,19 @@ func TestNamespaceUpdateHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ + "aggregationOptions": xjson.Map{ + "aggregations": xjson.Array{ + xjson.Map{ + "aggregated": true, + "attributes": xjson.Map{ + "resolutionNanos": "300000000000", + "downsampleOptions": xjson.Map{ + "all": true, + }, + }, + }, + }, + }, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, @@ -199,6 +222,7 @@ func TestNamespaceUpdateHandler(t *testing.T) { "registry": xjson.Map{ "namespaces": xjson.Map{ "testNamespace": xjson.Map{ + "aggregationOptions": nil, "bootstrapEnabled": true, "cacheBlocksOnRetrieve": true, "flushEnabled": true, From 6560920d411d7ed8297e25bfd5deecab1fde7b12 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 29 Sep 2020 10:18:11 -0400 Subject: [PATCH 2/5] Add support for arrays in durations <-> nanos converter --- src/x/net/http/convert.go | 66 ++++++++++++++++++++++++++++++++++ src/x/net/http/convert_test.go | 34 +++++++++--------- 2 files changed, 84 insertions(+), 16 deletions(-) diff --git a/src/x/net/http/convert.go b/src/x/net/http/convert.go index b0029dad9d..6142f2afc5 100644 --- a/src/x/net/http/convert.go +++ b/src/x/net/http/convert.go @@ -101,6 +101,13 @@ func NanosToDurationMap(input map[string]interface{}) (map[string]interface{}, e } dictTranslated[k] = durMap + case []interface{}: + durArr, err := nanosToDurationArray(vv) + if err != nil { + return nil, err + } + + dictTranslated[k] = durArr default: dictTranslated[k] = vv } @@ -110,6 +117,32 @@ func NanosToDurationMap(input map[string]interface{}) (map[string]interface{}, e return dictTranslated, nil } +func nanosToDurationArray(input []interface{}) ([]interface{}, error) { + arrTranslated := make([]interface{}, len(input)) + for i, elem := range input { + switch v := elem.(type) { + case map[string]interface{}: + durMap, err := NanosToDurationMap(v) + if err != nil { + return nil, err + } + + arrTranslated[i] = durMap + case []interface{}: + durArr, err := nanosToDurationArray(v) + if err != nil { + return nil, err + } + + arrTranslated[i] = durArr + default: + arrTranslated[i] = v + } + } + + return arrTranslated, nil +} + // DurationToNanosBytes transforms a json byte slice with Duration keys into Nanos func DurationToNanosBytes(r io.Reader) ([]byte, error) { var dict map[string]interface{} @@ -173,6 +206,13 @@ func DurationToNanosMap(input map[string]interface{}) (map[string]interface{}, e } dictTranslated[k] = durMap + case []interface{}: + durArr, err := durationToNanosArray(vv) + if err != nil { + return nil, err + } + + dictTranslated[k] = durArr default: dictTranslated[k] = vv } @@ -181,3 +221,29 @@ func DurationToNanosMap(input map[string]interface{}) (map[string]interface{}, e return dictTranslated, nil } + +func durationToNanosArray(input []interface{}) ([]interface{}, error) { + arrTranslated := make([]interface{}, len(input)) + for i, elem := range input { + switch v := elem.(type) { + case map[string]interface{}: + durMap, err := DurationToNanosMap(v) + if err != nil { + return nil, err + } + + arrTranslated[i] = durMap + case []interface{}: + durArr, err := durationToNanosArray(v) + if err != nil { + return nil, err + } + + arrTranslated[i] = durArr + default: + arrTranslated[i] = v + } + } + + return arrTranslated, nil +} diff --git a/src/x/net/http/convert_test.go b/src/x/net/http/convert_test.go index 9320399cae..2556c2bb5f 100644 --- a/src/x/net/http/convert_test.go +++ b/src/x/net/http/convert_test.go @@ -34,12 +34,13 @@ func TestDurationToNanosBytes(t *testing.T) { shouldErr bool } testCases := map[string]ret{ - `{"field":"value"}`: ret{`{"field":"value"}`, false}, - `{"fieldDuration":"1s"}`: ret{`{"fieldNanos":1000000000}`, false}, - `{"fieldDuration":1234}`: ret{`{"fieldNanos":1234}`, false}, - `{"field":"value","fieldDuration":"1s"}`: ret{`{"field":"value","fieldNanos":1000000000}`, false}, - `{"realDuration":"50ns","nanoDuration":100,"normalNanos":200}`: ret{`{"nanoNanos":100,"normalNanos":200,"realNanos":50}`, false}, + `{"field":"value"}`: ret{`{"field":"value"}`, false}, + `{"fieldDuration":"1s"}`: ret{`{"fieldNanos":1000000000}`, false}, + `{"fieldDuration":1234}`: ret{`{"fieldNanos":1234}`, false}, + `{"field":"value","fieldDuration":"1s"}`: ret{`{"field":"value","fieldNanos":1000000000}`, false}, + `{"realDuration":"50ns","nanoDuration":100,"normalNanos":200}`: ret{`{"nanoNanos":100,"normalNanos":200,"realNanos":50}`, false}, `{"field":"value","moreFields":{"innerDuration":"2ms","innerField":"innerValue"}}`: ret{`{"field":"value","moreFields":{"innerField":"innerValue","innerNanos":2000000}}`, false}, + `{"field":[{"attrs":{"fieldDuration":"0s"}}]}`: ret{`{"field":[{"attrs":{"fieldNanos":0}}]}`, false}, `not json`: ret{"", true}, `{"fieldDuration":[]}`: ret{"", true}, `{"fieldDuration":{}}`: ret{"", true}, @@ -68,18 +69,19 @@ func TestNanoToDurationBytes(t *testing.T) { shouldErr bool } testCases := map[string]ret{ - `{"field":"value"}`: ret{map[string]interface{}{"field": "value"}, false}, - `{"fieldNanos":1000000000}`: ret{map[string]interface{}{"fieldDuration": "1s"}, false}, - `{"fieldNanos":0}`: ret{map[string]interface{}{"fieldDuration": "0s"}, false}, - `{"field":"value","fieldNanos":1000000000}`: ret{map[string]interface{}{"field": "value", "fieldDuration": "1s"}, false}, - `{"realNanos":50,"nanoNanos":100,"normalDuration":"200ns"}`: ret{map[string]interface{}{"nanoDuration": "100ns", "normalDuration": "200ns", "realDuration": "50ns"}, false}, + `{"field":"value"}`: ret{map[string]interface{}{"field": "value"}, false}, + `{"fieldNanos":1000000000}`: ret{map[string]interface{}{"fieldDuration": "1s"}, false}, + `{"fieldNanos":0}`: ret{map[string]interface{}{"fieldDuration": "0s"}, false}, + `{"field":"value","fieldNanos":1000000000}`: ret{map[string]interface{}{"field": "value", "fieldDuration": "1s"}, false}, + `{"realNanos":50,"nanoNanos":100,"normalDuration":"200ns"}`: ret{map[string]interface{}{"nanoDuration": "100ns", "normalDuration": "200ns", "realDuration": "50ns"}, false}, `{"field":"value","moreFields":{"innerNanos":2000000,"innerField":"innerValue"}}`: ret{map[string]interface{}{"field": "value", "moreFields": map[string]interface{}{"innerField": "innerValue", "innerDuration": "2ms"}}, false}, - `not json`: ret{nil, true}, - `{"fieldNanos":[]}`: ret{nil, true}, - `{"fieldNanos":{}}`: ret{nil, true}, - `{"fieldNanos":"badNanos"}`: ret{nil, true}, - `{"fieldNanos":100.5}`: ret{nil, true}, - `{"moreFields":{"innerNanos":"badNanos"}}`: ret{nil, true}, + `{"field":[{"attrs":{"fieldNanos":0}}]}`: ret{map[string]interface{}{"field": []interface{}{map[string]interface{}{"attrs": map[string]interface{}{"fieldDuration": "0s"}}}}, false}, + `not json`: ret{nil, true}, + `{"fieldNanos":[]}`: ret{nil, true}, + `{"fieldNanos":{}}`: ret{nil, true}, + `{"fieldNanos":"badNanos"}`: ret{nil, true}, + `{"fieldNanos":100.5}`: ret{nil, true}, + `{"moreFields":{"innerNanos":"badNanos"}}`: ret{nil, true}, } for k, v := range testCases { From 3e8c26dcdb8506f05b9e24fcfad0d75c7bc1e86d Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 29 Sep 2020 10:19:05 -0400 Subject: [PATCH 3/5] Update startup scripts to create namespaces w/ aggregation options --- scripts/development/m3_stack/start_m3.sh | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/scripts/development/m3_stack/start_m3.sh b/scripts/development/m3_stack/start_m3.sh index 0550433ebd..43b7c52deb 100755 --- a/scripts/development/m3_stack/start_m3.sh +++ b/scripts/development/m3_stack/start_m3.sh @@ -192,6 +192,13 @@ curl -vvvsSf -X POST localhost:7201/api/v1/namespace -d '{ "indexOptions": { "enabled": true, "blockSizeDuration": "10m" + }, + "aggregationOptions": { + "aggregations": [ + { + "aggregated": false + } + ] } } }' @@ -215,7 +222,18 @@ curl -vvvsSf -X POST localhost:7201/api/v1/namespace -d '{ "indexOptions": { "enabled": true, "blockSizeDuration": "2h" + }, + "aggregationOptions": { + "aggregations": [ + { + "aggregated": true, + "attributes": { + "resolutionDuration": "30s" + } + } + ] } + } }' echo "Done initializing namespaces" From 96fa07d02631d315113691a2dff6acc2d8091261 Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 29 Sep 2020 10:21:18 -0400 Subject: [PATCH 4/5] PR feedback --- src/query/api/v1/handler/namespace/common.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/api/v1/handler/namespace/common.go b/src/query/api/v1/handler/namespace/common.go index f812e0d61b..e019d36a4e 100644 --- a/src/query/api/v1/handler/namespace/common.go +++ b/src/query/api/v1/handler/namespace/common.go @@ -177,9 +177,8 @@ func validateNamespaceAggregationOptions(mds []namespace.Metadata) error { if resolutionRetentionMap[key] { return fmt.Errorf("resolution and retention combination must be unique. "+ "namespace with resolution=%v retention=%v already exists", key.resolution, key.retention) - } else { - resolutionRetentionMap[key] = true } + resolutionRetentionMap[key] = true } } } From fe8fb3ccfa46718b63f85d7091bbcf75b1ab998d Mon Sep 17 00:00:00 2001 From: Nate Broyles Date: Tue, 29 Sep 2020 14:17:38 -0400 Subject: [PATCH 5/5] Fix code gen --- src/dbnode/generated/proto/namespace/namespace.pb.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/dbnode/generated/proto/namespace/namespace.pb.go b/src/dbnode/generated/proto/namespace/namespace.pb.go index 4344675ded..43a6bc1c39 100644 --- a/src/dbnode/generated/proto/namespace/namespace.pb.go +++ b/src/dbnode/generated/proto/namespace/namespace.pb.go @@ -287,8 +287,9 @@ func (m *AggregationOptions) GetAggregations() []*Aggregation { type Aggregation struct { // aggregated is true if data points are aggregated, false otherwise. Aggregated bool `protobuf:"varint,1,opt,name=aggregated,proto3" json:"aggregated,omitempty"` - // attributes specifies details for how to aggregate data when aggregated is set to true. - // This field is ignored when aggregated is false. + // attributes specifies how to aggregate data when aggregated is set to true. + // This field is ignored when aggregated is false and required when aggregated + // is true. Attributes *AggregatedAttributes `protobuf:"bytes,2,opt,name=attributes" json:"attributes,omitempty"` }