Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitoring enhancements #169

Merged
merged 3 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 20 additions & 42 deletions cmd/proxy/proc/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ func (r *ProxyInResponseContext) OnComplete() {
}
logging.LogToCal(opcode, r.GetOpStatus(), rht, calData)
}
if otel.IsEnabled() {
otel.RecordOperation(r.stats.Opcode.String(), r.stats.ResponseStatus.ShortNameString(), int64(rhtus))
}

otel.RecordOperation(r.stats.Opcode.String(), r.stats.ResponseStatus, int64(rhtus))

r.stats.OnComplete(uint32(rhtus), r.GetOpStatus())
proxystats.SendProcState(r.stats)

Expand Down Expand Up @@ -339,9 +339,7 @@ func (p *ProcessorBase) replyToClient(resp *ResponseWrapper) {
if cal.IsEnabled() {
calLogReqProcError(kDecrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kDecrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kDecrypt}, {otel.Status, otel.StatusError}})
msg := p.clientRequest.CreateResponse()
msg.SetOpStatus(proto.OpStatusInternal)
var raw proto.RawMessage
Expand Down Expand Up @@ -571,9 +569,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
szNs := len(r.GetNamespace())
Expand All @@ -584,9 +580,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szNs)
calLogReqProcEvent(kBadParamInvalidNsLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
return false
}
ttl := r.GetTimeToLive()
Expand All @@ -597,9 +591,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
} else {
Expand All @@ -610,9 +602,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
glog.Warningf("limit exceeded: key length %d > %d", szKey, limits.MaxKeyLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
if limits.MaxTimeToLive != 0 && ttl > limits.MaxTimeToLive {
Expand All @@ -621,9 +611,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
glog.Warningf("limit exceeded: TTL %d > %d", ttl, limits.MaxTimeToLive)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
szValue := r.GetPayloadValueLength()
Expand All @@ -633,9 +621,7 @@ func (p *ProcessorBase) validateInboundRequest(r *proto.OperationalMessage) bool
data.AddInt([]byte("len"), int(szValue))
calLogReqProcEvent(kBadParamInvalidValueLen, data.Bytes())
glog.Warningf("limit exceeded: payload length %d > %d", szValue, limits.MaxPayloadLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
return false
}
}
Expand Down Expand Up @@ -692,11 +678,11 @@ func (p *ProcessorBase) Process(request io.IRequestContext) bool {
}
}
}
if otel.IsEnabled() {
if p.clientRequest.IsForReplication() {
otel.RecordCount(otel.RAPI, nil)
}

if p.clientRequest.IsForReplication() {
otel.RecordCount(otel.RAPI, nil)
}

p.shardId = shardId.Uint16()

if err := proto.SetShardId(p.requestContext.GetMessage(), p.shardId); err != nil {
Expand Down Expand Up @@ -770,9 +756,7 @@ func (p *ProcessorBase) OnRequestTimeout() {
b.AddOpCode(p.clientRequest.GetOpCode()).AddReqIdString(p.requestID)
calLogReqProcEvent(kReqTimeout, b.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqTimeout}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqTimeout}})
p.replyStatusToClient(proto.OpStatusBusy)
}
now := time.Now()
Expand All @@ -794,9 +778,7 @@ func (p *ProcessorBase) OnCancelled() {
b.AddOpCode(p.clientRequest.GetOpCode()).AddReqIdString(p.requestID)
calLogReqProcEvent(kReqCancelled, b.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqCancelled}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kReqCancelled}})
p.replyStatusToClient(proto.OpStatusBusy)
}
now := time.Now()
Expand Down Expand Up @@ -841,10 +823,8 @@ func (p *ProcessorBase) handleSSTimeout(now time.Time) {
writeBasicSSRequestInfo(b, st.opCode, int(st.ssIndex), p.ssGroup.processors[st.ssIndex].GetConnInfo(), p)
calLogReqProcEvent(calNameReqTimeoutFor(st.opCode), b.Bytes())
}
if otel.IsEnabled() {
status := otel.SSReqTimeout + "_" + st.opCode.String()
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, status}})
}
status := otel.SSReqTimeout + "_" + st.opCode.String()
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, status}})
if cluster.GetShardMgr().StatsEnabled() {
zoneId, hostId := p.ssGroup.processors[st.ssIndex].GetNodeInfo()
cluster.GetShardMgr().SendStats(zoneId, hostId, true, confSSRequestTimeout.Microseconds())
Expand Down Expand Up @@ -918,10 +898,8 @@ func (p *ProcessorBase) preprocessAndValidateResponse(resp io.IResponseContext)
errStr := strings.Replace(statusText, " ", "_", -1)
calLogReqProcEvent(fmt.Sprintf("SS_%s", errStr), buf.Bytes()) //TODO revisit log as error?
}
if otel.IsEnabled() {
errStr := strings.Replace(statusText, " ", "_", -1)
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, fmt.Sprintf("SS_%s", errStr)}})
}
errStr := strings.Replace(statusText, " ", "_", -1)
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, fmt.Sprintf("SS_%s", errStr)}})
st.state = stSSResponseIOError
st.timeRespReceived = time.Now()
p.self.OnSSIOError(st)
Expand Down
4 changes: 1 addition & 3 deletions cmd/proxy/proc/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ func (p *CreateProcessor) setInitSSRequest() bool {
if cal.IsEnabled() {
calLogReqProcError(kEncrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
p.replyStatusToClient(proto.OpStatusInternal)
return false
}
Expand Down
24 changes: 6 additions & 18 deletions cmd/proxy/proc/inbreqctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
szNs := len(r.GetNamespace())
Expand All @@ -88,9 +86,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("len"), szNs)
calLogReqProcEvent(kBadParamInvalidNsLen, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidNsLen}})
return false
}
ttl := r.GetTimeToLive()
Expand All @@ -101,9 +97,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddReqIdString(r.GetRequestIDString())
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
} else {
Expand All @@ -114,9 +108,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("len"), szKey)
calLogReqProcEvent(kBadParamInvalidKeyLen, data.Bytes())
glog.Warningf("limit exceeded: key length %d > %d", szKey, limits.MaxKeyLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidKeyLen}})
return false
}
if limits.MaxTimeToLive != 0 && ttl > limits.MaxTimeToLive {
Expand All @@ -125,9 +117,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("ttl"), int(ttl))
calLogReqProcEvent(kBadParamInvalidTTL, data.Bytes())
glog.Warningf("limit exceeded: TTL %d > %d", ttl, limits.MaxTimeToLive)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidTTL}})
return false
}
szValue := r.GetPayloadValueLength()
Expand All @@ -137,9 +127,7 @@ func (r *InboundRequestContext) ValidateRequest() bool {
data.AddInt([]byte("len"), int(szValue))
calLogReqProcEvent(kBadParamInvalidValueLen, data.Bytes())
// Log the offending payload length against the payload limit; the TTL
// values previously passed here did not match the message text.
glog.Warningf("limit exceeded: payload length %d > %d", szValue, limits.MaxPayloadLength)
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kBadParamInvalidValueLen}})
return false
}
}
Expand Down
8 changes: 2 additions & 6 deletions cmd/proxy/proc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ func (p *TwoPhaseProcessor) setInitSSRequest() bool {
if cal.IsEnabled() {
calLogReqProcError(kEncrypt, []byte(errmsg))
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Operation, kEncrypt}, {otel.Status, otel.StatusError}})
p.replyStatusToClient(proto.OpStatusInternal)
return false
}
Expand Down Expand Up @@ -351,9 +349,7 @@ func (p *TwoPhaseProcessor) onRepairFailure(rc *SSRequestContext) {
writeBasicSSRequestInfo(buf, rc.opCode, int(rc.ssIndex), p.ssGroup.processors[rc.ssIndex].GetConnInfo(), &p.ProcessorBase)
calLogReqProcEvent(kInconsistent, buf.Bytes())
}
if otel.IsEnabled() {
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kInconsistent}})
}
otel.RecordCount(otel.ReqProc, []otel.Tags{{otel.Status, kInconsistent}})
p.replyStatusToClient(proto.OpStatusInconsistent)
}
}
Expand Down
17 changes: 5 additions & 12 deletions cmd/proxy/replication/replicaterequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ func (r *RepRequestContext) complete(calstatus string, opStatus string, rht time
cal.AtomicTransaction(targetType, opCode, calstatus, rht, r.calBuf.Bytes())
}
}
if otel.IsEnabled() {
otel.RecordReplication(opCode, opStatus, target, rht.Milliseconds())
}

