Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes MaxCollectDuration log in the streaming proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Jun 16, 2017
1 parent 7d272ce commit 0e78d6b
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/snap-plugin-collector-rand-streaming/rand/rand.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *RandCollector) streamIt(ch chan []plugin.Metric, err chan string) {
}
}
ch <- metrics
time.Sleep(time.Second * time.Duration(rand.Int63n(10)))
time.Sleep(time.Millisecond * time.Duration(rand.Int63n(1000)))
}
}

Expand Down
8 changes: 6 additions & 2 deletions v1/plugin/stream_collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St
).Debug("sending metrics")
metrics := []*rpc.Metric{}

afterCollectDuration := time.After(p.maxCollectDuration)
for {
select {
case mts := <-ch:
Expand Down Expand Up @@ -128,11 +129,14 @@ func (p *StreamProxy) metricSend(ch chan []Metric, stream rpc.StreamCollector_St
if p.maxMetricsBuffer == 0 {
sendReply(metrics, stream)
metrics = []*rpc.Metric{}
afterCollectDuration = time.After(p.maxCollectDuration)
}
case <-time.After(p.maxCollectDuration):

case <-afterCollectDuration:
// send metrics if maxCollectDuration is reached
sendReply(metrics, stream)
metrics = []*rpc.Metric{}
afterCollectDuration = time.After(p.maxCollectDuration)
case <-stream.Context().Done():
return
}
Expand Down Expand Up @@ -169,7 +173,7 @@ func (p *StreamProxy) streamRecv(ch chan []Metric, stream rpc.StreamCollector_St
if s.MaxCollectDuration > 0 {
logger.WithFields(log.Fields{
"option": "max-collect-duration",
"value": fmt.Sprintf("seconds %v", time.Duration(s.MaxCollectDuration).Seconds()),
"value": fmt.Sprintf("%v seconds", time.Duration(s.MaxCollectDuration).Seconds()),
}).Debug("setting max collect duration option")
p.setMaxCollectDuration(time.Duration(s.MaxCollectDuration))
}
Expand Down
4 changes: 2 additions & 2 deletions v1/plugin/stream_collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func TestStreamMetrics(t *testing.T) {
Convey("get buffered metrics through stream proxy when maxMetricsBuffer is reached", func() {
Convey("when maxMetricsBuffer is reached", func() {
So(pl.outMetric, ShouldNotBeNil)
// Send metrics down to channel every 100 ms
pl.doAction(time.Millisecond*100, metrics)
// Send metrics down to channel every 20 ms
pl.doAction(time.Millisecond*20, metrics)
select {
case mts := <-sendChan:
// Success! we got something....
Expand Down

0 comments on commit 0e78d6b

Please sign in to comment.