diff --git a/src/dbnode/generated/thrift/rpc.thrift b/src/dbnode/generated/thrift/rpc.thrift index dda00739c3..afc834b7c1 100644 --- a/src/dbnode/generated/thrift/rpc.thrift +++ b/src/dbnode/generated/thrift/rpc.thrift @@ -64,6 +64,8 @@ service Node { void repair() throws (1: Error err) TruncateResult truncate(1: TruncateRequest req) throws (1: Error err) + AggregateTilesResult aggregateTiles(1: AggregateTilesRequest req) throws (1: Error err) + // Management endpoints NodeHealthResult health() throws (1: Error err) // NB: bootstrapped is for use with cluster management tools like k8s. @@ -491,6 +493,20 @@ struct Query { 7: optional FieldQuery field } +struct AggregateTilesRequest { + 1: required string sourceNameSpace + 2: required string targetNameSpace + 3: required i64 rangeStart + 4: required i64 rangeEnd + 5: required string step + 6: bool removeResets + 7: optional TimeType rangeType = TimeType.UNIX_SECONDS +} + +struct AggregateTilesResult { + 1: required i64 processedBlockCount +} + struct DebugProfileStartRequest { 1: required string name 2: required string filePathTemplate diff --git a/src/dbnode/generated/thrift/rpc/rpc.go b/src/dbnode/generated/thrift/rpc/rpc.go index 66575da7fd..9c9a7ae137 100644 --- a/src/dbnode/generated/thrift/rpc/rpc.go +++ b/src/dbnode/generated/thrift/rpc/rpc.go @@ -12906,6 +12906,445 @@ func (p *Query) String() string { return fmt.Sprintf("Query(%+v)", *p) } +// Attributes: +// - SourceNameSpace +// - TargetNameSpace +// - RangeStart +// - RangeEnd +// - Step +// - RemoveResets +// - RangeType +type AggregateTilesRequest struct { + SourceNameSpace string `thrift:"sourceNameSpace,1,required" db:"sourceNameSpace" json:"sourceNameSpace"` + TargetNameSpace string `thrift:"targetNameSpace,2,required" db:"targetNameSpace" json:"targetNameSpace"` + RangeStart int64 `thrift:"rangeStart,3,required" db:"rangeStart" json:"rangeStart"` + RangeEnd int64 `thrift:"rangeEnd,4,required" db:"rangeEnd" json:"rangeEnd"` + Step string `thrift:"step,5,required" db:"step" json:"step"` + RemoveResets bool `thrift:"removeResets,6" db:"removeResets" json:"removeResets"` + RangeType TimeType `thrift:"rangeType,7" db:"rangeType" json:"rangeType,omitempty"` +} + +func NewAggregateTilesRequest() *AggregateTilesRequest { + return &AggregateTilesRequest{ + RangeType: 0, + } +} + +func (p *AggregateTilesRequest) GetSourceNameSpace() string { + return p.SourceNameSpace +} + +func (p *AggregateTilesRequest) GetTargetNameSpace() string { + return p.TargetNameSpace +} + +func (p *AggregateTilesRequest) GetRangeStart() int64 { + return p.RangeStart +} + +func (p *AggregateTilesRequest) GetRangeEnd() int64 { + return p.RangeEnd +} + +func (p *AggregateTilesRequest) GetStep() string { + return p.Step +} + +func (p *AggregateTilesRequest) GetRemoveResets() bool { + return p.RemoveResets +} + +var AggregateTilesRequest_RangeType_DEFAULT TimeType = 0 + +func (p *AggregateTilesRequest) GetRangeType() TimeType { + return p.RangeType +} +func (p *AggregateTilesRequest) IsSetRangeType() bool { + return p.RangeType != AggregateTilesRequest_RangeType_DEFAULT +} + +func (p *AggregateTilesRequest) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetSourceNameSpace bool = false + var issetTargetNameSpace bool = false + var issetRangeStart bool = false + var issetRangeEnd bool = false + var issetStep bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + 
if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + issetSourceNameSpace = true + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + issetTargetNameSpace = true + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } + issetRangeStart = true + case 4: + if err := p.ReadField4(iprot); err != nil { + return err + } + issetRangeEnd = true + case 5: + if err := p.ReadField5(iprot); err != nil { + return err + } + issetStep = true + case 6: + if err := p.ReadField6(iprot); err != nil { + return err + } + case 7: + if err := p.ReadField7(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetSourceNameSpace { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field SourceNameSpace is not set")) + } + if !issetTargetNameSpace { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field TargetNameSpace is not set")) + } + if !issetRangeStart { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field RangeStart is not set")) + } + if !issetRangeEnd { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field RangeEnd is not set")) + } + if !issetStep { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field Step is not set")) + } + return nil +} + +func (p *AggregateTilesRequest) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.SourceNameSpace = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 2: ", err) + } else { + p.TargetNameSpace = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 3: ", err) + } else { + p.RangeStart = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField4(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 4: ", err) + } else { + p.RangeEnd = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField5(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return thrift.PrependError("error reading field 5: ", err) + } else { + p.Step = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField6(iprot thrift.TProtocol) error { + if v, err := iprot.ReadBool(); err != nil { + return thrift.PrependError("error reading field 6: ", err) + } else { + p.RemoveResets = v + } + return nil +} + +func (p *AggregateTilesRequest) ReadField7(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI32(); err != nil { + return thrift.PrependError("error reading field 7: ", err) + } else { + temp := TimeType(v) + p.RangeType = temp + } + return nil +} + +func (p 
*AggregateTilesRequest) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("AggregateTilesRequest"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := p.writeField4(oprot); err != nil { + return err + } + if err := p.writeField5(oprot); err != nil { + return err + } + if err := p.writeField6(oprot); err != nil { + return err + } + if err := p.writeField7(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *AggregateTilesRequest) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("sourceNameSpace", thrift.STRING, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:sourceNameSpace: ", p), err) + } + if err := oprot.WriteString(string(p.SourceNameSpace)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.sourceNameSpace (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:sourceNameSpace: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField2(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("targetNameSpace", thrift.STRING, 2); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 2:targetNameSpace: ", p), err) + } + if err := oprot.WriteString(string(p.TargetNameSpace)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.targetNameSpace (2) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 2:targetNameSpace: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField3(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("rangeStart", thrift.I64, 3); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 3:rangeStart: ", p), err) + } + if err := oprot.WriteI64(int64(p.RangeStart)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeStart (3) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 3:rangeStart: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField4(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("rangeEnd", thrift.I64, 4); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 4:rangeEnd: ", p), err) + } + if err := oprot.WriteI64(int64(p.RangeEnd)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeEnd (4) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 4:rangeEnd: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField5(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("step", thrift.STRING, 5); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 5:step: ", p), 
err) + } + if err := oprot.WriteString(string(p.Step)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.step (5) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 5:step: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField6(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("removeResets", thrift.BOOL, 6); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 6:removeResets: ", p), err) + } + if err := oprot.WriteBool(bool(p.RemoveResets)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.removeResets (6) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 6:removeResets: ", p), err) + } + return err +} + +func (p *AggregateTilesRequest) writeField7(oprot thrift.TProtocol) (err error) { + if p.IsSetRangeType() { + if err := oprot.WriteFieldBegin("rangeType", thrift.I32, 7); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 7:rangeType: ", p), err) + } + if err := oprot.WriteI32(int32(p.RangeType)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.rangeType (7) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 7:rangeType: ", p), err) + } + } + return err +} + +func (p *AggregateTilesRequest) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("AggregateTilesRequest(%+v)", *p) +} + +// Attributes: +// - ProcessedBlockCount +type AggregateTilesResult_ struct { + ProcessedBlockCount int64 `thrift:"processedBlockCount,1,required" db:"processedBlockCount" json:"processedBlockCount"` +} + +func NewAggregateTilesResult_() *AggregateTilesResult_ { + return &AggregateTilesResult_{} +} + +func (p *AggregateTilesResult_) GetProcessedBlockCount() int64 { + return p.ProcessedBlockCount +} +func (p *AggregateTilesResult_) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + var issetProcessedBlockCount bool = false + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + issetProcessedBlockCount = true + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + if !issetProcessedBlockCount { + return thrift.NewTProtocolExceptionWithType(thrift.INVALID_DATA, fmt.Errorf("Required field ProcessedBlockCount is not set")) + } + return nil +} + +func (p *AggregateTilesResult_) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return thrift.PrependError("error reading field 1: ", err) + } else { + p.ProcessedBlockCount = v + } + return nil +} + +func (p *AggregateTilesResult_) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("AggregateTilesResult"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct 
begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *AggregateTilesResult_) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("processedBlockCount", thrift.I64, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:processedBlockCount: ", p), err) + } + if err := oprot.WriteI64(int64(p.ProcessedBlockCount)); err != nil { + return thrift.PrependError(fmt.Sprintf("%T.processedBlockCount (1) field write error: ", p), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:processedBlockCount: ", p), err) + } + return err +} + +func (p *AggregateTilesResult_) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("AggregateTilesResult_(%+v)", *p) +} + // Attributes: // - Name // - FilePathTemplate @@ -13734,6 +14173,9 @@ type Node interface { // Parameters: // - Req Truncate(req *TruncateRequest) (r *TruncateResult_, err error) + // Parameters: + // - Req + AggregateTiles(req *AggregateTilesRequest) (r *AggregateTilesResult_, err error) Health() (r *NodeHealthResult_, err error) Bootstrapped() (r *NodeBootstrappedResult_, err error) BootstrappedInPlacementOrNoPlacement() (r *NodeBootstrappedInPlacementOrNoPlacementResult_, err error) @@ -15156,6 +15598,87 @@ func (p *NodeClient) recvTruncate() (value *TruncateResult_, err error) { return } +// Parameters: +// - Req +func (p *NodeClient) AggregateTiles(req *AggregateTilesRequest) (r *AggregateTilesResult_, err error) { + if err = p.sendAggregateTiles(req); err != nil { + return + } + return p.recvAggregateTiles() +} + +func (p *NodeClient) sendAggregateTiles(req *AggregateTilesRequest) (err error) { + oprot := p.OutputProtocol + if oprot == nil { + oprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.OutputProtocol = oprot + } + p.SeqId++ + if err = oprot.WriteMessageBegin("aggregateTiles", thrift.CALL, p.SeqId); err != nil { + return + } + args := NodeAggregateTilesArgs{ + Req: req, + } + if err = args.Write(oprot); err != nil { + return + } + if err = oprot.WriteMessageEnd(); err != nil { + return + } + return oprot.Flush() +} + +func (p *NodeClient) recvAggregateTiles() (value *AggregateTilesResult_, err error) { + iprot := p.InputProtocol + if iprot == nil { + iprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.InputProtocol = iprot + } + method, mTypeId, seqId, err := iprot.ReadMessageBegin() + if err != nil { + return + } + if method != "aggregateTiles" { + err = thrift.NewTApplicationException(thrift.WRONG_METHOD_NAME, "aggregateTiles failed: wrong method name") + return + } + if p.SeqId != seqId { + err = thrift.NewTApplicationException(thrift.BAD_SEQUENCE_ID, "aggregateTiles failed: out of sequence response") + return + } + if mTypeId == thrift.EXCEPTION { + error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error70 error + error70, err = error69.Read(iprot) + if err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + err = error70 + return + } + if mTypeId != thrift.REPLY { + err = thrift.NewTApplicationException(thrift.INVALID_MESSAGE_TYPE_EXCEPTION, "aggregateTiles failed: 
invalid message type") + return + } + result := NodeAggregateTilesResult{} + if err = result.Read(iprot); err != nil { + return + } + if err = iprot.ReadMessageEnd(); err != nil { + return + } + if result.Err != nil { + err = result.Err + return + } + value = result.GetSuccess() + return +} + func (p *NodeClient) Health() (r *NodeHealthResult_, err error) { if err = p.sendHealth(); err != nil { return @@ -15202,16 +15725,16 @@ func (p *NodeClient) recvHealth() (value *NodeHealthResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error69 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error70 error - error70, err = error69.Read(iprot) + error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error72 error + error72, err = error71.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error70 + err = error72 return } if mTypeId != thrift.REPLY { @@ -15279,16 +15802,16 @@ func (p *NodeClient) recvBootstrapped() (value *NodeBootstrappedResult_, err err return } if mTypeId == thrift.EXCEPTION { - error71 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error72 error - error72, err = error71.Read(iprot) + error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error74 error + error74, err = error73.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error72 + err = error74 return } if mTypeId != thrift.REPLY { @@ -15356,16 +15879,16 @@ func (p *NodeClient) recvBootstrappedInPlacementOrNoPlacement() (value *NodeBoot return } if mTypeId == thrift.EXCEPTION { - error73 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error74 error - error74, err = error73.Read(iprot) + error75 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error76 error + error76, err = error75.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error74 + err = error76 return } if mTypeId != thrift.REPLY { @@ -15433,16 +15956,16 @@ func (p *NodeClient) recvGetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error75 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error76 error - error76, err = error75.Read(iprot) + error77 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error78 error + error78, err = error77.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error76 + err = error78 return } if mTypeId != thrift.REPLY { @@ -15514,16 +16037,16 @@ func (p *NodeClient) recvSetPersistRateLimit() (value *NodePersistRateLimitResul return } if mTypeId == thrift.EXCEPTION { - error77 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error78 error - error78, err = error77.Read(iprot) + error79 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error80 error + error80, err = error79.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error78 + err = error80 return } if mTypeId != thrift.REPLY { @@ -15591,16 +16114,16 @@ func (p *NodeClient) 
recvGetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if mTypeId == thrift.EXCEPTION { - error79 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error80 error - error80, err = error79.Read(iprot) + error81 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error82 error + error82, err = error81.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error80 + err = error82 return } if mTypeId != thrift.REPLY { @@ -15672,16 +16195,16 @@ func (p *NodeClient) recvSetWriteNewSeriesAsync() (value *NodeWriteNewSeriesAsyn return } if mTypeId == thrift.EXCEPTION { - error81 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error82 error - error82, err = error81.Read(iprot) + error83 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error84 error + error84, err = error83.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error82 + err = error84 return } if mTypeId != thrift.REPLY { @@ -15749,16 +16272,16 @@ func (p *NodeClient) recvGetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error83 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error84 error - error84, err = error83.Read(iprot) + error85 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error86 error + error86, err = error85.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error84 + err = error86 return } if mTypeId != thrift.REPLY { @@ -15830,16 +16353,16 @@ func (p *NodeClient) recvSetWriteNewSeriesBackoffDuration() (value *NodeWriteNew return } if mTypeId == thrift.EXCEPTION { - error85 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error86 error - error86, err = error85.Read(iprot) + error87 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error88 error + error88, err = error87.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error86 + err = error88 return } if mTypeId != thrift.REPLY { @@ -15907,16 +16430,16 @@ func (p *NodeClient) recvGetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error87 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error88 error - error88, err = error87.Read(iprot) + error89 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error90 error + error90, err = error89.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error88 + err = error90 return } if mTypeId != thrift.REPLY { @@ -15988,16 +16511,16 @@ func (p *NodeClient) recvSetWriteNewSeriesLimitPerShardPerSecond() (value *NodeW return } if mTypeId == thrift.EXCEPTION { - error89 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error90 error - error90, err = error89.Read(iprot) + error91 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error92 error + error92, err = error91.Read(iprot) if err != nil { return } 
if err = iprot.ReadMessageEnd(); err != nil { return } - err = error90 + err = error92 return } if mTypeId != thrift.REPLY { @@ -16069,16 +16592,16 @@ func (p *NodeClient) recvDebugProfileStart() (value *DebugProfileStartResult_, e return } if mTypeId == thrift.EXCEPTION { - error91 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error92 error - error92, err = error91.Read(iprot) + error93 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error94 error + error94, err = error93.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error92 + err = error94 return } if mTypeId != thrift.REPLY { @@ -16150,16 +16673,16 @@ func (p *NodeClient) recvDebugProfileStop() (value *DebugProfileStopResult_, err return } if mTypeId == thrift.EXCEPTION { - error93 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error94 error - error94, err = error93.Read(iprot) + error95 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error96 error + error96, err = error95.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error94 + err = error96 return } if mTypeId != thrift.REPLY { @@ -16231,16 +16754,16 @@ func (p *NodeClient) recvDebugIndexMemorySegments() (value *DebugIndexMemorySegm return } if mTypeId == thrift.EXCEPTION { - error95 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error96 error - error96, err = error95.Read(iprot) + error97 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error98 error + error98, err = error97.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error96 + err = error98 return } if mTypeId != thrift.REPLY { @@ -16282,39 +16805,40 @@ func (p *NodeProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewNodeProcessor(handler Node) *NodeProcessor { - self97 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self97.processorMap["query"] = &nodeProcessorQuery{handler: handler} - self97.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} - self97.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} - self97.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} - self97.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} - self97.processorMap["write"] = &nodeProcessorWrite{handler: handler} - self97.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} - self97.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} - self97.processorMap["fetchBatchRawV2"] = &nodeProcessorFetchBatchRawV2{handler: handler} - self97.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} - self97.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} - self97.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} - self97.processorMap["writeBatchRawV2"] = &nodeProcessorWriteBatchRawV2{handler: handler} - self97.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} - self97.processorMap["writeTaggedBatchRawV2"] = &nodeProcessorWriteTaggedBatchRawV2{handler: handler} - 
self97.processorMap["repair"] = &nodeProcessorRepair{handler: handler} - self97.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} - self97.processorMap["health"] = &nodeProcessorHealth{handler: handler} - self97.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} - self97.processorMap["bootstrappedInPlacementOrNoPlacement"] = &nodeProcessorBootstrappedInPlacementOrNoPlacement{handler: handler} - self97.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} - self97.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} - self97.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} - self97.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} - self97.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} - self97.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} - self97.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - self97.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} - self97.processorMap["debugProfileStart"] = &nodeProcessorDebugProfileStart{handler: handler} - self97.processorMap["debugProfileStop"] = &nodeProcessorDebugProfileStop{handler: handler} - self97.processorMap["debugIndexMemorySegments"] = &nodeProcessorDebugIndexMemorySegments{handler: handler} - return self97 + self99 := &NodeProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self99.processorMap["query"] = &nodeProcessorQuery{handler: handler} + self99.processorMap["aggregateRaw"] = &nodeProcessorAggregateRaw{handler: handler} + self99.processorMap["aggregate"] = &nodeProcessorAggregate{handler: handler} + self99.processorMap["fetch"] = &nodeProcessorFetch{handler: handler} + self99.processorMap["fetchTagged"] = &nodeProcessorFetchTagged{handler: handler} + self99.processorMap["write"] = &nodeProcessorWrite{handler: handler} + self99.processorMap["writeTagged"] = &nodeProcessorWriteTagged{handler: handler} + self99.processorMap["fetchBatchRaw"] = &nodeProcessorFetchBatchRaw{handler: handler} + self99.processorMap["fetchBatchRawV2"] = &nodeProcessorFetchBatchRawV2{handler: handler} + self99.processorMap["fetchBlocksRaw"] = &nodeProcessorFetchBlocksRaw{handler: handler} + self99.processorMap["fetchBlocksMetadataRawV2"] = &nodeProcessorFetchBlocksMetadataRawV2{handler: handler} + self99.processorMap["writeBatchRaw"] = &nodeProcessorWriteBatchRaw{handler: handler} + self99.processorMap["writeBatchRawV2"] = &nodeProcessorWriteBatchRawV2{handler: handler} + self99.processorMap["writeTaggedBatchRaw"] = &nodeProcessorWriteTaggedBatchRaw{handler: handler} + self99.processorMap["writeTaggedBatchRawV2"] = &nodeProcessorWriteTaggedBatchRawV2{handler: handler} + self99.processorMap["repair"] = &nodeProcessorRepair{handler: handler} + self99.processorMap["truncate"] = &nodeProcessorTruncate{handler: handler} + self99.processorMap["aggregateTiles"] = &nodeProcessorAggregateTiles{handler: handler} + self99.processorMap["health"] = &nodeProcessorHealth{handler: handler} + self99.processorMap["bootstrapped"] = &nodeProcessorBootstrapped{handler: handler} + self99.processorMap["bootstrappedInPlacementOrNoPlacement"] = 
&nodeProcessorBootstrappedInPlacementOrNoPlacement{handler: handler} + self99.processorMap["getPersistRateLimit"] = &nodeProcessorGetPersistRateLimit{handler: handler} + self99.processorMap["setPersistRateLimit"] = &nodeProcessorSetPersistRateLimit{handler: handler} + self99.processorMap["getWriteNewSeriesAsync"] = &nodeProcessorGetWriteNewSeriesAsync{handler: handler} + self99.processorMap["setWriteNewSeriesAsync"] = &nodeProcessorSetWriteNewSeriesAsync{handler: handler} + self99.processorMap["getWriteNewSeriesBackoffDuration"] = &nodeProcessorGetWriteNewSeriesBackoffDuration{handler: handler} + self99.processorMap["setWriteNewSeriesBackoffDuration"] = &nodeProcessorSetWriteNewSeriesBackoffDuration{handler: handler} + self99.processorMap["getWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorGetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + self99.processorMap["setWriteNewSeriesLimitPerShardPerSecond"] = &nodeProcessorSetWriteNewSeriesLimitPerShardPerSecond{handler: handler} + self99.processorMap["debugProfileStart"] = &nodeProcessorDebugProfileStart{handler: handler} + self99.processorMap["debugProfileStop"] = &nodeProcessorDebugProfileStop{handler: handler} + self99.processorMap["debugIndexMemorySegments"] = &nodeProcessorDebugIndexMemorySegments{handler: handler} + return self99 } func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -16327,12 +16851,12 @@ func (p *NodeProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, er } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x98 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x100 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x98.Write(oprot) + x100.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x98 + return false, x100 } @@ -17145,7 +17669,60 @@ func (p *nodeProcessorRepair) Process(seqId int32, iprot, oprot thrift.TProtocol return true, err2 } } - if err2 = oprot.WriteMessageBegin("repair", thrift.REPLY, seqId); err2 != nil { + if err2 = oprot.WriteMessageBegin("repair", thrift.REPLY, seqId); err2 != nil { + err = err2 + } + if err2 = result.Write(oprot); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.WriteMessageEnd(); err == nil && err2 != nil { + err = err2 + } + if err2 = oprot.Flush(); err == nil && err2 != nil { + err = err2 + } + if err != nil { + return + } + return true, err +} + +type nodeProcessorTruncate struct { + handler Node +} + +func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := NodeTruncateArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) + oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return false, err + } + + iprot.ReadMessageEnd() + result := NodeTruncateResult{} + var retval *TruncateResult_ + var err2 error + if retval, err2 = p.handler.Truncate(args.Req); err2 != nil { + switch v := err2.(type) { + case *Error: + result.Err = v + default: + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing truncate: "+err2.Error()) + oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return true, err2 + } + } else { + 
result.Success = retval + } + if err2 = oprot.WriteMessageBegin("truncate", thrift.REPLY, seqId); err2 != nil { err = err2 } if err2 = result.Write(oprot); err == nil && err2 != nil { @@ -17163,16 +17740,16 @@ func (p *nodeProcessorRepair) Process(seqId int32, iprot, oprot thrift.TProtocol return true, err } -type nodeProcessorTruncate struct { +type nodeProcessorAggregateTiles struct { handler Node } -func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { - args := NodeTruncateArgs{} +func (p *nodeProcessorAggregateTiles) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := NodeAggregateTilesArgs{} if err = args.Read(iprot); err != nil { iprot.ReadMessageEnd() x := thrift.NewTApplicationException(thrift.PROTOCOL_ERROR, err.Error()) - oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + oprot.WriteMessageBegin("aggregateTiles", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() @@ -17180,16 +17757,16 @@ func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtoc } iprot.ReadMessageEnd() - result := NodeTruncateResult{} - var retval *TruncateResult_ + result := NodeAggregateTilesResult{} + var retval *AggregateTilesResult_ var err2 error - if retval, err2 = p.handler.Truncate(args.Req); err2 != nil { + if retval, err2 = p.handler.AggregateTiles(args.Req); err2 != nil { switch v := err2.(type) { case *Error: result.Err = v default: - x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing truncate: "+err2.Error()) - oprot.WriteMessageBegin("truncate", thrift.EXCEPTION, seqId) + x := thrift.NewTApplicationException(thrift.INTERNAL_ERROR, "Internal error processing aggregateTiles: "+err2.Error()) + oprot.WriteMessageBegin("aggregateTiles", thrift.EXCEPTION, seqId) x.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() @@ -17198,7 +17775,7 @@ func (p *nodeProcessorTruncate) Process(seqId int32, iprot, oprot thrift.TProtoc } else { result.Success = retval } - if err2 = oprot.WriteMessageBegin("truncate", thrift.REPLY, seqId); err2 != nil { + if err2 = oprot.WriteMessageBegin("aggregateTiles", thrift.REPLY, seqId); err2 != nil { err = err2 } if err2 = result.Write(oprot); err == nil && err2 != nil { @@ -21878,6 +22455,259 @@ func (p *NodeTruncateResult) String() string { return fmt.Sprintf("NodeTruncateResult(%+v)", *p) } +// Attributes: +// - Req +type NodeAggregateTilesArgs struct { + Req *AggregateTilesRequest `thrift:"req,1" db:"req" json:"req"` +} + +func NewNodeAggregateTilesArgs() *NodeAggregateTilesArgs { + return &NodeAggregateTilesArgs{} +} + +var NodeAggregateTilesArgs_Req_DEFAULT *AggregateTilesRequest + +func (p *NodeAggregateTilesArgs) GetReq() *AggregateTilesRequest { + if !p.IsSetReq() { + return NodeAggregateTilesArgs_Req_DEFAULT + } + return p.Req +} +func (p *NodeAggregateTilesArgs) IsSetReq() bool { + return p.Req != nil +} + +func (p *NodeAggregateTilesArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); 
err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) ReadField1(iprot thrift.TProtocol) error { + p.Req = &AggregateTilesRequest{ + RangeType: 0, + } + if err := p.Req.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Req), err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("aggregateTiles_args"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeAggregateTilesArgs) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("req", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:req: ", p), err) + } + if err := p.Req.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Req), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:req: ", p), err) + } + return err +} + +func (p *NodeAggregateTilesArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeAggregateTilesArgs(%+v)", *p) +} + +// Attributes: +// - Success +// - Err +type NodeAggregateTilesResult struct { + Success *AggregateTilesResult_ `thrift:"success,0" db:"success" json:"success,omitempty"` + Err *Error `thrift:"err,1" db:"err" json:"err,omitempty"` +} + +func NewNodeAggregateTilesResult() *NodeAggregateTilesResult { + return &NodeAggregateTilesResult{} +} + +var NodeAggregateTilesResult_Success_DEFAULT *AggregateTilesResult_ + +func (p *NodeAggregateTilesResult) GetSuccess() *AggregateTilesResult_ { + if !p.IsSetSuccess() { + return NodeAggregateTilesResult_Success_DEFAULT + } + return p.Success +} + +var NodeAggregateTilesResult_Err_DEFAULT *Error + +func (p *NodeAggregateTilesResult) GetErr() *Error { + if !p.IsSetErr() { + return NodeAggregateTilesResult_Err_DEFAULT + } + return p.Err +} +func (p *NodeAggregateTilesResult) IsSetSuccess() bool { + return p.Success != nil +} + +func (p *NodeAggregateTilesResult) IsSetErr() bool { + return p.Err != nil +} + +func (p *NodeAggregateTilesResult) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 0: + if err := p.ReadField0(iprot); err != nil { + return err + } + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return 
thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) ReadField0(iprot thrift.TProtocol) error { + p.Success = &AggregateTilesResult_{} + if err := p.Success.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Success), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) ReadField1(iprot thrift.TProtocol) error { + p.Err = &Error{ + Type: 0, + } + if err := p.Err.Read(iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.Err), err) + } + return nil +} + +func (p *NodeAggregateTilesResult) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("aggregateTiles_result"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) + } + if p != nil { + if err := p.writeField0(oprot); err != nil { + return err + } + if err := p.writeField1(oprot); err != nil { + return err + } + } + if err := oprot.WriteFieldStop(); err != nil { + return thrift.PrependError("write field stop error: ", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return thrift.PrependError("write struct stop error: ", err) + } + return nil +} + +func (p *NodeAggregateTilesResult) writeField0(oprot thrift.TProtocol) (err error) { + if p.IsSetSuccess() { + if err := oprot.WriteFieldBegin("success", thrift.STRUCT, 0); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 0:success: ", p), err) + } + if err := p.Success.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Success), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 0:success: ", p), err) + } + } + return err +} + +func (p *NodeAggregateTilesResult) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetErr() { + if err := oprot.WriteFieldBegin("err", thrift.STRUCT, 1); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 1:err: ", p), err) + } + if err := p.Err.Write(oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.Err), err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 1:err: ", p), err) + } + } + return err +} + +func (p *NodeAggregateTilesResult) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("NodeAggregateTilesResult(%+v)", *p) +} + type NodeHealthArgs struct { } @@ -25166,16 +25996,16 @@ func (p *ClusterClient) recvHealth() (value *HealthResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error237 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error238 error - error238, err = error237.Read(iprot) + error245 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error246 error + error246, err = error245.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error238 + err = error246 return } if mTypeId != thrift.REPLY { @@ -25247,16 +26077,16 @@ func (p *ClusterClient) recvWrite() (err error) { return } if mTypeId == thrift.EXCEPTION { - error239 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error240 error - error240, err = error239.Read(iprot) + error247 := 
thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error248 error + error248, err = error247.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error240 + err = error248 return } if mTypeId != thrift.REPLY { @@ -25327,16 +26157,16 @@ func (p *ClusterClient) recvWriteTagged() (err error) { return } if mTypeId == thrift.EXCEPTION { - error241 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error242 error - error242, err = error241.Read(iprot) + error249 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error250 error + error250, err = error249.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error242 + err = error250 return } if mTypeId != thrift.REPLY { @@ -25407,16 +26237,16 @@ func (p *ClusterClient) recvQuery() (value *QueryResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error243 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error244 error - error244, err = error243.Read(iprot) + error251 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error252 error + error252, err = error251.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error244 + err = error252 return } if mTypeId != thrift.REPLY { @@ -25488,16 +26318,16 @@ func (p *ClusterClient) recvAggregate() (value *AggregateQueryResult_, err error return } if mTypeId == thrift.EXCEPTION { - error245 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error246 error - error246, err = error245.Read(iprot) + error253 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error254 error + error254, err = error253.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error246 + err = error254 return } if mTypeId != thrift.REPLY { @@ -25569,16 +26399,16 @@ func (p *ClusterClient) recvFetch() (value *FetchResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error247 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error248 error - error248, err = error247.Read(iprot) + error255 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error256 error + error256, err = error255.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error248 + err = error256 return } if mTypeId != thrift.REPLY { @@ -25650,16 +26480,16 @@ func (p *ClusterClient) recvTruncate() (value *TruncateResult_, err error) { return } if mTypeId == thrift.EXCEPTION { - error249 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") - var error250 error - error250, err = error249.Read(iprot) + error257 := thrift.NewTApplicationException(thrift.UNKNOWN_APPLICATION_EXCEPTION, "Unknown Exception") + var error258 error + error258, err = error257.Read(iprot) if err != nil { return } if err = iprot.ReadMessageEnd(); err != nil { return } - err = error250 + err = error258 return } if mTypeId != thrift.REPLY { @@ -25701,15 +26531,15 @@ func (p *ClusterProcessor) ProcessorMap() map[string]thrift.TProcessorFunction { func NewClusterProcessor(handler 
Cluster) *ClusterProcessor { - self251 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} - self251.processorMap["health"] = &clusterProcessorHealth{handler: handler} - self251.processorMap["write"] = &clusterProcessorWrite{handler: handler} - self251.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} - self251.processorMap["query"] = &clusterProcessorQuery{handler: handler} - self251.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} - self251.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} - self251.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} - return self251 + self259 := &ClusterProcessor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self259.processorMap["health"] = &clusterProcessorHealth{handler: handler} + self259.processorMap["write"] = &clusterProcessorWrite{handler: handler} + self259.processorMap["writeTagged"] = &clusterProcessorWriteTagged{handler: handler} + self259.processorMap["query"] = &clusterProcessorQuery{handler: handler} + self259.processorMap["aggregate"] = &clusterProcessorAggregate{handler: handler} + self259.processorMap["fetch"] = &clusterProcessorFetch{handler: handler} + self259.processorMap["truncate"] = &clusterProcessorTruncate{handler: handler} + return self259 } func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { @@ -25722,12 +26552,12 @@ func (p *ClusterProcessor) Process(iprot, oprot thrift.TProtocol) (success bool, } iprot.Skip(thrift.STRUCT) iprot.ReadMessageEnd() - x252 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + x260 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) - x252.Write(oprot) + x260.Write(oprot) oprot.WriteMessageEnd() oprot.Flush() - return false, x252 + return false, x260 } diff --git a/src/dbnode/generated/thrift/rpc/rpc_mock.go b/src/dbnode/generated/thrift/rpc/rpc_mock.go index 647e08992b..a7be5ca909 100644 --- a/src/dbnode/generated/thrift/rpc/rpc_mock.go +++ b/src/dbnode/generated/thrift/rpc/rpc_mock.go @@ -210,6 +210,21 @@ func (mr *MockTChanNodeMockRecorder) AggregateRaw(ctx, req interface{}) *gomock. 
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateRaw", reflect.TypeOf((*MockTChanNode)(nil).AggregateRaw), ctx, req) } +// AggregateTiles mocks base method +func (m *MockTChanNode) AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, req) + ret0, _ := ret[0].(*AggregateTilesResult_) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockTChanNodeMockRecorder) AggregateTiles(ctx, req interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTChanNode)(nil).AggregateTiles), ctx, req) +} + // Bootstrapped mocks base method func (m *MockTChanNode) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) { m.ctrl.T.Helper() diff --git a/src/dbnode/generated/thrift/rpc/tchan-rpc.go b/src/dbnode/generated/thrift/rpc/tchan-rpc.go index 3d32c85d9e..cf6c94667a 100644 --- a/src/dbnode/generated/thrift/rpc/tchan-rpc.go +++ b/src/dbnode/generated/thrift/rpc/tchan-rpc.go @@ -47,6 +47,7 @@ type TChanCluster interface { type TChanNode interface { Aggregate(ctx thrift.Context, req *AggregateQueryRequest) (*AggregateQueryResult_, error) AggregateRaw(ctx thrift.Context, req *AggregateQueryRawRequest) (*AggregateQueryRawResult_, error) + AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) BootstrappedInPlacementOrNoPlacement(ctx thrift.Context) (*NodeBootstrappedInPlacementOrNoPlacementResult_, error) DebugIndexMemorySegments(ctx thrift.Context, req *DebugIndexMemorySegmentsRequest) (*DebugIndexMemorySegmentsResult_, error) @@ -518,6 +519,24 @@ func (c *tchanNodeClient) AggregateRaw(ctx thrift.Context, req *AggregateQueryRa return resp.GetSuccess(), err } +func (c *tchanNodeClient) AggregateTiles(ctx thrift.Context, req *AggregateTilesRequest) (*AggregateTilesResult_, error) { + var resp NodeAggregateTilesResult + args := NodeAggregateTilesArgs{ + Req: req, + } + success, err := c.client.Call(ctx, c.thriftService, "aggregateTiles", &args, &resp) + if err == nil && !success { + switch { + case resp.Err != nil: + err = resp.Err + default: + err = fmt.Errorf("received no result or unknown exception for aggregateTiles") + } + } + + return resp.GetSuccess(), err +} + func (c *tchanNodeClient) Bootstrapped(ctx thrift.Context) (*NodeBootstrappedResult_, error) { var resp NodeBootstrappedResult args := NodeBootstrappedArgs{} @@ -1044,6 +1063,7 @@ func (s *tchanNodeServer) Methods() []string { return []string{ "aggregate", "aggregateRaw", + "aggregateTiles", "bootstrapped", "bootstrappedInPlacementOrNoPlacement", "debugIndexMemorySegments", @@ -1082,6 +1102,8 @@ func (s *tchanNodeServer) Handle(ctx thrift.Context, methodName string, protocol return s.handleAggregate(ctx, protocol) case "aggregateRaw": return s.handleAggregateRaw(ctx, protocol) + case "aggregateTiles": + return s.handleAggregateTiles(ctx, protocol) case "bootstrapped": return s.handleBootstrapped(ctx, protocol) case "bootstrappedInPlacementOrNoPlacement": @@ -1202,6 +1224,34 @@ func (s *tchanNodeServer) handleAggregateRaw(ctx thrift.Context, protocol athrif return err == nil, &res, nil } +func (s *tchanNodeServer) handleAggregateTiles(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { + var req NodeAggregateTilesArgs 
+ var res NodeAggregateTilesResult + + if err := req.Read(protocol); err != nil { + return false, nil, err + } + + r, err := + s.handler.AggregateTiles(ctx, req.Req) + + if err != nil { + switch v := err.(type) { + case *Error: + if v == nil { + return false, nil, fmt.Errorf("Handler for err returned non-nil error type *Error but nil value") + } + res.Err = v + default: + return false, nil, err + } + } else { + res.Success = r + } + + return err == nil, &res, nil +} + func (s *tchanNodeServer) handleBootstrapped(ctx thrift.Context, protocol athrift.TProtocol) (bool, athrift.TStruct, error) { var req NodeBootstrappedArgs var res NodeBootstrappedResult diff --git a/src/dbnode/integration/large_tiles_test.go b/src/dbnode/integration/large_tiles_test.go new file mode 100644 index 0000000000..a8d8f04a7a --- /dev/null +++ b/src/dbnode/integration/large_tiles_test.go @@ -0,0 +1,163 @@ +// +build integration + +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "github.com/stretchr/testify/assert" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage" + xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" + xtime "github.com/m3db/m3/src/x/time" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "go.uber.org/zap" +) + +func TestReadAggregateWrite(t *testing.T) { + var ( + blockSize = 2 * time.Hour + indexBlockSize = 2 * blockSize + rOpts = retention.NewOptions().SetRetentionPeriod(24 * blockSize).SetBlockSize(blockSize) + idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize) + nsOpts = namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(idxOpts). + SetColdWritesEnabled(true) + ) + + srcNs, err := namespace.NewMetadata(testNamespaces[0], nsOpts) + require.NoError(t, err) + trgNs, err := namespace.NewMetadata(testNamespaces[1], nsOpts) + require.NoError(t, err) + + testOpts := NewTestOptions(t). + SetNamespaces([]namespace.Metadata{srcNs, trgNs}). 
+ SetWriteNewSeriesAsync(true) + + testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts) + defer testSetup.Close() + + reporter := xmetrics.NewTestStatsReporter(xmetrics.NewTestStatsReporterOptions()) + scope, closer := tally.NewRootScope( + tally.ScopeOptions{Reporter: reporter}, time.Millisecond) + defer closer.Close() + testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions( + instrument.NewOptions().SetMetricsScope(scope))) + + storageOpts := testSetup.StorageOpts() + testSetup.SetStorageOpts(storageOpts) + + // Start the server. + log := storageOpts.InstrumentOptions().Logger() + require.NoError(t, testSetup.StartServer()) + + // Stop the server. + defer func() { + require.NoError(t, testSetup.StopServer()) + log.Debug("server is now down") + }() + + start := time.Now() + session, err := testSetup.M3DBClient().DefaultSession() + require.NoError(t, err) + nowFn := testSetup.NowFn() + + tags := []ident.Tag{ + ident.StringTag("__name__", "cpu"), + ident.StringTag("job", "job1"), + } + + dpTimeStart := nowFn().Truncate(indexBlockSize).Add(-3 * indexBlockSize) + dpTime := dpTimeStart + + // Write test data. + for a := 0.0; a < 20.0; a++ { + err = session.WriteTagged(srcNs.ID(), ident.StringID("foo"), ident.NewTagsIterator(ident.NewTags(tags...)), dpTime, 42.1+a, xtime.Second, nil) + require.NoError(t, err) + dpTime = dpTime.Add(10 * time.Minute) + } + log.Info("test data written", zap.Duration("took", time.Since(start))) + + log.Info("waiting until data is cold flushed") + start = time.Now() + expectedNumWrites := int64(20) + flushed := xclock.WaitUntil(func() bool { + counters := reporter.Counters() + counter, ok := counters["database.series.cold-writes"] // Wait until data is written + warmData, warmOk := counters["database.flushWarmData.success"] // Wait until data is flushed + return ok && warmOk && counter == expectedNumWrites && warmData >= expectedNumWrites + }, time.Minute) + require.True(t, flushed) + log.Info("verified data has been cold flushed", zap.Duration("took", time.Since(start))) + + aggOpts, err := storage.NewAggregateTilesOptions(dpTimeStart, dpTimeStart.Add(blockSize), time.Hour, false) + require.NoError(t, err) + + // Retry aggregation as the persist manager could still be locked by cold writes. + // TODO: Remove the retry once a separate persist manager is implemented. + var processedBlockCount int64 + for retries := 0; retries < 10; retries++ { + processedBlockCount, err = testSetup.DB().AggregateTiles(storageOpts.ContextPool().Get(), srcNs.ID(), trgNs.ID(), aggOpts) + if err == nil { + break + } + time.Sleep(time.Second) + } + require.NoError(t, err) + assert.Equal(t, int64(1), processedBlockCount) + + log.Info("fetching aggregated data") + series, err := session.Fetch(trgNs.ID(), ident.StringID("foo"), dpTimeStart, nowFn()) + require.NoError(t, err) + + expectedDps := make(map[int64]float64) + // TODO: Replace with exact values once aggregation is implemented. + timestamp := dpTimeStart + // TODO: currently we aggregate only a single block, which is why we expect + // 12 datapoints instead of 20. + for a := 0; a < 12; a++ { + expectedDps[timestamp.Unix()] = 42.1 + float64(a) + timestamp = timestamp.Add(10 * time.Minute) + } + + count := 0 + for series.Next() { + dp, _, _ := series.Current() + value, ok := expectedDps[dp.Timestamp.Unix()] + require.True(t, ok, + "didn't expect to find timestamp %v in aggregated result", + dp.Timestamp.Unix()) + require.Equal(t, value, dp.Value, + "value for timestamp %v doesn't match.
Expected %v but got %v", + dp.Timestamp.Unix(), value, dp.Value) + count++ + } + require.Equal(t, len(expectedDps), count) +} diff --git a/src/dbnode/network/server/tchannelthrift/node/service.go b/src/dbnode/network/server/tchannelthrift/node/service.go index 652b47c409..6f7f41e84e 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service.go +++ b/src/dbnode/network/server/tchannelthrift/node/service.go @@ -538,6 +538,61 @@ func (s *service) query(ctx context.Context, db storage.Database, req *rpc.Query return result, nil } +func (s *service) AggregateTiles(tctx thrift.Context, req *rpc.AggregateTilesRequest) (*rpc.AggregateTilesResult_, error) { + db, err := s.startWriteRPCWithDB() + if err != nil { + return nil, err + } + defer s.writeRPCCompleted() + + ctx, sp, sampled := tchannelthrift.Context(tctx).StartSampledTraceSpan(tracepoint.AggregateTiles) + defer sp.Finish() + + if sampled { + sp.LogFields( + opentracinglog.String("sourceNameSpace", req.SourceNameSpace), + opentracinglog.String("targetNameSpace", req.TargetNameSpace), + xopentracing.Time("start", time.Unix(0, req.RangeStart)), + xopentracing.Time("end", time.Unix(0, req.RangeEnd)), + opentracinglog.String("step", req.Step), + ) + } + + processedBlockCount, err := s.aggregateTiles(ctx, db, req) + if err != nil { + sp.LogFields(opentracinglog.Error(err)) + } + + return &rpc.AggregateTilesResult_{ + ProcessedBlockCount: processedBlockCount, + }, err +} + +func (s *service) aggregateTiles( + ctx context.Context, + db storage.Database, + req *rpc.AggregateTilesRequest, +) (int64, error) { + start, rangeStartErr := convert.ToTime(req.RangeStart, req.RangeType) + end, rangeEndErr := convert.ToTime(req.RangeEnd, req.RangeType) + step, stepErr := time.ParseDuration(req.Step) + opts, optsErr := storage.NewAggregateTilesOptions(start, end, step, req.RemoveResets) + if rangeStartErr != nil || rangeEndErr != nil || stepErr != nil || optsErr != nil { + multiErr := xerrors.NewMultiError().Add(rangeStartErr).Add(rangeEndErr).Add(stepErr).Add(optsErr) + return 0, tterrors.NewBadRequestError(multiErr.FinalError()) + } + + sourceNsID := s.pools.id.GetStringID(ctx, req.SourceNameSpace) + targetNsID := s.pools.id.GetStringID(ctx, req.TargetNameSpace) + + processedBlockCount, err := db.AggregateTiles(ctx, sourceNsID, targetNsID, opts) + if err != nil { + return processedBlockCount, convert.ToRPCError(err) + } + + return processedBlockCount, nil +} + func (s *service) Fetch(tctx thrift.Context, req *rpc.FetchRequest) (*rpc.FetchResult_, error) { db, err := s.startReadRPCWithDB() if err != nil { diff --git a/src/dbnode/network/server/tchannelthrift/node/service_test.go b/src/dbnode/network/server/tchannelthrift/node/service_test.go index 9e43ae2561..f22898b007 100644 --- a/src/dbnode/network/server/tchannelthrift/node/service_test.go +++ b/src/dbnode/network/server/tchannelthrift/node/service_test.go @@ -3017,3 +3017,48 @@ func TestServiceSetWriteNewSeriesLimitPerShardPerSecond(t *testing.T) { require.NoError(t, err) assert.Equal(t, int64(84), setResp.WriteNewSeriesLimitPerShardPerSecond) } + +func TestServiceAggregateTiles(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + mockDB := storage.NewMockDatabase(ctrl) + mockDB.EXPECT().Options().Return(testStorageOpts).AnyTimes() + mockDB.EXPECT().IsOverloaded().Return(false) + + service := NewService(mockDB, testTChannelThriftOptions).(*service) + + tctx, _ := tchannelthrift.NewContext(time.Minute) + ctx := tchannelthrift.Context(tctx) + defer ctx.Close() + + start := 
time.Now().Truncate(time.Hour).Add(-1 * time.Hour) + end := start.Add(time.Hour) + + start, end = start.Truncate(time.Second), end.Truncate(time.Second) + + step := "10m" + stepDuration, _ := time.ParseDuration(step) + + sourceNsID := "source" + targetNsID := "target" + + mockDB.EXPECT().AggregateTiles( + ctx, + ident.NewIDMatcher(sourceNsID), + ident.NewIDMatcher(targetNsID), + storage.AggregateTilesOptions{Start: start, End: end, Step: stepDuration, HandleCounterResets: true}, + ).Return(int64(4), nil) + + result, err := service.AggregateTiles(tctx, &rpc.AggregateTilesRequest{ + SourceNameSpace: sourceNsID, + TargetNameSpace: targetNsID, + RangeStart: start.Unix(), + RangeEnd: end.Unix(), + Step: step, + RemoveResets: true, + RangeType: rpc.TimeType_UNIX_SECONDS, + }) + require.NoError(t, err) + assert.Equal(t, int64(4), result.ProcessedBlockCount) +} diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 0568e0099c..a24d4040b1 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -218,7 +218,7 @@ type Preparer interface { } // FlushPreparer is a persist flush cycle, each shard and block start permutation needs -// to explicility be prepared. +// to explicitly be prepared. type FlushPreparer interface { Preparer @@ -227,7 +227,7 @@ type FlushPreparer interface { } // SnapshotPreparer is a persist snapshot cycle, each shard and block start permutation needs -// to explicility be prepared. +// to explicitly be prepared. type SnapshotPreparer interface { Preparer @@ -236,7 +236,7 @@ type SnapshotPreparer interface { } // IndexFlush is a persist flush cycle, each namespace, block combination needs -// to explicility be prepared. +// to explicitly be prepared. type IndexFlush interface { // Prepare prepares writing data for a given ns/blockStart, returning a // PreparedIndexPersist object and any error encountered during diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a730d829c4..d7e6dc8bf7 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -61,6 +61,9 @@ var ( // errBootstrapEnqueued raised when trying to bootstrap and bootstrap becomes enqueued. errBootstrapEnqueued = errors.New("database bootstrapping enqueued bootstrap") + + // errColdWritesDisabled raised when trying to do large tiles aggregation with cold writes disabled. 
+ errColdWritesDisabled = errors.New("cold writes are disabled") ) const ( diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 653f98d4c7..746025378c 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -1083,6 +1083,41 @@ func (d *db) OwnedNamespaces() ([]databaseNamespace, error) { return d.ownedNamespacesWithLock(), nil } +func (d *db) AggregateTiles( + ctx context.Context, + sourceNsID, + targetNsID ident.ID, + opts AggregateTilesOptions, +) (int64, error) { + ctx, sp, sampled := ctx.StartSampledTraceSpan(tracepoint.DBAggregateTiles) + if sampled { + sp.LogFields( + opentracinglog.String("sourceNameSpace", sourceNsID.String()), + opentracinglog.String("targetNameSpace", targetNsID.String()), + xopentracing.Time("start", opts.Start), + xopentracing.Time("end", opts.End), + xopentracing.Duration("step", opts.Step), + ) + } + defer sp.Finish() + + sourceNs, err := d.namespaceFor(sourceNsID) + if err != nil { + d.metrics.unknownNamespaceRead.Inc(1) + return 0, err + } + + targetNs, err := d.namespaceFor(targetNsID) + if err != nil { + d.metrics.unknownNamespaceRead.Inc(1) + return 0, err + } + + // TODO: Create and use a dedicated persist manager + pm := d.opts.PersistManager() + return targetNs.AggregateTiles(ctx, sourceNs, opts, pm) +} + func (d *db) nextIndex() uint64 { // Start with index at "1" so that a default "uniqueIndex" // with "0" is invalid (AddUint64 will return the new value). @@ -1126,3 +1161,19 @@ func (m metadatas) String() (string, error) { buf.WriteRune(']') return buf.String(), nil } + +func NewAggregateTilesOptions( + start, end time.Time, + step time.Duration, + handleCounterResets bool, +) (AggregateTilesOptions, error) { + if !end.After(start) { + return AggregateTilesOptions{}, fmt.Errorf("AggregateTilesOptions.End must be after Start, got %s - %s", start, end) + } + + if step <= 0 { + return AggregateTilesOptions{}, fmt.Errorf("AggregateTilesOptions.Step must be positive, got %s", step) + } + + return AggregateTilesOptions{Start: start, End: end, Step: step, HandleCounterResets: handleCounterResets}, nil +} diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 46527674d7..aed7befced 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -57,7 +57,7 @@ import ( "github.com/fortytw2/leaktest" "github.com/golang/mock/gomock" "github.com/m3db/m3/src/dbnode/testdata/prototest" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -68,9 +68,9 @@ var ( defaultTestNs1ID = ident.StringID("testns1") defaultTestNs2ID = ident.StringID("testns2") defaultTestRetentionOpts = retention.NewOptions().SetBufferFuture(10 * time.Minute).SetBufferPast(10 * time.Minute). - SetBlockSize(2 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) + SetBlockSize(2 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) defaultTestNs2RetentionOpts = retention.NewOptions().SetBufferFuture(10 * time.Minute).SetBufferPast(10 * time.Minute). 
- SetBlockSize(4 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) + SetBlockSize(4 * time.Hour).SetRetentionPeriod(2 * 24 * time.Hour) defaultTestNs1Opts = namespace.NewOptions().SetRetentionOptions(defaultTestRetentionOpts) defaultTestNs2Opts = namespace.NewOptions().SetRetentionOptions(defaultTestNs2RetentionOpts) testSchemaHistory = prototest.NewSchemaHistory() @@ -1287,3 +1287,51 @@ func TestDatabaseIsOverloaded(t *testing.T) { mockCL.EXPECT().QueueLength().Return(int64(90)) require.Equal(t, true, d.IsOverloaded()) } + +func TestDatabaseAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + defer func() { + close(mapCh) + }() + + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm = d.opts.PersistManager() + start = time.Now().Truncate(time.Hour) + ) + + opts, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, true) + require.NotNil(t, err) + + sourceNs := dbAddNewMockNamespace(ctrl, d, sourceNsID.String()) + targetNs := dbAddNewMockNamespace(ctrl, d, targetNsID.String()) + targetNs.EXPECT().AggregateTiles(ctx, sourceNs, opts, pm).Return(int64(4), nil) + + processedBlockCount, err := d.AggregateTiles(ctx, sourceNsID, targetNsID, opts) + require.NoError(t, err) + assert.Equal(t, int64(4), processedBlockCount) +} + +func TestNewAggregateTilesOptions(t *testing.T) { + start := time.Now().Truncate(time.Hour) + + _, err := NewAggregateTilesOptions(start, start.Add(-time.Second), time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start, time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), -time.Minute, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), 0, false) + assert.Error(t, err) + + _, err = NewAggregateTilesOptions(start, start.Add(time.Second), time.Minute, false) + assert.NoError(t, err) +} diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 0f145ba574..f843969837 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -155,6 +155,7 @@ type databaseNamespaceMetrics struct { flushWarmData instrument.MethodMetrics flushColdData instrument.MethodMetrics flushIndex instrument.MethodMetrics + writeAggData instrument.MethodMetrics snapshot instrument.MethodMetrics write instrument.MethodMetrics writeTagged instrument.MethodMetrics @@ -237,6 +238,7 @@ func newDatabaseNamespaceMetrics( flushWarmData: instrument.NewMethodMetrics(scope, "flushWarmData", opts), flushColdData: instrument.NewMethodMetrics(scope, "flushColdData", opts), flushIndex: instrument.NewMethodMetrics(scope, "flushIndex", opts), + writeAggData: instrument.NewMethodMetrics(scope, "writeAggData", opts), snapshot: instrument.NewMethodMetrics(scope, "snapshot", opts), write: instrument.NewMethodMetrics(scope, "write", opts), writeTagged: instrument.NewMethodMetrics(scope, "write-tagged", opts), @@ -1592,3 +1594,171 @@ func (n *dbNamespace) FlushState(shardID uint32, blockStart time.Time) (fileOpSt func (n *dbNamespace) nsContextWithRLock() namespace.Context { return namespace.Context{ID: n.id, Schema: n.schemaDescr} } + +func (n *dbNamespace) AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, +) (int64, error) { + callStart := n.nowFn() + processedBlockCount, err 
:= n.aggregateTiles(ctx, sourceNs, opts, pm) + n.metrics.writeAggData.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) + + return processedBlockCount, err +} + +func (n *dbNamespace) aggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, +) (int64, error) { + targetBlockSize := n.Metadata().Options().RetentionOptions().BlockSize() + blockStart := opts.Start.Truncate(targetBlockSize) + if blockStart.Add(targetBlockSize).Before(opts.End) { + return 0, fmt.Errorf("tile aggregation must be done within a single target block (start=%s, end=%s, blockSize=%s)", + opts.Start, opts.End, targetBlockSize.String()) + } + + n.RLock() + if n.bootstrapState != Bootstrapped { + n.RUnlock() + return 0, errNamespaceNotBootstrapped + } + nsCtx := n.nsContextWithRLock() + n.RUnlock() + + targetShards := n.OwnedShards() + + // Note: Cold writes must be enabled for Large Tiles to work. + if !n.nopts.ColdWritesEnabled() { + return 0, errColdWritesDisabled + } + + sourceBlockSize := sourceNs.Metadata().Options().RetentionOptions().BlockSize() + sourceBlockStart := opts.Start.Truncate(sourceBlockSize) + + sourceNsOpts := sourceNs.StorageOptions() + reader, err := fs.NewReader(sourceNsOpts.BytesPool(), sourceNsOpts.CommitLogOptions().FilesystemOptions()) + if err != nil { + return 0, err + } + + wOpts := series.WriteOptions{ + TruncateType: n.opts.TruncateType(), + SchemaDesc: nsCtx.Schema, + } + + resources, err := newColdFlushReuseableResources(n.opts) + if err != nil { + return 0, err + } + + // NB(bodu): Deferred targetShard cold flushes so that we can ensure that cold flush index data is + // persisted before persisting TSDB data to ensure crash consistency. + multiErr := xerrors.NewMultiError() + var processedBlockCount int64 + for _, targetShard := range targetShards { + sourceShard, _, err := sourceNs.readableShardAt(targetShard.ID()) + if err != nil { + detailedErr := fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err) + multiErr = multiErr.Add(detailedErr) + continue + } + shardProcessedBlockCount, err := targetShard.AggregateTiles(ctx, reader, sourceNs.ID(), sourceBlockStart, sourceShard, opts, wOpts) + processedBlockCount += shardProcessedBlockCount + if err != nil { + detailedErr := fmt.Errorf("shard %d aggregation failed: %v", targetShard.ID(), err) + multiErr = multiErr.Add(detailedErr) + continue + } + + multiErr = n.coldFlushSingleShard(nsCtx, targetShard, pm, resources, multiErr) + } + + return processedBlockCount, multiErr.FinalError() +} + +func (n *dbNamespace) coldFlushSingleShard( + nsCtx namespace.Context, + shard databaseShard, + pm persist.Manager, + resources coldFlushReuseableResources, + multiErr xerrors.MultiError, +) xerrors.MultiError { + // NB(rartoul): This value can be used for emitting metrics, but should not be used + // for business logic. + callStart := n.nowFn() + + // NB(bodu): The in-mem index will lag behind the TSDB in terms of new series writes. For a period of + // time between when we rotate out the active cold mutable index segments (happens here) and when + // we actually cold flush the data to disk we will be making writes to the newly active mutable seg. + // This means that some series can live doubly in-mem and loaded from disk until the next cold flush + // where they will be evicted from the in-mem index. 
+ var ( + onColdFlushDone OnColdFlushDone + err error + ) + if n.reverseIndex != nil { + onColdFlushDone, err = n.reverseIndex.ColdFlush([]databaseShard{shard}) + if err != nil { + n.metrics.writeAggData.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error preparing to coldflush a reverse index for shard %d: %v", + shard.ID(), + err)) + } + } + + onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n) + if err != nil { + n.metrics.writeAggData.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error preparing to coldflush a namespace for shard %d: %v", + shard.ID(), + err)) + } + + flushPersist, err := pm.StartFlushPersist() + if err != nil { + n.metrics.writeAggData.ReportError(n.nowFn().Sub(callStart)) + return multiErr.Add( + fmt.Errorf("error starting flush persist for shard %d: %v", + shard.ID(), + err)) + } + + localErrors := xerrors.NewMultiError() + shardColdFlush, err := shard.ColdFlush(flushPersist, resources, nsCtx, onColdFlushNs) + if err != nil { + detailedErr := fmt.Errorf("shard %d failed to compact: %v", shard.ID(), err) + localErrors = localErrors.Add(detailedErr) + } + + // We go through this error checking process to allow for partially successful flushes. + indexColdFlushError := onColdFlushNs.Done() + if indexColdFlushError == nil && onColdFlushDone != nil { + // Only evict rotated cold mutable index segments if the index cold flush was successful + // or we will lose queryability of data that's still in mem. + indexColdFlushError = onColdFlushDone() + } + if indexColdFlushError == nil { + // NB(bodu): We only want to complete data cold flushes if the index cold flush + // is successful. If index cold flush is successful, we want to attempt writing + // of checkpoint files to complete the cold data flush lifecycle for successful shards. 
+ localErrors = localErrors.Add(shardColdFlush.Done()) + } + localErrors = localErrors.Add(indexColdFlushError) + err = flushPersist.DoneFlush() + localErrors = localErrors.Add(err) + + res := localErrors.FinalError() + n.metrics.writeAggData.ReportSuccessOrError(res, n.nowFn().Sub(callStart)) + + for _, err := range localErrors.Errors() { + multiErr = multiErr.Add(err) + } + + return multiErr +} diff --git a/src/dbnode/storage/namespace_readers.go b/src/dbnode/storage/namespace_readers.go index df92ed0144..ab1ba4b089 100644 --- a/src/dbnode/storage/namespace_readers.go +++ b/src/dbnode/storage/namespace_readers.go @@ -73,6 +73,8 @@ type databaseNamespaceReaderManager interface { put(reader fs.DataFileSetReader) error + latestVolume(shard uint32, blockStart time.Time) (int, error) + assignShardSet(shardSet sharding.ShardSet) tick() diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 7db0854009..606089238c 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/cluster/shard" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" @@ -1295,6 +1296,110 @@ func TestNamespaceFlushState(t *testing.T) { require.Equal(t, expectedFlushState, flushState) } +func TestNamespaceAggregateTilesFailOnBootstrapping(t *testing.T) { + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapping + + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.Equal(t, errNamespaceNotBootstrapped, err) +} + +func TestNamespaceAggregateTilesFailOnDisabledColdWrites(t *testing.T) { + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID, namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapped + + _, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.Equal(t, errColdWritesDisabled, err) +} + +func TestNamespaceAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + sourceNsID = ident.StringID("source") + targetNsID = ident.StringID("target") + ctx = context.NewContext() + pm, _ = fs.NewPersistManager(fs.NewOptions()) + start = time.Now().Truncate(2 * time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + ) + + sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions()) + defer sourceCloser() + + targetNs, targetCloser := newTestNamespaceWithIDOpts(t, targetNsID,
namespace.NewOptions()) + defer targetCloser() + targetNs.bootstrapState = Bootstrapped + targetNs.nopts = targetNs.nopts.SetColdWritesEnabled(true) + + wOpts := series.WriteOptions{ + TruncateType: targetNs.opts.TruncateType(), + SchemaDesc: targetNs.Schema(), + } + + sourceShard0 := NewMockdatabaseShard(ctrl) + sourceShard1 := NewMockdatabaseShard(ctrl) + sourceNs.shards[0] = sourceShard0 + sourceNs.shards[1] = sourceShard1 + + sourceShard0.EXPECT().IsBootstrapped().Return(true) + sourceShard1.EXPECT().IsBootstrapped().Return(true) + + targetShard0 := NewMockdatabaseShard(ctrl) + targetShard1 := NewMockdatabaseShard(ctrl) + targetNs.shards[0] = targetShard0 + targetNs.shards[1] = targetShard1 + + targetShard0.EXPECT().ID().Return(uint32(0)) + targetShard1.EXPECT().ID().Return(uint32(1)) + + sourceNsIDMatcher := ident.NewIDMatcher(sourceNsID.String()) + targetShard0.EXPECT().AggregateTiles(ctx, gomock.Any(), sourceNsIDMatcher, start, sourceShard0, opts, wOpts).Return(int64(3), nil) + targetShard1.EXPECT().AggregateTiles(ctx, gomock.Any(), sourceNsIDMatcher, start, sourceShard1, opts, wOpts).Return(int64(2), nil) + + shardColdFlush0 := NewMockShardColdFlush(ctrl) + shardColdFlush0.EXPECT().Done().Return(nil) + shardColdFlush1 := NewMockShardColdFlush(ctrl) + shardColdFlush1.EXPECT().Done().Return(nil) + + nsCtx := targetNs.nsContextWithRLock() + onColdFlushNs, err := targetNs.opts.OnColdFlush().ColdFlushNamespace(targetNs) + require.NoError(t, err) + targetShard0.EXPECT().ColdFlush(gomock.Any(), gomock.Any(), nsCtx, onColdFlushNs).Return(shardColdFlush0, nil) + targetShard1.EXPECT().ColdFlush(gomock.Any(), gomock.Any(), nsCtx, onColdFlushNs).Return(shardColdFlush1, nil) + + processedBlockCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts, pm) + require.NoError(t, err) + assert.Equal(t, int64(3+2), processedBlockCount) +} + func waitForStats( reporter xmetrics.TestStatsReporter, check func(xmetrics.TestStatsReporter) bool, diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index ab8dd5dfb2..bee74c0b40 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -21,6 +21,7 @@ package storage import ( + "bytes" "container/list" "errors" "fmt" @@ -30,6 +31,8 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/encoding" + "github.com/m3db/m3/src/dbnode/encoding/m3tsz" "github.com/m3db/m3/src/dbnode/generated/proto/pagetoken" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" @@ -210,6 +213,7 @@ type dbShardMetrics struct { insertAsyncWriteInternalErrors tally.Counter insertAsyncWriteInvalidParamsErrors tally.Counter insertAsyncIndexErrors tally.Counter + largeTilesWriteErrors tally.Counter } func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics { @@ -238,6 +242,10 @@ func newDatabaseShardMetrics(shardID uint32, scope tally.Scope) dbShardMetrics { "error_type": "reverse-index", "suberror_type": "write-batch-error", }).Counter(insertErrorName), + largeTilesWriteErrors: scope.Tagged(map[string]string{ + "error_type": "large_tiles", + "suberror_type": "write-error", + }).Counter(insertErrorName), } } @@ -2562,6 +2570,78 @@ func (s *dbShard) Repair( return repairer.Repair(ctx, nsCtx, nsMeta, tr, s) } +func (s *dbShard) AggregateTiles( + ctx context.Context, + reader fs.DataFileSetReader, + sourceNsID ident.ID, + sourceBlockStart time.Time, + sourceShard databaseShard, + opts AggregateTilesOptions, + wOpts series.WriteOptions, +) (int64, error) { + 
latestSourceVolume, err := sourceShard.latestVolume(sourceBlockStart) + if err != nil { + return 0, err + } + + openOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: sourceNsID, + Shard: sourceShard.ID(), + BlockStart: sourceBlockStart, + VolumeIndex: latestSourceVolume, + }, + FileSetType: persist.FileSetFlushType, + //TODO add after https://github.com/chronosphereio/m3/pull/10 for proper streaming - OrderByIndex: true + } + if err := reader.Open(openOpts); err != nil { + return 0, err + } + defer reader.Close() + + encodingOpts := encoding.NewOptions().SetBytesPool(s.opts.BytesPool()) + bytesReader := bytes.NewReader(nil) + dataPointIter := m3tsz.NewReaderIterator(bytesReader, m3tsz.DefaultIntOptimizationEnabled, encodingOpts) + var lastWriteError error + var processedBlockCount int64 + + for { + id, tags, data, _, err := reader.Read() + if err == io.EOF { + break + } + if err != nil { + return processedBlockCount, err + } + + data.IncRef() + bytesReader.Reset(data.Bytes()) + dataPointIter.Reset(bytesReader, nil) + + for dataPointIter.Next() { + dp, unit, annot := dataPointIter.Current() + _, err = s.writeAndIndex(ctx, id, tags, dp.Timestamp, dp.Value, unit, annot, wOpts, true) + if err != nil { + s.metrics.largeTilesWriteErrors.Inc(1) + lastWriteError = err + } + } + + dataPointIter.Close() + + data.DecRef() + data.Finalize() + + processedBlockCount++ + } + + s.logger.Debug("finished aggregating tiles", + zap.Uint32("shard", s.ID()), + zap.Int64("processedBlocks", processedBlockCount)) + + return processedBlockCount, lastWriteError +} + func (s *dbShard) BootstrapState() BootstrapState { s.RLock() bs := s.bootstrapState @@ -2583,6 +2663,10 @@ func (s *dbShard) DocRef(id ident.ID) (doc.Document, bool, error) { return emptyDoc, false, err } +func (s *dbShard) latestVolume(blockStart time.Time) (int, error) { + return s.namespaceReaderMgr.latestVolume(s.shard, blockStart) +} + func (s *dbShard) logFlushResult(r dbShardFlushResult) { s.logger.Debug("shard flush outcome", zap.Uint32("shard", s.ID()), diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 55e0bb46c6..15deeb610f 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -23,6 +23,7 @@ package storage import ( "errors" "fmt" + "io" "io/ioutil" "os" "strconv" @@ -190,8 +191,8 @@ func TestShardBootstrapWithFlushVersion(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). + SetFilesystemOptions(fsOpts) ) opts = opts. SetCommitLogOptions(newClOpts) @@ -268,8 +269,8 @@ func TestShardBootstrapWithFlushVersionNoCleanUp(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). + SetFilesystemOptions(fsOpts) ) opts = opts. SetCommitLogOptions(newClOpts) @@ -326,8 +327,8 @@ func TestShardBootstrapWithCacheShardIndices(t *testing.T) { fsOpts = opts.CommitLogOptions().FilesystemOptions(). SetFilePathPrefix(dir) newClOpts = opts. - CommitLogOptions(). - SetFilesystemOptions(fsOpts) + CommitLogOptions(). 
+ SetFilesystemOptions(fsOpts) mockRetriever = block.NewMockDatabaseBlockRetriever(ctrl) ) opts = opts.SetCommitLogOptions(newClOpts) @@ -1765,3 +1766,45 @@ func TestShardIterateBatchSize(t *testing.T) { require.True(t, shardIterateBatchMinSize < iterateBatchSize(2000)) } + +func TestAggregateTiles(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + ctx = context.NewContext() + start = time.Now().Truncate(time.Hour) + opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)} + bytes = checked.NewBytes([]byte{}, checked.NewBytesOptions()) + ) + + sourceShard := testDatabaseShard(t, DefaultTestOptions()) + defer sourceShard.Close() + + targetShard := testDatabaseShard(t, DefaultTestOptions()) + defer targetShard.Close() + + latestSourceVolume, err := sourceShard.latestVolume(opts.Start) + require.NoError(t, err) + + sourceNsID := sourceShard.namespace.ID() + readerOpenOpts := fs.DataReaderOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + Namespace: sourceNsID, + Shard: sourceShard.ID(), + BlockStart: opts.Start, + VolumeIndex: latestSourceVolume, + }, + FileSetType: persist.FileSetFlushType, + } + + reader := fs.NewMockDataFileSetReader(ctrl) + reader.EXPECT().Open(readerOpenOpts).Return(nil) + reader.EXPECT().Read().Return(ident.StringID("id1"), ident.EmptyTagIterator, bytes, uint32(11), nil) + reader.EXPECT().Read().Return(nil, nil, nil, uint32(0), io.EOF) + reader.EXPECT().Close() + + processedBlockCount, err := targetShard.AggregateTiles(ctx, reader, sourceNsID, start, sourceShard, opts, series.WriteOptions{}) + require.NoError(t, err) + assert.Equal(t, int64(1), processedBlockCount) +} diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 864655e539..8dcfc8103f 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -488,6 +488,21 @@ func (mr *MockDatabaseMockRecorder) FlushState(namespace, shardID, blockStart in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushState", reflect.TypeOf((*MockDatabase)(nil).FlushState), namespace, shardID, blockStart) } +// AggregateTiles mocks base method +func (m *MockDatabase) AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNsID, targetNsID, opts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockDatabaseMockRecorder) AggregateTiles(ctx, sourceNsID, targetNsID, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockDatabase)(nil).AggregateTiles), ctx, sourceNsID, targetNsID, opts) +} + // Mockdatabase is a mock of database interface type Mockdatabase struct { ctrl *gomock.Controller @@ -883,6 +898,21 @@ func (mr *MockdatabaseMockRecorder) FlushState(namespace, shardID, blockStart in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushState", reflect.TypeOf((*Mockdatabase)(nil).FlushState), namespace, shardID, blockStart) } +// AggregateTiles mocks base method +func (m *Mockdatabase) AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNsID, targetNsID, opts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// 
AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseMockRecorder) AggregateTiles(ctx, sourceNsID, targetNsID, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*Mockdatabase)(nil).AggregateTiles), ctx, sourceNsID, targetNsID, opts) +} + // OwnedNamespaces mocks base method func (m *Mockdatabase) OwnedNamespaces() ([]databaseNamespace, error) { m.ctrl.T.Helper() @@ -1532,6 +1562,37 @@ func (mr *MockdatabaseNamespaceMockRecorder) WritePendingIndexInserts(pending in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WritePendingIndexInserts", reflect.TypeOf((*MockdatabaseNamespace)(nil).WritePendingIndexInserts), pending) } +// AggregateTiles mocks base method +func (m *MockdatabaseNamespace) AggregateTiles(ctx context.Context, sourceNs databaseNamespace, opts AggregateTilesOptions, pm persist.Manager) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, opts, pm) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseNamespaceMockRecorder) AggregateTiles(ctx, sourceNs, opts, pm interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseNamespace)(nil).AggregateTiles), ctx, sourceNs, opts, pm) +} + +// readableShardAt mocks base method +func (m *MockdatabaseNamespace) readableShardAt(shardID uint32) (databaseShard, namespace.Context, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "readableShardAt", shardID) + ret0, _ := ret[0].(databaseShard) + ret1, _ := ret[1].(namespace.Context) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// readableShardAt indicates an expected call of readableShardAt +func (mr *MockdatabaseNamespaceMockRecorder) readableShardAt(shardID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "readableShardAt", reflect.TypeOf((*MockdatabaseNamespace)(nil).readableShardAt), shardID) +} + // MockShard is a mock of Shard interface type MockShard struct { ctrl *gomock.Controller @@ -2008,6 +2069,36 @@ func (mr *MockdatabaseShardMockRecorder) DocRef(id interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DocRef", reflect.TypeOf((*MockdatabaseShard)(nil).DocRef), id) } +// AggregateTiles mocks base method +func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, reader fs.DataFileSetReader, sourceNsID ident.ID, sourceBlockStart time.Time, sourceShard databaseShard, opts AggregateTilesOptions, wOpts series.WriteOptions) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AggregateTiles", ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AggregateTiles indicates an expected call of AggregateTiles +func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, reader, sourceNsID, sourceBlockStart, sourceShard, opts, wOpts) +} + +// latestVolume mocks base method +func (m *MockdatabaseShard) latestVolume(blockStart time.Time) (int, 
error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "latestVolume", blockStart) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// latestVolume indicates an expected call of latestVolume +func (mr *MockdatabaseShardMockRecorder) latestVolume(blockStart interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "latestVolume", reflect.TypeOf((*MockdatabaseShard)(nil).latestVolume), blockStart) +} + // MockShardColdFlush is a mock of ShardColdFlush interface type MockShardColdFlush struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 68a77e2038..0b27782eca 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -219,6 +219,9 @@ type Database interface { // FlushState returns the flush state for the specified shard and block start. FlushState(namespace ident.ID, shardID uint32, blockStart time.Time) (fileOpState, error) + + // AggregateTiles does large tile aggregation from source namespace to target namespace. + AggregateTiles(ctx context.Context, sourceNsID, targetNsID ident.ID, opts AggregateTilesOptions) (int64, error) } // database is the internal database interface. @@ -405,6 +408,16 @@ type databaseNamespace interface { // WritePendingIndexInserts will write any pending index inserts. WritePendingIndexInserts(pending []writes.PendingIndexInsert) error + + // AggregateTiles does large tile aggregation from source namespace into this namespace. + AggregateTiles( + ctx context.Context, + sourceNs databaseNamespace, + opts AggregateTilesOptions, + pm persist.Manager, + ) (int64, error) + + readableShardAt(shardID uint32) (databaseShard, namespace.Context, error) } // SeriesReadWriteRef is a read/write reference for a series, @@ -581,6 +594,19 @@ type databaseShard interface { // DocRef returns the doc if already present in a shard series. DocRef(id ident.ID) (doc.Document, bool, error) + + // AggregateTiles does large tile aggregation from source shards into this shard. + AggregateTiles( + ctx context.Context, + reader fs.DataFileSetReader, + sourceNsID ident.ID, + sourceBlockStart time.Time, + sourceShard databaseShard, + opts AggregateTilesOptions, + wOpts series.WriteOptions, + ) (int64, error) + + latestVolume(blockStart time.Time) (int, error) } // ShardColdFlush exposes a done method to finalize shard cold flush @@ -1211,3 +1237,11 @@ type newFSMergeWithMemFn func( dirtySeries *dirtySeriesMap, dirtySeriesToWrite map[xtime.UnixNano]*idList, ) fs.MergeWith + +type AggregateTilesOptions struct { + Start, End time.Time + Step time.Duration + // HandleCounterResets is temporarily used to force counter reset handling logics on the processed series. + // TODO: remove once we have metrics type stored in the metadata. + HandleCounterResets bool +} diff --git a/src/dbnode/tracepoint/tracepoint.go b/src/dbnode/tracepoint/tracepoint.go index 97d2410eda..c3d6ffa61a 100644 --- a/src/dbnode/tracepoint/tracepoint.go +++ b/src/dbnode/tracepoint/tracepoint.go @@ -46,6 +46,9 @@ const ( // FetchReadSegment is the operation name for the tchannelthrift FetchReadSegment path. FetchReadSegment = "tchannelthrift/node.service.FetchReadSegment" + // AggregateTiles is the operation name for the tchannelthrift AggregateTiles path. + AggregateTiles = "tchannelthrift/node.service.AggregateTiles" + // DBQueryIDs is the operation name for the db QueryIDs path. 
DBQueryIDs = "storage.db.QueryIDs" @@ -64,6 +67,9 @@ const ( // DBWriteBatch is the operation name for the db WriteBatch path. DBWriteBatch = "storage.db.WriteBatch" + // DBAggregateTiles is the operation name for the db AggregateTiles path. + DBAggregateTiles = "storage.db.AggregateTiles" + // NSQueryIDs is the operation name for the dbNamespace QueryIDs path. NSQueryIDs = "storage.dbNamespace.QueryIDs"
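Editor's note (usage sketch, not part of the patch): the snippet below shows how a caller could exercise the new aggregateTiles endpoint end to end through the generated TChanNode client, which then flows through service.AggregateTiles, db.AggregateTiles and the namespace/shard aggregation added above. The namespace names, the 2h source block size and the 1h step are illustrative assumptions, the TChannel connection setup is left to the caller, and the tchannel-go thrift import path is assumed from the generated client code.

package example // hypothetical caller, not part of this patch

import (
	"time"

	"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
	"github.com/uber/tchannel-go/thrift"
)

// aggregatePreviousBlock asks a dbnode to aggregate the most recently completed
// 2h block of the source namespace into the target namespace at a 1h step.
// Namespace names and durations here are example values, not defaults.
func aggregatePreviousBlock(tctx thrift.Context, node rpc.TChanNode) (int64, error) {
	end := time.Now().Truncate(2 * time.Hour)
	start := end.Add(-2 * time.Hour)

	res, err := node.AggregateTiles(tctx, &rpc.AggregateTilesRequest{
		SourceNameSpace: "metrics_unaggregated", // assumed source namespace
		TargetNameSpace: "metrics_1h",           // assumed target namespace
		RangeStart:      start.Unix(),
		RangeEnd:        end.Unix(),
		Step:            "1h",
		RemoveResets:    false,
		RangeType:       rpc.TimeType_UNIX_SECONDS,
	})
	if err != nil {
		return 0, err
	}
	return res.ProcessedBlockCount, nil
}

Note that with this patch the per-tile downsampling itself is still a TODO in shard.AggregateTiles: the call currently re-writes the source datapoints of the latest volume into the target namespace, which is exactly what the integration test above asserts until real aggregation lands.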