Skip to content

Commit

Permalink
Merge branch 'main' into fix-upgrade-discarded-lines-per-interval
Browse files Browse the repository at this point in the history
  • Loading branch information
QuentinBisson authored May 22, 2024
2 parents 75632dc + 1948899 commit 74dd75a
Show file tree
Hide file tree
Showing 442 changed files with 52,350 additions and 1,149 deletions.
1 change: 1 addition & 0 deletions .github/workflows/helm-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- main
- helm-5.48
paths:
- 'production/helm/loki/Chart.yaml'

Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/operator-check-prepare-release-commit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ jobs:
steps:
- name: Extract release version
id: pr_semver
env:
PR_TITLE: ${{ github.event.pull_request.title }}
run: |
PR_TITLE="${{ github.event.pull_request.title }}"
SEMVER=$(echo "$PR_TITLE" | sed -n 's/^chore( operator): community release \([0-9]\+\.[0-9]\+\.[0-9]\+\)$/\1/p')
echo "semver=$SEMVER" >> $GITHUB_OUTPUT
Expand Down
35 changes: 29 additions & 6 deletions clients/pkg/promtail/targets/file/filetarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const (
FilenameLabel = "filename"
)

var errFileTargetStopped = errors.New("File target is stopped")

// Config describes behavior for Target
type Config struct {
SyncPeriod time.Duration `mapstructure:"sync_period" yaml:"sync_period"`
Expand Down Expand Up @@ -223,6 +225,11 @@ func (t *FileTarget) run() {
}
case <-ticker.C:
err := t.sync()
if errors.Is(err, errFileTargetStopped) {
// This file target has been stopped.
// This is normal and there is no need to log an error.
return
}
if err != nil {
level.Error(t.logger).Log("msg", "error running sync function", "error", err)
}
Expand Down Expand Up @@ -291,14 +298,20 @@ func (t *FileTarget) sync() error {
t.watchesMutex.Lock()
toStartWatching := missing(t.watches, dirs)
t.watchesMutex.Unlock()
t.startWatching(toStartWatching)
err := t.startWatching(toStartWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// Remove any directories which no longer need watching.
t.watchesMutex.Lock()
toStopWatching := missing(dirs, t.watches)
t.watchesMutex.Unlock()

t.stopWatching(toStopWatching)
err = t.stopWatching(toStopWatching)
if errors.Is(err, errFileTargetStopped) {
return err
}

// fsnotify.Watcher doesn't allow us to see what is currently being watched so we have to track it ourselves.
t.watchesMutex.Lock()
Expand All @@ -321,32 +334,42 @@ func (t *FileTarget) sync() error {
return nil
}

func (t *FileTarget) startWatching(dirs map[string]struct{}) {
func (t *FileTarget) startWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.getWatch(dir); ok {
continue
}

level.Info(t.logger).Log("msg", "watching new directory", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStart,
}:
}
}
return nil
}

func (t *FileTarget) stopWatching(dirs map[string]struct{}) {
func (t *FileTarget) stopWatching(dirs map[string]struct{}) error {
for dir := range dirs {
if _, ok := t.getWatch(dir); !ok {
continue
}

level.Info(t.logger).Log("msg", "removing directory from watcher", "directory", dir)
t.targetEventHandler <- fileTargetEvent{
select {
case <-t.quit:
return errFileTargetStopped
case t.targetEventHandler <- fileTargetEvent{
path: dir,
eventType: fileTargetEventWatchStop,
}:
}
}
return nil
}

func (t *FileTarget) startTailing(ps []string) {
Expand Down
87 changes: 87 additions & 0 deletions clients/pkg/promtail/targets/file/filetarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,93 @@ func TestFileTarget_StopsTailersCleanly_Parallel(t *testing.T) {
ps.Stop()
}

// Make sure that Stop() doesn't hang if FileTarget is waiting on a channel send.
func TestFileTarget_StopAbruptly(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

dirName := newTestLogDirectories(t)
positionsFileName := filepath.Join(dirName, "positions.yml")
logDir1 := filepath.Join(dirName, "log1")
logDir2 := filepath.Join(dirName, "log2")
logDir3 := filepath.Join(dirName, "log3")

logfile1 := filepath.Join(logDir1, "test1.log")
logfile2 := filepath.Join(logDir2, "test1.log")
logfile3 := filepath.Join(logDir3, "test1.log")

ps, err := positions.New(logger, positions.Config{
SyncPeriod: 10 * time.Millisecond,
PositionsFile: positionsFileName,
})
require.NoError(t, err)

client := fake.New(func() {})
defer client.Stop()

// fakeHandler has to be a buffered channel so that we can call the len() function on it.
// We need to call len() to check if the channel is full.
fakeHandler := make(chan fileTargetEvent, 1)
pathToWatch := filepath.Join(dirName, "**", "*.log")
registry := prometheus.NewRegistry()
target, err := NewFileTarget(NewMetrics(registry), logger, client, ps, pathToWatch, "", nil, nil, &Config{
SyncPeriod: 10 * time.Millisecond,
}, DefaultWatchConig, nil, fakeHandler, "", nil)
assert.NoError(t, err)

// Create a directory, still nothing is watched.
err = os.MkdirAll(logDir1, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile1)
assert.NoError(t, err)

// There should be only one WatchStart event in the channel so far.
ftEvent := <-fakeHandler
require.Equal(t, fileTargetEventWatchStart, ftEvent.eventType)

requireEventually(t, func() bool {
return target.getReadersLen() == 1
}, "expected 1 tailer to be created")

require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 1
`), "promtail_files_active_total"))

// Create two directories - one more than the buffer of fakeHandler,
// so that the file target hands until we call Stop().
err = os.MkdirAll(logDir2, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile2)
assert.NoError(t, err)

err = os.MkdirAll(logDir3, 0750)
assert.NoError(t, err)
_, err = os.Create(logfile3)
assert.NoError(t, err)

// Wait until the file target is waiting on a channel send due to a full channel buffer.
requireEventually(t, func() bool {
return len(fakeHandler) == 1
}, "expected an event in the fakeHandler channel")

// If FileHandler works well, then it will stop waiting for
// the blocked fakeHandler and stop cleanly.
// This is why this time we don't drain fakeHandler.
requireEventually(t, func() bool {
target.Stop()
ps.Stop()
return true
}, "expected FileTarget not to hang")

require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(`
# HELP promtail_files_active_total Number of active files.
# TYPE promtail_files_active_total gauge
promtail_files_active_total 0
`), "promtail_files_active_total"))
}

func TestFileTargetPathExclusion(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
Expand Down
1 change: 1 addition & 0 deletions cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ func newQueryClient(app *kingpin.Application) client.Client {
app.Flag("key", "Path to the client certificate key. Can also be set using LOKI_CLIENT_KEY_PATH env var.").Default("").Envar("LOKI_CLIENT_KEY_PATH").StringVar(&client.TLSConfig.KeyFile)
app.Flag("org-id", "adds X-Scope-OrgID to API requests for representing tenant ID. Useful for requesting tenant data when bypassing an auth gateway. Can also be set using LOKI_ORG_ID env var.").Default("").Envar("LOKI_ORG_ID").StringVar(&client.OrgID)
app.Flag("query-tags", "adds X-Query-Tags http header to API requests. This header value will be part of `metrics.go` statistics. Useful for tracking the query. Can also be set using LOKI_QUERY_TAGS env var.").Default("").Envar("LOKI_QUERY_TAGS").StringVar(&client.QueryTags)
app.Flag("nocache", "adds Cache-Control: no-cache http header to API requests. Can also be set using LOKI_NO_CACHE env var.").Default("false").Envar("LOKI_NO_CACHE").BoolVar(&client.NoCache)
app.Flag("bearer-token", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN env var.").Default("").Envar("LOKI_BEARER_TOKEN").StringVar(&client.BearerToken)
app.Flag("bearer-token-file", "adds the Authorization header to API requests for authentication purposes. Can also be set using LOKI_BEARER_TOKEN_FILE env var.").Default("").Envar("LOKI_BEARER_TOKEN_FILE").StringVar(&client.BearerTokenFile)
app.Flag("retries", "How many times to retry each query when getting an error response from Loki. Can also be set using LOKI_CLIENT_RETRIES env var.").Default("0").Envar("LOKI_CLIENT_RETRIES").IntVar(&client.Retries)
Expand Down
1 change: 1 addition & 0 deletions cmd/loki/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func main() {
}

level.Info(util_log.Logger).Log("msg", "Starting Loki", "version", version.Info())
level.Info(util_log.Logger).Log("msg", "Loading configuration file", "filename", config.ConfigFile)

err = t.Run(loki.RunOpts{StartTime: startTime})
util_log.CheckFatal("running loki", err, util_log.Logger)
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/get-started/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ description: Provides an overview of the steps for implementing Grafana Loki to

# Get started with Grafana Loki

{{< youtube id="1uk8LtQqsZQ" >}}

Loki is a horizontally-scalable, highly-available, multi-tenant log aggregation system inspired by Prometheus. It is designed to be very cost effective and easy to operate. It does not index the contents of the logs, but rather a set of labels for each log stream.

Because all Loki implementations are unique, the installation process is
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/get-started/components.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ This page describes the responsibilities of each of these components.
## Distributor

The **distributor** service is responsible for handling incoming push requests from
clients. It's the first stop in the write path for log data. Once the
clients. It's the first step in the write path for log data. Once the
distributor receives a set of streams in an HTTP request, each stream is validated for correctness
and to ensure that it is within the configured tenant (or global) limits. Each valid stream
is then sent to `n` [ingesters](#ingester) in parallel, where `n` is the [replication factor](#replication-factor) for data.
Expand Down
11 changes: 11 additions & 0 deletions docs/sources/reference/loki-http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ Authorization needs to be done separately, for example, using an open-source loa
These endpoints are exposed by the `distributor`, `write`, and `all` components:

- [`POST /loki/api/v1/push`](#ingest-logs)
- [`POST /otlp/v1/logs`](#ingest-logs-using-otlp)

A [list of clients]({{< relref "../send-data" >}}) can be found in the clients documentation.

Expand Down Expand Up @@ -260,6 +261,16 @@ curl -H "Content-Type: application/json" \
--data-raw '{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}'
```

## Ingest logs using OTLP

```bash
POST /otlp/v1/logs
```

`/otlp/v1/logs` lets the OpenTelemetry Collector send logs to Loki using `otlphttp` procotol.

For information on how to configure Loki, refer to the [OTel Collector topic](https://grafana.com/docs/loki/<LOKI_VERSION>/send-data/otel/).

## Query logs at a single point in time

```bash
Expand Down
2 changes: 1 addition & 1 deletion docs/sources/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ weight: 100
Release notes for Loki are in the CHANGELOG for the release and
listed here by version number.

- [V3.0 release notes](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v3.0/)
- [V3.0 release notes](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v3-0/)
- [V2.9 release notes](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v2-9/)
- [V2.8 release notes](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v2-8/)
- [V2.7 release notes](https://grafana.com/docs/loki/<LOKI_VERSION>/release-notes/v2-7/)
Expand Down
1 change: 1 addition & 0 deletions docs/sources/send-data/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ These third-party clients also enable sending logs to Loki:
- [push-to-loki.py](https://github.com/sleleko/devops-kb/blob/master/python/push-to-loki.py) (Python 3)
- [python-logging-loki](https://pypi.org/project/python-logging-loki/) (Python 3)
- [nextlog](https://pypi.org/project/nextlog/) (Python 3)
- [Rails Loki Exporter](https://github.com/planninghow/rails-loki-exporter) (Rails)
- [Serilog-Sinks-Loki](https://github.com/JosephWoodward/Serilog-Sinks-Loki) (C#)
- [Vector Loki Sink](https://vector.dev/docs/reference/configuration/sinks/loki/)
- [winston-loki](https://github.com/JaniAnttonen/winston-loki) (JS)
4 changes: 2 additions & 2 deletions docs/sources/send-data/otel/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ You need to make the following changes to the [OpenTelemetry Collector config](h
```yaml
exporters:
otlphttp:
endpoint: http://<loki-addr>:3100/otlp
endpoint: http://<loki-addr>:3100/otlp/v1/logs
```
And enable it in `service.pipelines`:
Expand All @@ -57,7 +57,7 @@ exporters:
otlphttp:
auth:
authenticator: basicauth/otlp
endpoint: http://<loki-addr>:3100/otlp
endpoint: http://<loki-addr>:3100/otlp/v1/logs
service:
extensions: [basicauth/otlp]
Expand Down
4 changes: 2 additions & 2 deletions docs/sources/setup/install/helm/install-monolithic/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ If you set the `singleBinary.replicas` value to 2 or more, this chart configures
type: 'filesystem'
schemaConfig:
configs:
- from: 2024-01-01
- from: "2024-01-01"
store: tsdb
index:
prefix: loki_index_
Expand All @@ -72,7 +72,7 @@ If you set the `singleBinary.replicas` value to 2 or more, this chart configures
replication_factor: 3
schemaConfig:
configs:
- from: 2024-01-01
- from: "2024-01-01"
store: tsdb
index:
prefix: loki_index_
Expand Down
Loading

0 comments on commit 74dd75a

Please sign in to comment.