diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go index 945d8ff17c7d..57ad096fac30 100644 --- a/pkg/storage/wal/segment.go +++ b/pkg/storage/wal/segment.go @@ -30,7 +30,6 @@ var ( streamSegmentPool = sync.Pool{ New: func() interface{} { return &streamSegment{ - lock: &sync.Mutex{}, entries: make([]*logproto.Entry, 0, 4096), } }, @@ -47,18 +46,16 @@ type streamID struct { } type SegmentWriter struct { - metrics *SegmentMetrics - streams map[streamID]*streamSegment - buf1 encoding.Encbuf - outputSize atomic.Int64 - inputSize atomic.Int64 - idxWriter *index.Writer - consistencyMtx *sync.RWMutex - indexRef metastorepb.DataRef + metrics *SegmentMetrics + streams map[streamID]*streamSegment + buf1 encoding.Encbuf + outputSize atomic.Int64 + inputSize atomic.Int64 + idxWriter *index.Writer + indexRef metastorepb.DataRef } type streamSegment struct { - lock *sync.Mutex lbls labels.Labels entries []*logproto.Entry tenantID string @@ -86,24 +83,19 @@ func NewWalSegmentWriter(m *SegmentMetrics) (*SegmentWriter, error) { return nil, err } return &SegmentWriter{ - metrics: m, - streams: make(map[streamID]*streamSegment, 64), - buf1: encoding.EncWith(make([]byte, 0, 4)), - idxWriter: idxWriter, - inputSize: atomic.Int64{}, - consistencyMtx: &sync.RWMutex{}, + metrics: m, + streams: make(map[streamID]*streamSegment, 64), + buf1: encoding.EncWith(make([]byte, 0, 4)), + idxWriter: idxWriter, + inputSize: atomic.Int64{}, }, nil } func (b *SegmentWriter) getOrCreateStream(id streamID, lbls labels.Labels) *streamSegment { - b.consistencyMtx.RLock() s, ok := b.streams[id] - b.consistencyMtx.RUnlock() if ok { return s } - b.consistencyMtx.Lock() - defer b.consistencyMtx.Unlock() // Check another thread has not created it s, ok = b.streams[id] if ok { @@ -130,8 +122,6 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels id := streamID{labels: labelsString, tenant: tenantID} s := b.getOrCreateStream(id, lbls) - s.lock.Lock() - defer s.lock.Unlock() for i, e := range entries { if e.Timestamp.UnixNano() >= s.maxt { s.entries = append(s.entries, entries[i]) @@ -152,9 +142,6 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels // ReportMetrics for the writer. If called before WriteTo then the output size // histogram will observe 0. func (b *SegmentWriter) ReportMetrics() { - b.consistencyMtx.Lock() - defer b.consistencyMtx.Unlock() - b.metrics.streams.Observe(float64(len(b.streams))) tenants := make(map[string]struct{}, 64) for _, s := range b.streams { @@ -166,9 +153,6 @@ func (b *SegmentWriter) ReportMetrics() { } func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta { - b.consistencyMtx.Lock() - defer b.consistencyMtx.Unlock() - var globalMinT, globalMaxT int64 tenants := make(map[string]*metastorepb.TenantStreams, 64) diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go index 5852294329f9..34b8d78b2d58 100644 --- a/pkg/storage/wal/segment_test.go +++ b/pkg/storage/wal/segment_test.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "sort" - "sync" "testing" "time" @@ -132,163 +131,6 @@ func TestWalSegmentWriter_Append(t *testing.T) { } } -func BenchmarkConcurrentAppends(t *testing.B) { - type appendArgs struct { - tenant string - labels labels.Labels - entries []*push.Entry - } - - lbls := []labels.Labels{ - labels.FromStrings("container", "foo", "namespace", "dev"), - labels.FromStrings("container", "bar", "namespace", "staging"), - labels.FromStrings("container", "bar", "namespace", "prod"), - } - characters := "abcdefghijklmnopqrstuvwxyz" - tenants := []string{} - // 676 unique tenants (26^2) - for i := 0; i < len(characters); i++ { - for j := 0; j < len(characters); j++ { - tenants = append(tenants, string(characters[i])+string(characters[j])) - } - } - - workChan := make(chan *appendArgs) - var wg sync.WaitGroup - var w *SegmentWriter - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - for args := range workChan { - w.Append(args.tenant, args.labels.String(), args.labels, args.entries) - } - wg.Done() - }(i) - } - - t.ResetTimer() - for i := 0; i < t.N; i++ { - var err error - w, err = NewWalSegmentWriter(NewSegmentMetrics(nil)) - require.NoError(t, err) - - for _, lbl := range lbls { - for _, r := range tenants { - for i := 0; i < 10; i++ { - workChan <- &appendArgs{ - tenant: r, - labels: lbl, - entries: []*push.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf("log line %d", i)}, - }, - } - } - } - } - } - close(workChan) - wg.Wait() -} - -func TestConcurrentAppends(t *testing.T) { - type appendArgs struct { - tenant string - labels labels.Labels - entries []*push.Entry - } - dst := bytes.NewBuffer(nil) - - w, err := NewWalSegmentWriter(NewSegmentMetrics(nil)) - require.NoError(t, err) - var wg sync.WaitGroup - workChan := make(chan *appendArgs, 100) - for i := 0; i < 100; i++ { - wg.Add(1) - go func(i int) { - for args := range workChan { - w.Append(args.tenant, args.labels.String(), args.labels, args.entries) - } - wg.Done() - }(i) - } - - lbls := []labels.Labels{ - labels.FromStrings("container", "foo", "namespace", "dev"), - labels.FromStrings("container", "bar", "namespace", "staging"), - labels.FromStrings("container", "bar", "namespace", "prod"), - } - characters := "abcdefghijklmnopqrstuvwxyz" - tenants := []string{} - // 676 unique tenants (26^2) - for i := 0; i < len(characters); i++ { - for j := 0; j < len(characters); j++ { - for k := 0; k < len(characters); k++ { - tenants = append(tenants, string(characters[i])+string(characters[j])+string(characters[k])) - } - } - } - - msgsPerSeries := 10 - msgsGenerated := 0 - for _, r := range tenants { - for _, lbl := range lbls { - for i := 0; i < msgsPerSeries; i++ { - msgsGenerated++ - workChan <- &appendArgs{ - tenant: r, - labels: lbl, - entries: []*push.Entry{ - {Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf("log line %d", i)}, - }, - } - } - } - } - close(workChan) - wg.Wait() - - n, err := w.WriteTo(dst) - require.NoError(t, err) - require.True(t, n > 0) - - r, err := NewReader(dst.Bytes()) - require.NoError(t, err) - - iter, err := r.Series(context.Background()) - require.NoError(t, err) - - var expectedSeries, actualSeries []string - - for _, tenant := range tenants { - for _, lbl := range lbls { - expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(tenantLabel, tenant).Labels().String()) - } - } - - msgsRead := 0 - for iter.Next() { - actualSeries = append(actualSeries, iter.At().String()) - chk, err := iter.ChunkReader(nil) - require.NoError(t, err) - // verify all lines - var i int - for chk.Next() { - ts, line := chk.At() - require.Equal(t, int64(i), ts) - require.Equal(t, fmt.Sprintf("log line %d", i), string(line)) - msgsRead++ - i++ - } - require.NoError(t, chk.Err()) - require.NoError(t, chk.Close()) - require.Equal(t, msgsPerSeries, i) - } - require.NoError(t, iter.Err()) - require.ElementsMatch(t, expectedSeries, actualSeries) - require.Equal(t, msgsGenerated, msgsRead) - t.Logf("Generated %d messages between %d tenants", msgsGenerated, len(tenants)) -} - func TestMultiTenantWrite(t *testing.T) { w, err := NewWalSegmentWriter(NewSegmentMetrics(nil)) require.NoError(t, err)