From 5f5fd4e4ab22a43a0c290cf9a9c2bf2e229f6e18 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 23 Jul 2024 17:04:47 +0200 Subject: [PATCH] feat(rf1): Store index ref in metastore (#13613) --- .../metastore/metastorepb/metastore.pb.go | 364 ++++++++++++++++-- .../metastore/metastorepb/metastore.proto | 10 +- pkg/querier/querier.go | 3 +- pkg/storage/wal/segment.go | 6 + pkg/storage/wal/segment_test.go | 10 + 5 files changed, 361 insertions(+), 32 deletions(-) diff --git a/pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go b/pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go index 6f48abbc688b..6eeb275bf8e1 100644 --- a/pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go +++ b/pkg/ingester-rf1/metastore/metastorepb/metastore.pb.go @@ -6,6 +6,7 @@ package metastorepb import ( context "context" fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" @@ -112,7 +113,8 @@ type BlockMeta struct { MinTime int64 `protobuf:"varint,3,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,4,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` CompactionLevel uint32 `protobuf:"varint,6,opt,name=compaction_level,json=compactionLevel,proto3" json:"compaction_level,omitempty"` - TenantStreams []*TenantStreams `protobuf:"bytes,7,rep,name=tenant_streams,json=tenantStreams,proto3" json:"tenant_streams,omitempty"` + IndexRef DataRef `protobuf:"bytes,7,opt,name=indexRef,proto3" json:"indexRef"` + TenantStreams []*TenantStreams `protobuf:"bytes,8,rep,name=tenant_streams,json=tenantStreams,proto3" json:"tenant_streams,omitempty"` } func (m *BlockMeta) Reset() { *m = BlockMeta{} } @@ -182,6 +184,13 @@ func (m *BlockMeta) GetCompactionLevel() uint32 { return 0 } +func (m *BlockMeta) GetIndexRef() DataRef { + if m != nil { + return m.IndexRef + } + return DataRef{} +} + func (m *BlockMeta) GetTenantStreams() []*TenantStreams { if m != nil { return m.TenantStreams @@ -189,6 +198,57 @@ func (m *BlockMeta) GetTenantStreams() []*TenantStreams { return nil } +type DataRef struct { + Offset int64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + Length int64 `protobuf:"varint,2,opt,name=length,proto3" json:"length,omitempty"` +} + +func (m *DataRef) Reset() { *m = DataRef{} } +func (*DataRef) ProtoMessage() {} +func (*DataRef) Descriptor() ([]byte, []int) { + return fileDescriptor_43ce85359599db4e, []int{3} +} +func (m *DataRef) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DataRef) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DataRef.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DataRef) XXX_Merge(src proto.Message) { + xxx_messageInfo_DataRef.Merge(m, src) +} +func (m *DataRef) XXX_Size() int { + return m.Size() +} +func (m *DataRef) XXX_DiscardUnknown() { + xxx_messageInfo_DataRef.DiscardUnknown(m) +} + +var xxx_messageInfo_DataRef proto.InternalMessageInfo + +func (m *DataRef) GetOffset() int64 { + if m != nil { + return m.Offset + } + return 0 +} + +func (m *DataRef) GetLength() int64 { + if m != nil { + return m.Length + } + return 0 +} + // TenantStreams object points to the offset in the block at which // the tenant streams data is located. type TenantStreams struct { @@ -200,7 +260,7 @@ type TenantStreams struct { func (m *TenantStreams) Reset() { *m = TenantStreams{} } func (*TenantStreams) ProtoMessage() {} func (*TenantStreams) Descriptor() ([]byte, []int) { - return fileDescriptor_43ce85359599db4e, []int{3} + return fileDescriptor_43ce85359599db4e, []int{4} } func (m *TenantStreams) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -254,6 +314,7 @@ func init() { proto.RegisterType((*AddBlockRequest)(nil), "metastorepb.AddBlockRequest") proto.RegisterType((*AddBlockResponse)(nil), "metastorepb.AddBlockResponse") proto.RegisterType((*BlockMeta)(nil), "metastorepb.BlockMeta") + proto.RegisterType((*DataRef)(nil), "metastorepb.DataRef") proto.RegisterType((*TenantStreams)(nil), "metastorepb.TenantStreams") } @@ -262,32 +323,37 @@ func init() { } var fileDescriptor_43ce85359599db4e = []byte{ - // 400 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x92, 0x41, 0x6f, 0xd3, 0x30, - 0x14, 0xc7, 0xe3, 0x76, 0x6c, 0x8d, 0xab, 0x74, 0x95, 0x0f, 0x28, 0x0c, 0xb0, 0xa2, 0x48, 0x48, - 0x41, 0x82, 0x4e, 0x14, 0xc4, 0x15, 0x6d, 0xb7, 0x49, 0xec, 0xe2, 0x4d, 0xdc, 0x50, 0xe4, 0x24, - 0x6f, 0x93, 0xb5, 0xda, 0x0e, 0xb6, 0xa9, 0x76, 0xe4, 0x23, 0xf0, 0x31, 0xf8, 0x28, 0x1c, 0x7b, - 0xec, 0x91, 0xa6, 0x12, 0xe2, 0xd8, 0x8f, 0x80, 0x9a, 0xb4, 0xa4, 0x45, 0xe2, 0xc2, 0xed, 0xbd, - 0xff, 0xef, 0xaf, 0xa7, 0xf7, 0xf7, 0x33, 0x7e, 0x5b, 0xde, 0xdd, 0x9e, 0x0a, 0x75, 0x0b, 0xd6, - 0x81, 0x79, 0x69, 0x6e, 0x5e, 0x9d, 0x4a, 0x70, 0xdc, 0x3a, 0x6d, 0xa0, 0xad, 0xca, 0xac, 0xad, - 0x47, 0xa5, 0xd1, 0x4e, 0x93, 0xfe, 0x0e, 0x8c, 0xdf, 0xe1, 0xe3, 0xb3, 0xa2, 0x38, 0x9f, 0xe8, - 0xfc, 0x8e, 0xc1, 0xa7, 0xcf, 0x60, 0x1d, 0x79, 0x81, 0x1f, 0x64, 0xeb, 0x3e, 0x44, 0x11, 0x4a, - 0xfa, 0xe3, 0x87, 0xa3, 0x1d, 0xff, 0xa8, 0x76, 0x5e, 0x82, 0xe3, 0xac, 0x31, 0xc5, 0x04, 0x0f, - 0xdb, 0x01, 0xb6, 0xd4, 0xca, 0x42, 0xfc, 0x13, 0x61, 0xff, 0x8f, 0x91, 0x3c, 0xc3, 0x83, 0x1b, - 0x6d, 0x24, 0x77, 0xe9, 0x14, 0x8c, 0x15, 0x5a, 0xd5, 0x83, 0x0f, 0x58, 0xd0, 0xa8, 0x1f, 0x1a, - 0x91, 0x0c, 0x70, 0x47, 0x14, 0x61, 0x27, 0x42, 0x89, 0xcf, 0x3a, 0xa2, 0x20, 0x8f, 0x70, 0x4f, - 0x0a, 0x95, 0x3a, 0x21, 0x21, 0xec, 0x46, 0x28, 0xe9, 0xb2, 0x23, 0x29, 0xd4, 0xb5, 0x90, 0x50, - 0x23, 0x7e, 0xdf, 0xa0, 0x83, 0x0d, 0xe2, 0xf7, 0x35, 0x7a, 0x8e, 0x87, 0xb9, 0x96, 0x25, 0xcf, - 0x9d, 0xd0, 0x2a, 0x9d, 0xc0, 0x14, 0x26, 0xe1, 0x61, 0x84, 0x92, 0x80, 0x1d, 0xb7, 0xfa, 0xfb, - 0xb5, 0x4c, 0xce, 0xf0, 0xc0, 0x81, 0xe2, 0xca, 0xa5, 0xd6, 0x19, 0xe0, 0xd2, 0x86, 0x47, 0x51, - 0x37, 0xe9, 0x8f, 0x4f, 0xf6, 0x02, 0x5f, 0xd7, 0x96, 0xab, 0xc6, 0xc1, 0x02, 0xb7, 0xdb, 0xc6, - 0x19, 0x0e, 0xf6, 0x38, 0x79, 0x8c, 0xfd, 0xcd, 0x4c, 0x51, 0xd4, 0x31, 0x7d, 0xd6, 0x6b, 0x84, - 0x8b, 0xff, 0x4c, 0x34, 0xfe, 0x88, 0x87, 0x97, 0xdb, 0x7d, 0xae, 0xc0, 0x4c, 0x45, 0x0e, 0xe4, - 0x02, 0xf7, 0xb6, 0x8f, 0x4e, 0x9e, 0xec, 0xad, 0xfb, 0xd7, 0x31, 0x4f, 0x9e, 0xfe, 0x83, 0x6e, - 0x2e, 0xe5, 0x9d, 0xbf, 0x99, 0x2d, 0xa8, 0x37, 0x5f, 0x50, 0x6f, 0xb5, 0xa0, 0xe8, 0x4b, 0x45, - 0xd1, 0xb7, 0x8a, 0xa2, 0xef, 0x15, 0x45, 0xb3, 0x8a, 0xa2, 0x1f, 0x15, 0x45, 0xbf, 0x2a, 0xea, - 0xad, 0x2a, 0x8a, 0xbe, 0x2e, 0xa9, 0x37, 0x5b, 0x52, 0x6f, 0xbe, 0xa4, 0x5e, 0x76, 0x58, 0x7f, - 0xa5, 0xd7, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x05, 0x41, 0x71, 0x83, 0x84, 0x02, 0x00, 0x00, + // 471 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xcf, 0x8a, 0xd3, 0x40, + 0x18, 0xcf, 0xb4, 0xb5, 0x4d, 0xa7, 0xb4, 0x5b, 0x86, 0x65, 0x89, 0x55, 0xc7, 0x10, 0x10, 0x22, + 0x68, 0x8b, 0x55, 0x16, 0x3c, 0xc9, 0x16, 0x2f, 0x0b, 0xee, 0x65, 0x76, 0xf1, 0x26, 0x65, 0xda, + 0x7c, 0x89, 0xc3, 0x36, 0x33, 0x31, 0x19, 0x4b, 0x8f, 0x3e, 0x82, 0x4f, 0xe0, 0xd9, 0x47, 0xd9, + 0x63, 0x8f, 0x7b, 0x12, 0x9b, 0x5e, 0x3c, 0xee, 0x23, 0x48, 0x27, 0xd9, 0x6d, 0x2b, 0x78, 0xd9, + 0xdb, 0xf7, 0xfb, 0x93, 0x1f, 0xf9, 0x7d, 0xf3, 0xe1, 0xe3, 0xe4, 0x32, 0x1a, 0x08, 0x19, 0x41, + 0xa6, 0x21, 0x7d, 0x99, 0x86, 0xaf, 0x06, 0x31, 0x68, 0x9e, 0x69, 0x95, 0xc2, 0x76, 0x4a, 0x26, + 0xdb, 0xb9, 0x9f, 0xa4, 0x4a, 0x2b, 0xd2, 0xda, 0x11, 0x7b, 0x87, 0x91, 0x8a, 0x94, 0xe1, 0x07, + 0x9b, 0xa9, 0xb0, 0x78, 0xef, 0xf0, 0xc1, 0x49, 0x10, 0x8c, 0x66, 0x6a, 0x7a, 0xc9, 0xe0, 0xcb, + 0x57, 0xc8, 0x34, 0x79, 0x81, 0x1f, 0x4c, 0x36, 0xd8, 0x41, 0x2e, 0xf2, 0x5b, 0xc3, 0xa3, 0xfe, + 0x4e, 0x4a, 0xdf, 0x38, 0xcf, 0x40, 0x73, 0x56, 0x98, 0x3c, 0x82, 0xbb, 0xdb, 0x80, 0x2c, 0x51, + 0x32, 0x03, 0xef, 0x47, 0x05, 0x37, 0xef, 0x8c, 0xe4, 0x19, 0xee, 0x84, 0x2a, 0x8d, 0xb9, 0x1e, + 0xcf, 0x21, 0xcd, 0x84, 0x92, 0x26, 0xb8, 0xc6, 0xda, 0x05, 0xfb, 0xb1, 0x20, 0x49, 0x07, 0x57, + 0x44, 0xe0, 0x54, 0x5c, 0xe4, 0x37, 0x59, 0x45, 0x04, 0xe4, 0x21, 0xb6, 0x63, 0x21, 0xc7, 0x5a, + 0xc4, 0xe0, 0x54, 0x5d, 0xe4, 0x57, 0x59, 0x23, 0x16, 0xf2, 0x42, 0xc4, 0x60, 0x24, 0xbe, 0x28, + 0xa4, 0x5a, 0x29, 0xf1, 0x85, 0x91, 0x9e, 0xe3, 0xee, 0x54, 0xc5, 0x09, 0x9f, 0x6a, 0xa1, 0xe4, + 0x78, 0x06, 0x73, 0x98, 0x39, 0x75, 0x17, 0xf9, 0x6d, 0x76, 0xb0, 0xe5, 0x3f, 0x6c, 0x68, 0x72, + 0x8c, 0x6d, 0x21, 0x03, 0x58, 0x30, 0x08, 0x9d, 0x86, 0xa9, 0x7a, 0xb8, 0x57, 0xf5, 0x3d, 0xd7, + 0x9c, 0x41, 0x38, 0xaa, 0x5d, 0xfd, 0x7a, 0x6a, 0xb1, 0x3b, 0x2f, 0x39, 0xc1, 0x1d, 0x0d, 0x92, + 0x4b, 0x3d, 0xce, 0x74, 0x0a, 0x3c, 0xce, 0x1c, 0xdb, 0xad, 0xfa, 0xad, 0x61, 0x6f, 0xef, 0xeb, + 0x0b, 0x63, 0x39, 0x2f, 0x1c, 0xac, 0xad, 0x77, 0xa1, 0xf7, 0x16, 0x37, 0xca, 0x74, 0x72, 0x84, + 0xeb, 0x2a, 0x0c, 0x33, 0xd0, 0x66, 0x2b, 0x55, 0x56, 0xa2, 0x0d, 0x3f, 0x03, 0x19, 0xe9, 0xcf, + 0x66, 0x25, 0x55, 0x56, 0x22, 0x6f, 0x82, 0xdb, 0x7b, 0xd1, 0xe4, 0x11, 0x6e, 0x96, 0xbf, 0x23, + 0x02, 0x93, 0xd1, 0x64, 0x76, 0x41, 0x9c, 0xde, 0x73, 0x89, 0xc3, 0x4f, 0xb8, 0x7b, 0x76, 0x5b, + 0xe5, 0x1c, 0xd2, 0xb9, 0x98, 0x02, 0x39, 0xc5, 0xf6, 0xed, 0x3b, 0x93, 0xc7, 0x7b, 0x4d, 0xff, + 0xb9, 0x9f, 0xde, 0x93, 0xff, 0xa8, 0xe5, 0x71, 0x58, 0xa3, 0x37, 0xcb, 0x15, 0xb5, 0xae, 0x57, + 0xd4, 0xba, 0x59, 0x51, 0xf4, 0x2d, 0xa7, 0xe8, 0x67, 0x4e, 0xd1, 0x55, 0x4e, 0xd1, 0x32, 0xa7, + 0xe8, 0x77, 0x4e, 0xd1, 0x9f, 0x9c, 0x5a, 0x37, 0x39, 0x45, 0xdf, 0xd7, 0xd4, 0x5a, 0xae, 0xa9, + 0x75, 0xbd, 0xa6, 0xd6, 0xa4, 0x6e, 0x0e, 0xf6, 0xf5, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x62, + 0x2d, 0xad, 0xa0, 0x0d, 0x03, 0x00, 0x00, } func (this *AddBlockRequest) Equal(that interface{}) bool { @@ -369,6 +435,9 @@ func (this *BlockMeta) Equal(that interface{}) bool { if this.CompactionLevel != that1.CompactionLevel { return false } + if !this.IndexRef.Equal(&that1.IndexRef) { + return false + } if len(this.TenantStreams) != len(that1.TenantStreams) { return false } @@ -379,6 +448,33 @@ func (this *BlockMeta) Equal(that interface{}) bool { } return true } +func (this *DataRef) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*DataRef) + if !ok { + that2, ok := that.(DataRef) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Offset != that1.Offset { + return false + } + if this.Length != that1.Length { + return false + } + return true +} func (this *TenantStreams) Equal(that interface{}) bool { if that == nil { return this == nil @@ -434,19 +530,31 @@ func (this *BlockMeta) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 10) + s := make([]string, 0, 11) s = append(s, "&metastorepb.BlockMeta{") s = append(s, "FormatVersion: "+fmt.Sprintf("%#v", this.FormatVersion)+",\n") s = append(s, "Id: "+fmt.Sprintf("%#v", this.Id)+",\n") s = append(s, "MinTime: "+fmt.Sprintf("%#v", this.MinTime)+",\n") s = append(s, "MaxTime: "+fmt.Sprintf("%#v", this.MaxTime)+",\n") s = append(s, "CompactionLevel: "+fmt.Sprintf("%#v", this.CompactionLevel)+",\n") + s = append(s, "IndexRef: "+strings.Replace(this.IndexRef.GoString(), `&`, ``, 1)+",\n") if this.TenantStreams != nil { s = append(s, "TenantStreams: "+fmt.Sprintf("%#v", this.TenantStreams)+",\n") } s = append(s, "}") return strings.Join(s, "") } +func (this *DataRef) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&metastorepb.DataRef{") + s = append(s, "Offset: "+fmt.Sprintf("%#v", this.Offset)+",\n") + s = append(s, "Length: "+fmt.Sprintf("%#v", this.Length)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *TenantStreams) GoString() string { if this == nil { return "nil" @@ -637,9 +745,19 @@ func (m *BlockMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) { i = encodeVarintMetastore(dAtA, i, uint64(size)) } i-- - dAtA[i] = 0x3a + dAtA[i] = 0x42 + } + } + { + size, err := m.IndexRef.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err } + i -= size + i = encodeVarintMetastore(dAtA, i, uint64(size)) } + i-- + dAtA[i] = 0x3a if m.CompactionLevel != 0 { i = encodeVarintMetastore(dAtA, i, uint64(m.CompactionLevel)) i-- @@ -670,6 +788,39 @@ func (m *BlockMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *DataRef) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DataRef) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *DataRef) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Length != 0 { + i = encodeVarintMetastore(dAtA, i, uint64(m.Length)) + i-- + dAtA[i] = 0x10 + } + if m.Offset != 0 { + i = encodeVarintMetastore(dAtA, i, uint64(m.Offset)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *TenantStreams) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -765,6 +916,8 @@ func (m *BlockMeta) Size() (n int) { if m.CompactionLevel != 0 { n += 1 + sovMetastore(uint64(m.CompactionLevel)) } + l = m.IndexRef.Size() + n += 1 + l + sovMetastore(uint64(l)) if len(m.TenantStreams) > 0 { for _, e := range m.TenantStreams { l = e.Size() @@ -774,6 +927,21 @@ func (m *BlockMeta) Size() (n int) { return n } +func (m *DataRef) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Offset != 0 { + n += 1 + sovMetastore(uint64(m.Offset)) + } + if m.Length != 0 { + n += 1 + sovMetastore(uint64(m.Length)) + } + return n +} + func (m *TenantStreams) Size() (n int) { if m == nil { return 0 @@ -833,11 +1001,23 @@ func (this *BlockMeta) String() string { `MinTime:` + fmt.Sprintf("%v", this.MinTime) + `,`, `MaxTime:` + fmt.Sprintf("%v", this.MaxTime) + `,`, `CompactionLevel:` + fmt.Sprintf("%v", this.CompactionLevel) + `,`, + `IndexRef:` + strings.Replace(strings.Replace(this.IndexRef.String(), "DataRef", "DataRef", 1), `&`, ``, 1) + `,`, `TenantStreams:` + repeatedStringForTenantStreams + `,`, `}`, }, "") return s } +func (this *DataRef) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&DataRef{`, + `Offset:` + fmt.Sprintf("%v", this.Offset) + `,`, + `Length:` + fmt.Sprintf("%v", this.Length) + `,`, + `}`, + }, "") + return s +} func (this *TenantStreams) String() string { if this == nil { return "nil" @@ -1138,6 +1318,39 @@ func (m *BlockMeta) Unmarshal(dAtA []byte) error { } } case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field IndexRef", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMetastore + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMetastore + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.IndexRef.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 8: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field TenantStreams", wireType) } @@ -1195,6 +1408,97 @@ func (m *BlockMeta) Unmarshal(dAtA []byte) error { } return nil } +func (m *DataRef) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DataRef: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DataRef: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Offset", wireType) + } + m.Offset = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Offset |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Length", wireType) + } + m.Length = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMetastore + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Length |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipMetastore(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthMetastore + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthMetastore + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *TenantStreams) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/ingester-rf1/metastore/metastorepb/metastore.proto b/pkg/ingester-rf1/metastore/metastorepb/metastore.proto index ce69aeb9b073..12b71208c729 100644 --- a/pkg/ingester-rf1/metastore/metastorepb/metastore.proto +++ b/pkg/ingester-rf1/metastore/metastorepb/metastore.proto @@ -2,6 +2,8 @@ syntax = "proto3"; package metastorepb; +import "gogoproto/gogo.proto"; + service MetastoreService { rpc AddBlock(AddBlockRequest) returns (AddBlockResponse) {} } @@ -18,8 +20,14 @@ message BlockMeta { int64 min_time = 3; int64 max_time = 4; uint32 compaction_level = 6; + DataRef indexRef = 7 [(gogoproto.nullable) = false]; + + repeated TenantStreams tenant_streams = 8; +} - repeated TenantStreams tenant_streams = 7; +message DataRef { + int64 offset = 1; + int64 length = 2; } // TenantStreams object points to the offset in the block at which diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 7352e966b69d..fa0ada753609 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -24,7 +24,6 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/indexgateway" "github.com/grafana/loki/v3/pkg/iter" @@ -43,6 +42,8 @@ import ( listutil "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/spanlogger" util_validation "github.com/grafana/loki/v3/pkg/util/validation" + + "github.com/grafana/loki/pkg/push" ) const ( diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 772651238e99..945d8ff17c7d 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -54,6 +54,7 @@ type SegmentWriter struct { inputSize atomic.Int64 idxWriter *index.Writer consistencyMtx *sync.RWMutex + indexRef metastorepb.DataRef } type streamSegment struct { @@ -209,6 +210,7 @@ func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta { Id: id, FormatVersion: uint64(1), CompactionLevel: 0, + IndexRef: b.indexRef, MinTime: globalMinT, MaxTime: globalMaxT, TenantStreams: result, @@ -308,6 +310,8 @@ func (b *SegmentWriter) WriteTo(w io.Writer) (int64, error) { if n != len(buf) { return total, errors.New("invalid written index len") } + b.indexRef.Offset = total + b.indexRef.Length = int64(n) total += int64(n) // write index len 4b @@ -349,6 +353,8 @@ func (b *SegmentWriter) Reset() { b.streams = make(map[streamID]*streamSegment, 64) b.buf1.Reset() b.inputSize.Store(0) + b.indexRef.Length = 0 + b.indexRef.Offset = 0 } // InputSize returns the total size of the input data written to the writer. diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index 754a51f583e1..5852294329f9 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/storage/wal/index" "github.com/grafana/loki/v3/pkg/storage/wal/testdata" "github.com/grafana/loki/pkg/push" @@ -448,6 +449,8 @@ func TestReset(t *testing.T) { func Test_Meta(t *testing.T) { w, err := NewWalSegmentWriter(NewSegmentMetrics(nil)) + buff := bytes.NewBuffer(nil) + require.NoError(t, err) lbls := labels.FromStrings("container", "foo", "namespace", "dev") @@ -462,7 +465,13 @@ func Test_Meta(t *testing.T) { {Timestamp: time.Unix(3, 0), Line: "Entry 2"}, {Timestamp: time.Unix(4, 0), Line: "Entry 3"}, }) + _, err = w.WriteTo(buff) + require.NoError(t, err) meta := w.Meta("bar") + indexReader, err := index.NewReader(index.RealByteSlice(buff.Bytes()[meta.IndexRef.Offset : meta.IndexRef.Offset+meta.IndexRef.Length])) + require.NoError(t, err) + + defer indexReader.Close() require.Equal(t, &metastorepb.BlockMeta{ FormatVersion: 1, @@ -470,6 +479,7 @@ func Test_Meta(t *testing.T) { MinTime: time.Unix(1, 0).UnixNano(), MaxTime: time.Unix(4, 0).UnixNano(), CompactionLevel: 0, + IndexRef: meta.IndexRef, TenantStreams: []*metastorepb.TenantStreams{ { TenantId: "tenanta",