otel.RecordReplication(opCode, opStatus, target, rht.Microseconds())

r.this.OnComplete()
}
Expand Down Expand Up @@ -221,9 +220,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("MaxRetry")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropMaxRetry, []otel.Tags{{"target", r.targetId}})
}
otel.RecordCount(otel.RRDropMaxRetry, []otel.Tags{{"target", r.targetId}})
r.errCnt.Add(1)
r.complete(cal.StatusError, opstatus.String(), rht, opCodeText, r.targetId)
return
Expand All @@ -245,9 +242,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("QueueFull")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropQueueFull, []otel.Tags{{otel.Target, r.targetId}})
}
otel.RecordCount(otel.RRDropQueueFull, []otel.Tags{{otel.Target, r.targetId}})
r.dropCnt.Add(1)
r.complete(cal.StatusError, opstatus.String(), rht, opCodeText, r.targetId)
}
Expand All @@ -261,9 +256,7 @@ func (r *RepRequestContext) Reply(resp io.IResponseContext) {
}
r.calBuf.AddDropReason("RecExpired")
}
if otel.IsEnabled() {
otel.RecordCount(otel.RRDropRecExpired, []otel.Tags{{otel.Target, r.targetId}})
}
otel.RecordCount(otel.RRDropRecExpired, []otel.Tags{{otel.Target, r.targetId}})
r.complete(cal.StatusSuccess, opstatus.String(), rht, opCodeText, r.targetId)
}
}
Expand Down
10 changes: 2 additions & 8 deletions cmd/storageserv/storage/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,13 @@ func (p *reqProcCtxT) OnComplete() {
calData.AddOpRequestResponse(&p.request, &p.response).AddRequestHandleTime(rhtus)
cal.AtomicTransaction(cal.TxnTypeAPI, opcode.String(), calst.CalStatus(), rht, calData.Bytes())
}
if otel.IsEnabled() {
otel.RecordOperation(opcode.String(), p.response.GetOpStatus().String(), int64(rhtus))
opst := p.response.GetOpStatus()
calst := logging.CalStatus(opst)
if (opst == proto.OpStatusInconsistent) || calst.NotSuccess() {
otel.RecordCount(otel.ProcErr, []otel.Tags{{otel.Operation, opcode.String() + "_" + opst.String()}, {otel.Status, otel.StatusError}})
}
}
if (opst == proto.OpStatusInconsistent) || calst.NotSuccess() {
cal.Event("ProcErr", opcode.String()+"_"+opst.String(), cal.StatusSuccess, nil)
}
}

