Skip to content

Commit

Permalink
feat: Ingester RF-1 (#13365)
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana authored Jul 3, 2024
1 parent ca2f11d commit 7f35179
Show file tree
Hide file tree
Showing 43 changed files with 4,384 additions and 83 deletions.
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ server:
http_listen_port: 3100
grpc_listen_port: 9096
log_level: debug
grpc_server_max_concurrent_streams: 1000

common:
instance_addr: 127.0.0.1
Expand All @@ -17,6 +18,9 @@ common:
kvstore:
store: inmemory

ingester_rf1:
enabled: false

query_range:
results_cache:
cache:
Expand Down
358 changes: 309 additions & 49 deletions docs/sources/shared/configuration.md

Large diffs are not rendered by default.

104 changes: 104 additions & 0 deletions pkg/ingester-rf1/clientpool/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package clientpool

import (
"flag"
"io"
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util/server"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "loki_ingester_rf1_client_request_duration_seconds",
Help: "Time spent doing Ingester RF1 requests.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 6),
}, []string{"operation", "status_code"})

type HealthAndIngesterClient interface {
grpc_health_v1.HealthClient
Close() error
}

type ClosableHealthAndIngesterClient struct {
logproto.PusherRF1Client
grpc_health_v1.HealthClient
io.Closer
}

