Skip to content

Commit

Permalink
fix: experimental distributor data skew
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Sep 18, 2024
1 parent fcc3dfd commit ca8e7c8
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ import (
"github.com/grafana/dskit/ring"

v1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/ingester/client/distributor/placement"
"github.com/grafana/pyroscope/pkg/experiment/distributor/placement"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
)

const (
defaultRingUpdateInterval = 5 * time.Second
defaultFallbackLocations = 5
defaultFallbackLocations = 7
)

// NewTenantServiceDatasetKey build a distribution key, where
Expand Down Expand Up @@ -104,7 +104,7 @@ func (d *Distributor) datasetShards(k placement.Key, ms uint32) ([]shard, uint32
if size == 0 {
size = s
}
return d.distribution.selectShards(shards, max(size, ms), k.Dataset), size
return selectShards(d.distribution.shards, max(size, ms), k.Dataset), size
}

// tenantShards returns the list of shards that are available to the tenant.
Expand All @@ -114,7 +114,7 @@ func (d *Distributor) tenantShards(k placement.Key, ms uint32) []shard {
if size == 0 {
return d.distribution.shards
}
return d.distribution.selectShards(nil, max(size, ms), k.Tenant)
return selectShards(d.distribution.shards, max(size, ms), k.Tenant)
}

// TODO(kolesnikovae):
Expand Down Expand Up @@ -227,18 +227,18 @@ func init() {
}
}

func (d *distribution) selectShards(s []shard, n uint32, k uint64) []shard {
func selectShards(s []shard, n uint32, k uint64) []shard {
// m options are available total.
m := len(d.shards)
// pick n options from m.
m := len(s)
// Pick n options from m.
n = min(uint32(len(steps)), uint32(m), n)
s = slices.Grow(s[:0], int(n))
x := make([]shard, 0, n)
// Note that the choice is deterministic.
for i := uint32(0); i < n; i++ {
j := jump(k&^steps[i], m)
s = append(s, d.shards[j])
x = append(x, s[j])
}
return s
return x
}

// The inputs are a key and the number of buckets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/require"

typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/ingester/client/distributor/placement"
"github.com/grafana/pyroscope/pkg/experiment/distributor/placement"
"github.com/grafana/pyroscope/pkg/testhelper"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"google.golang.org/grpc/status"

segmentwriterv1 "github.com/grafana/pyroscope/api/gen/proto/go/segmentwriter/v1"
"github.com/grafana/pyroscope/pkg/experiment/distributor"
"github.com/grafana/pyroscope/pkg/experiment/distributor/placement"
"github.com/grafana/pyroscope/pkg/experiment/ingester/client/connpool"
"github.com/grafana/pyroscope/pkg/experiment/ingester/client/distributor"
"github.com/grafana/pyroscope/pkg/experiment/ingester/client/distributor/placement"
"github.com/grafana/pyroscope/pkg/util/circuitbreaker"
)

Expand Down

0 comments on commit ca8e7c8

Please sign in to comment.