otel.RecordOperation(opcode.String(), p.response.GetOpStatus(), int64(rhtus))

if p.cacheable {
if p.prepareCtx != nil {
if p.prepareCtx.cacheable {
Expand Down
29 changes: 29 additions & 0 deletions docker/manifest/config/otel/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# OpenTelemetry Collector configuration.
# Config reference: https://opentelemetry.io/docs/collector/configuration/
#
# Pipeline defined below: metrics are received over OTLP/HTTP and exposed
# through the Prometheus exporter.
receivers:
  otlp:
    protocols:
      http:
        endpoint: "0.0.0.0:4318"

exporters:
  # Data sources: metrics
  prometheus:
    endpoint: "0.0.0.0:8889"
    namespace: default
    send_timestamps: true
    metric_expiration: 180m
    # resource_to_telemetry_conversion:
    #   enabled: true

extensions:
  health_check:
  pprof:
  zpages:

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    metrics:
      receivers: [otlp]
      exporters: [prometheus]
6 changes: 6 additions & 0 deletions docker/manifest/config/prometheus/prometheus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Prometheus scrape configuration: polls the "otel" host every 10 seconds.
# NOTE(review): 8889 matches the collector's Prometheus exporter endpoint;
# 8888 is presumably the collector's own telemetry endpoint — confirm.
scrape_configs:
  - job_name: 'otel'
    scrape_interval: 10s
    static_configs:
      - targets: ['otel:8888']
      - targets: ['otel:8889']
Loading