Skip to content

Commit

Permalink
Wait for the right number of streams.
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed Jan 31, 2019
1 parent 6d19632 commit a422f39
Showing 1 changed file with 6 additions and 4 deletions.
10 changes: 6 additions & 4 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,19 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// First we flatten out the request into a list of samples.
// We use the heuristic of 1 sample per TS to size the array.
// We also work out the hash value at the same time.
streams := make([]streamTracker, len(req.Streams))
streams := make([]streamTracker, 0, len(req.Streams))
keys := make([]uint32, 0, len(req.Streams))
var validationErr error
for i, stream := range req.Streams {
for _, stream := range req.Streams {
if err := d.validateLabels(userID, stream.Labels); err != nil {
validationErr = err
continue
}

keys = append(keys, tokenFor(userID, stream.Labels))
streams[i].stream = stream
streams = append(streams, streamTracker{
stream: stream,
})
}

if len(streams) == 0 {
Expand All @@ -150,7 +152,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

tracker := pushTracker{
samplesPending: int32(len(streams)),
samplesPending: int32(len(samplesByIngester)),
done: make(chan struct{}),
err: make(chan error),
}
Expand Down

0 comments on commit a422f39

Please sign in to comment.