feat(blooms): Add bloom planner and bloom builder to backend target (#13997)

Previously, the bloom compactor component was part of the `backend` target in Simple Scalable Deployment (SSD) mode. However, the bloom compactor was removed (#13969) in favour of the bloom planner and bloom builder, and was therefore also removed from the backend target.

This PR adds the planner and builder components to the backend target so that blooms can continue to be built in SSD mode when bloom building is enabled.

The planner needs to run as a singleton: even if multiple replicas of the backend target are deployed, only one instance must create tasks for the builders.
This is achieved by leader election through the existing index gateway ring in the backend target. The planner leader is the instance that owns the leader key, and builders connect to the planner leader to pull tasks.
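
To make the leader-election idea concrete, here is a minimal, self-contained Go sketch. The types, addresses, and token ranges are hypothetical and only illustrate the approach (a fixed leader key whose ring owner acts as the leader); they are not the actual dskit ring API used in this change.

```go
// Illustrative sketch only: hypothetical types, not the dskit ring used by the PR.
// Every replica agrees on the same fixed "leader key"; whichever instance owns the
// token range containing that key is treated as the leader.
package main

import "fmt"

const leaderKey = 0xffff // fixed key; its owner is the leader (mirrors RingKeyOfLeader below)

// instance is a hypothetical stand-in for a ring member and its owned token ranges.
type instance struct {
	id     string
	addr   string
	tokens [][2]uint32 // [start, end) token ranges owned by this instance
}

// leaderOf returns the instance owning the leader key, or nil if none does.
func leaderOf(instances []instance) *instance {
	for i := range instances {
		for _, r := range instances[i].tokens {
			if leaderKey >= r[0] && leaderKey < r[1] {
				return &instances[i]
			}
		}
	}
	return nil
}

func main() {
	ringMembers := []instance{
		{id: "backend-0", addr: "10.0.0.1:9095", tokens: [][2]uint32{{0, 0x8000}}},
		{id: "backend-1", addr: "10.0.0.2:9095", tokens: [][2]uint32{{0x8000, 0x20000}}},
	}
	if l := leaderOf(ringMembers); l != nil {
		// Builders would dial l.addr to pull tasks; only l runs the planning loop.
		fmt.Println("leader:", l.id, l.addr)
	}
}
```

In the actual change, the ring and token ranges come from the existing index gateway ring, as shown in the `ringwatcher.go` file added below.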

----

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
chaudum authored Sep 2, 2024
1 parent ef1df0e commit bf60455
Showing 8 changed files with 222 additions and 15 deletions.
4 changes: 2 additions & 2 deletions docs/sources/operations/query-acceleration-blooms.md
@@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
{{< /admonition >}}

To start building and using blooms you need to:
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Enable blooms building and filtering for each tenant individually, or for all of them by default.

```yaml
42 changes: 37 additions & 5 deletions pkg/bloombuild/builder/builder.go
@@ -30,6 +30,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

type Builder struct {
@@ -47,6 +48,10 @@ type Builder struct {
chunkLoader ChunkLoader

client protos.PlannerForBuilderClient

// used only in SSD mode where a single planner of the backend replicas needs to create tasksQueue
// therefore is nil when planner is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -59,6 +64,7 @@ func New(
bloomStore bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)

@@ -82,18 +88,33 @@ logger: logger,
logger: logger,
}

if rm != nil {
b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
}

b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b, nil
}

func (b *Builder) starting(_ context.Context) error {
func (b *Builder) starting(ctx context.Context) error {
if b.ringWatcher != nil {
if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
return fmt.Errorf("error starting builder subservices: %w", err)
}
}
b.metrics.running.Set(1)
return nil
}

func (b *Builder) stopping(_ error) error {
defer b.metrics.running.Set(0)

if b.ringWatcher != nil {
if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
return fmt.Errorf("error stopping builder subservices: %w", err)
}
}

if b.client != nil {
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}

func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
}

addr, err := b.ringWatcher.GetLeaderAddress()
if err != nil {
return b.cfg.PlannerAddress
}

return addr
}

func (b *Builder) connectAndBuild(ctx context.Context) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}

// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}
2 changes: 1 addition & 1 deletion pkg/bloombuild/builder/builder_test.go
@@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)

builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)
119 changes: 119 additions & 0 deletions pkg/bloombuild/common/ringwatcher.go
@@ -0,0 +1,119 @@
package common

import (
"context"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)

const (
RingKeyOfLeader = 0xffff
)

type RingWatcher struct {
services.Service
id string
ring *ring.Ring
leader *ring.InstanceDesc
lookupPeriod time.Duration
logger log.Logger
}

// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
// The leader instance is the instance that owns the key `RingKeyOfLeader`.
// It provides functions to get the leader's address, and to check whether a given instance in the ring is leader.
// Bloom planner and bloom builder use this ring watcher to hook into index gateway ring when they are run as
// part of the `backend` target of the Simple Scalable Deployment (SSD).
// It should not be used for any other components outside of the bloombuild package.
func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
w := &RingWatcher{
id: id,
ring: ring,
lookupPeriod: lookupPeriod,
logger: logger,
}
w.Service = services.NewBasicService(nil, w.updateLoop, nil)
return w
}

func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-syncTicker.C:
w.lookupAddresses()
if w.leader != nil {
return nil
}
}
}
}

func (w *RingWatcher) updateLoop(ctx context.Context) error {
_ = w.waitForInitialLeader(ctx)

syncTicker := time.NewTicker(w.lookupPeriod)
defer syncTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-syncTicker.C:
w.lookupAddresses()
}
}
}

func (w *RingWatcher) lookupAddresses() {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
if err != nil {
level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
w.leader = nil
return
}

for i := range rs.Instances {
inst := rs.Instances[i]
state, err := w.ring.GetInstanceState(inst.Id)
if err != nil || state != ring.ACTIVE {
return
}
tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
if w.leader == nil || w.leader.Id != inst.Id {
level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
}
w.leader = &inst
return
}
}

w.leader = nil
}

func (w *RingWatcher) IsLeader() bool {
return w.IsInstanceLeader(w.id)
}

func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
res := w.leader != nil && w.leader.Id == instanceID
level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
return res
}

func (w *RingWatcher) GetLeaderAddress() (string, error) {
if w.leader == nil {
return "", ring.ErrEmptyRing
}
return w.leader.Addr, nil
}
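
A rough sketch of how a leader-aware caller might consume a watcher like the one above. The `leaderWatcher` interface and `staticWatcher` stub are hypothetical stand-ins introduced here so the example runs without wiring up a real dskit ring; the actual planner and builder use the `RingWatcher` directly.

```go
// Sketch: a component that only does work while it is the ring leader, and a
// builder-style caller that resolves the leader's address before dialing it.
package main

import (
	"fmt"
	"time"
)

// leaderWatcher is a hypothetical interface; the RingWatcher above provides
// equivalent methods (IsLeader, GetLeaderAddress).
type leaderWatcher interface {
	IsLeader() bool
	GetLeaderAddress() (string, error)
}

// staticWatcher is an illustrative stand-in with a fixed leader.
type staticWatcher struct {
	selfID, leaderID, leaderAddr string
}

func (s staticWatcher) IsLeader() bool                    { return s.selfID == s.leaderID }
func (s staticWatcher) GetLeaderAddress() (string, error) { return s.leaderAddr, nil }

// runPlanningLoop mimics the planner's pattern: skip iterations unless this replica is leader.
func runPlanningLoop(w leaderWatcher, iterations int) {
	for i := 0; i < iterations; i++ {
		if !w.IsLeader() {
			fmt.Println("not leader, skipping planning iteration")
			time.Sleep(10 * time.Millisecond)
			continue
		}
		fmt.Println("leader: running planning iteration")
	}
}

func main() {
	w := staticWatcher{selfID: "backend-0", leaderID: "backend-1", leaderAddr: "10.0.0.2:9095"}
	runPlanningLoop(w, 3)

	// A builder-side caller would dial the leader's address to pull tasks.
	if addr, err := w.GetLeaderAddress(); err == nil {
		fmt.Println("a builder would dial the planner at", addr)
	}
}
```

The planner gates its planning iterations this way, while the builder uses the leader address to decide which planner instance to dial, as the diffs in this commit show.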
48 changes: 43 additions & 5 deletions pkg/bloombuild/planner/planner.go
@@ -27,9 +27,13 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)

var errPlannerIsNotRunning = errors.New("planner is not running")
var (
errPlannerIsNotRunning = errors.New("planner is not running")
errPlannerIsNotLeader = errors.New("planner is not leader")
)

type Planner struct {
services.Service
@@ -52,6 +56,10 @@ type Planner struct {

metrics *Metrics
logger log.Logger

// used only in SSD mode where a single planner of the backend replicas needs to create tasksQueue
// therefore is nil when planner is run in microservice mode (default)
ringWatcher *common.RingWatcher
}

func New(
@@ -63,6 +71,7 @@ func New(
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)

@@ -101,6 +110,12 @@
)

svcs := []services.Service{p.tasksQueue, p.activeUsers}

if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
svcs = append(svcs, p.ringWatcher)
}

p.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("error creating subservices manager: %w", err)
@@ -112,6 +127,15 @@
return p, nil
}

func (p *Planner) isLeader() bool {
if p.ringWatcher == nil {
// when the planner runs as a standalone service in microservice mode, there is no ringWatcher
// therefore we can safely assume that the planner is a singleton
return true
}
return p.ringWatcher.IsLeader()
}

func (p *Planner) starting(ctx context.Context) (err error) {
if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
return fmt.Errorf("error starting planner subservices: %w", err)
@@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
func (p *Planner) running(ctx context.Context) error {
go p.trackInflightRequests(ctx)

// run once at beginning
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}
// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode
initialPlanningTimer := time.NewTimer(time.Minute)
defer initialPlanningTimer.Stop()

planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()
@@ -154,6 +177,12 @@
level.Debug(p.logger).Log("msg", "planner context done")
return nil

case <-initialPlanningTimer.C:
level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
}

case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
@@ -192,6 +221,10 @@ type tenantTable struct {
}

func (p *Planner) runOne(ctx context.Context) error {
if !p.isLeader() {
return errPlannerIsNotLeader
}

var (
wg sync.WaitGroup
start = time.Now()
@@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer

builderID := resp.GetBuilderID()
logger := log.With(p.logger, "builder", builderID)

if !p.isLeader() {
return errPlannerIsNotLeader
}

level.Debug(logger).Log("msg", "builder connected")

p.tasksQueue.RegisterConsumerConnection(builderID)
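
As a side note on the scheduling change in `running()` above: the initial build iteration is now driven by a one-shot timer so it can be delayed until the ring has consolidated, while regular iterations stay on a ticker. The snippet below is a runnable sketch of that pattern with shortened, illustrative durations; the actual code uses a one-minute delay and `cfg.PlanningInterval`.

```go
// Sketch of the delayed-initial-run plus periodic-ticker pattern.
package main

import (
	"context"
	"fmt"
	"time"
)

func runOne() { fmt.Println("planning iteration at", time.Now().Format(time.RFC3339Nano)) }

func run(ctx context.Context) {
	initial := time.NewTimer(100 * time.Millisecond) // stands in for the 1m ring-consolidation delay
	defer initial.Stop()

	ticker := time.NewTicker(250 * time.Millisecond) // stands in for cfg.PlanningInterval
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-initial.C:
			runOne() // first iteration, delayed so the ring can settle
		case <-ticker.C:
			runOne() // regular iterations
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	run(ctx)
}
```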
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
@@ -532,7 +532,7 @@ func createPlanner(
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)

planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
require.NoError(t, err)

return planner
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {

Read: {QueryFrontend, Querier},
Write: {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
}