// Config for an ingester client.
type Config struct {
PoolConfig PoolConfig `yaml:"pool_config,omitempty" doc:"description=Configures how connections are pooled."`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures how the gRPC connection to ingesters work as a client."`
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`

// Internal is used to indicate that this client communicates on behalf of
// a machine and not a user. When Internal = true, the client won't attempt
// to inject an userid into the context.
Internal bool `yaml:"-"`
}

// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester-rf1.client", f)
cfg.PoolConfig.RegisterFlagsWithPrefix("ingester-rf1.", f)

f.DurationVar(&cfg.PoolConfig.RemoteTimeout, "ingester-rf1.client.healthcheck-timeout", 1*time.Second, "How quickly a dead client will be removed after it has been detected to disappear. Set this to a value to allow time for a secondary health check to recover the missing client.")
f.DurationVar(&cfg.RemoteTimeout, "ingester-rf1.client.timeout", 5*time.Second, "The remote request timeout on the client side.")
}

// New returns a new ingester client.
func NewClient(cfg Config, addr string) (HealthAndIngesterClient, error) {
opts := []grpc.DialOption{
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg))
if err != nil {
return nil, err
}

opts = append(opts, dialOpts...)
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
return ClosableHealthAndIngesterClient{
PusherRF1Client: logproto.NewPusherRF1Client(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Closer: conn,
}, nil
}

func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor)
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
}
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(ingesterClientRequestDuration))

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor)
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
if !cfg.Internal {
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
}
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(ingesterClientRequestDuration))

return unaryInterceptors, streamInterceptors
}
46 changes: 46 additions & 0 deletions pkg/ingester-rf1/clientpool/ingester_client_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package clientpool

import (
"flag"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

var clients prometheus.Gauge

// PoolConfig is config for creating a Pool.
type PoolConfig struct {
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`
HealthCheckIngesters bool `yaml:"health_check_ingesters"`
RemoteTimeout time.Duration `yaml:"remote_timeout"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.ClientCleanupPeriod, prefix+"client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
f.BoolVar(&cfg.HealthCheckIngesters, prefix+"health-check-ingesters", true, "Run a health check on each ingester client during periodic cleanup.")
f.DurationVar(&cfg.RemoteTimeout, prefix+"remote-timeout", 1*time.Second, "Timeout for the health check.")
}

func NewPool(name string, cfg PoolConfig, ring ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger, metricsNamespace string) *ring_client.Pool {
poolCfg := ring_client.PoolConfig{
CheckInterval: cfg.ClientCleanupPeriod,
HealthCheckEnabled: cfg.HealthCheckIngesters,
HealthCheckTimeout: cfg.RemoteTimeout,
}

if clients == nil {
clients = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "ingester_rf1_clients",
Help: "The current number of RF1 ingester clients.",
})
}
// TODO(chaudum): Allow configuration of metric name by the caller.
return ring_client.NewPool(name, poolCfg, ring_client.NewRingServiceDiscovery(ring), factory, clients, logger)
}
186 changes: 186 additions & 0 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package ingesterrf1

import (
"fmt"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/wal"
"github.com/grafana/loki/v3/pkg/util"
)

const (
// Backoff for retrying 'immediate' flushes. Only counts for queue
// position, not wallclock time.
flushBackoff = 1 * time.Second

nameLabel = "__name__"
logsValue = "logs"

flushReasonIdle = "idle"
flushReasonMaxAge = "max_age"
flushReasonForced = "forced"
flushReasonFull = "full"
flushReasonSynced = "synced"
)

// Note: this is called both during the WAL replay (zero or more times)
// and then after replay as well.
func (i *Ingester) InitFlushQueues() {
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
go i.flushLoop(j)
}
}

// Flush implements ring.FlushTransferer
// Flush triggers a flush of all the chunks and closes the flush queues.
// Called from the Lifecycler as part of the ingester shutdown.
func (i *Ingester) Flush() {
i.flush()
}

// TransferOut implements ring.FlushTransferer
// Noop implemenetation because ingesters have a WAL now that does not require transferring chunks any more.
// We return ErrTransferDisabled to indicate that we don't support transfers, and therefore we may flush on shutdown if configured to do so.
func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
}

func (i *Ingester) flush() {
// TODO: Flush the last chunks
// Close the flush queues, to unblock waiting workers.
for _, flushQueue := range i.flushQueues {
flushQueue.Close()
}

i.flushQueuesDone.Wait()
level.Debug(i.logger).Log("msg", "flush queues have drained")
}

// FlushHandler triggers a flush of all in memory chunks. Mainly used for
// local testing.
func (i *Ingester) FlushHandler(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

type flushOp struct {
from model.Time
userID string
fp model.Fingerprint
immediate bool
}

func (o *flushOp) Key() string {
return fmt.Sprintf("%s-%s-%v", o.userID, o.fp, o.immediate)
}

func (o *flushOp) Priority() int64 {
return -int64(o.from)
}

func (i *Ingester) flushLoop(j int) {
l := log.With(i.logger, "loop", j)
defer func() {
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

for {
o := i.flushQueues[j].Dequeue()
if o == nil {
return
}
op := o.(*flushCtx)

err := i.flushOp(l, op)
if err != nil {
level.Error(l).Log("msg", "failed to flush", "err", err)
// Immediately re-queue another attempt at flushing this segment.
// TODO: Add some backoff or something?
i.flushQueues[j].Enqueue(op)
} else {
// Close the channel and trigger all waiting listeners to return
// TODO: Figure out how to return an error if we want to?
close(op.flushDone)
}
}
}

func (i *Ingester) flushOp(l log.Logger, flushCtx *flushCtx) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushSegment(ctx, flushCtx.segmentWriter)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}
return b.Err()
}

// flushChunk flushes the given chunk to the store.
//
// If the flush is successful, metrics for this flush are to be reported.
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// segments to have another opportunity to be flushed.
func (i *Ingester) flushSegment(ctx context.Context, ch *wal.SegmentWriter) error {
if err := i.store.PutWal(ctx, ch); err != nil {
i.metrics.chunksFlushFailures.Inc()
return fmt.Errorf("store put chunk: %w", err)
}
i.metrics.flushedChunksStats.Inc(1)
// TODO: report some flush metrics
return nil
}

// reportFlushedChunkStatistics calculate overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch *chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
byt, err := ch.Encoded()
if err != nil {
level.Error(i.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}

i.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)

compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

if ok && compressedSize > 0 {
i.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}

utilization := ch.Data.Utilization()
i.metrics.chunkUtilization.Observe(utilization)
numEntries := desc.chunk.Size()
i.metrics.chunkEntries.Observe(float64(numEntries))
i.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()

boundsFrom, boundsTo := desc.chunk.Bounds()
i.metrics.chunkAge.Observe(time.Since(boundsFrom).Seconds())
i.metrics.chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())

i.metrics.flushedChunksBytesStats.Record(compressedSize)
i.metrics.flushedChunksLinesStats.Record(float64(numEntries))
i.metrics.flushedChunksUtilizationStats.Record(utilization)
i.metrics.flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
i.metrics.flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Seconds())
}
Loading

0 comments on commit 7f35179

Please sign in to comment.