From 7d57d7137738cd6079c04170666eb9ded72f371e Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 2 Aug 2021 14:15:19 +0200 Subject: [PATCH] Update go-concert to 0.2.0 (#27162) (cherry picked from commit 3664b244efb7098c34de87d1de77fd117dacdbf7) --- CHANGELOG-developer.next.asciidoc | 1 + NOTICE.txt | 8 ++++---- filebeat/input/filestream/filestream.go | 5 +++-- .../input/filestream/internal/input-logfile/harvester.go | 8 ++++---- filebeat/input/filestream/internal/input-logfile/input.go | 4 +++- .../input/filestream/internal/input-logfile/manager.go | 3 ++- .../filestream/internal/input-logfile/update_writer.go | 5 ++--- filebeat/input/v2/input-cursor/manager.go | 3 ++- filebeat/inputsource/common/dgram/server.go | 2 +- go.mod | 4 ++-- go.sum | 8 ++++---- 11 files changed, 28 insertions(+), 23 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 4a87123089ce..caac8694132f 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -109,4 +109,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Update Go version to 1.15.12. {pull}25629[25629] - Update Go version to 1.16.4. {issue}25346[25346] {pull}25671[25671] - Add sorting to array fields for generated data files (*-generated.json) {pull}25320[25320] +- Update to go-concert 0.2.0 {pull}27162[27162] - Update Go version to 1.16.5. {issue}26182[26182] {pull}26186[26186] diff --git a/NOTICE.txt b/NOTICE.txt index 2f2b7c027aed..92b117280597 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -6578,11 +6578,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/go-concert -Version: v0.1.0 +Version: v0.2.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.1.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.2.0/LICENSE: Apache License Version 2.0, January 2004 @@ -15148,11 +15148,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/urso/sderr -Version: v0.0.0-20200210124243-c2a16f3d43ec +Version: v0.0.0-20210525210834-52b04e8f5c71 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/urso/sderr@v0.0.0-20200210124243-c2a16f3d43ec/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/urso/sderr@v0.0.0-20210525210834-52b04e8f5c71/LICENSE: Apache License Version 2.0, January 2004 diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 5e72dc927b95..884fbd213a99 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -18,6 +18,7 @@ package filestream import ( + "context" "errors" "io" "os" @@ -136,14 +137,14 @@ func (f *logFile) Read(buf []byte) (int, error) { func (f *logFile) startFileMonitoringIfNeeded() { if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed { - f.tg.Go(func(ctx unison.Canceler) error { + f.tg.Go(func(ctx context.Context) error { f.periodicStateCheck(ctx) return nil }) } if f.closeAfterInterval > 0 { - f.tg.Go(func(ctx unison.Canceler) error { + f.tg.Go(func(ctx context.Context) error { f.closeIfTimeout(ctx) return nil }) diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index a073a76da8af..7c0551487099 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -158,10 +158,10 @@ func (hg *defaultHarvesterGroup) Restart(ctx input.Context, s Source) { hg.tg.Go(startHarvester(ctx, hg, s, true)) } -func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, restart bool) func(canceler unison.Canceler) error { +func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, restart bool) func(context.Context) error { srcID := hg.identifier.ID(s) - return func(canceler unison.Canceler) error { + return func(canceler context.Context) error { defer func() { if v := recover(); v != nil { err := fmt.Errorf("harvester panic with: %+v\n%s", v, debug.Stack()) @@ -226,7 +226,7 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour prevID := hg.identifier.ID(previous) nextID := hg.identifier.ID(next) - hg.tg.Go(func(canceler unison.Canceler) error { + hg.tg.Go(func(canceler context.Context) error { previousResource, err := lock(ctx, hg.store, prevID) if err != nil { return fmt.Errorf("error while locking previous resource: %v", err) @@ -252,7 +252,7 @@ func (hg *defaultHarvesterGroup) Continue(ctx input.Context, previous, next Sour // Stop stops the running Harvester for a given Source. func (hg *defaultHarvesterGroup) Stop(s Source) { - hg.tg.Go(func(_ unison.Canceler) error { + hg.tg.Go(func(_ context.Context) error { hg.readers.remove(hg.identifier.ID(s)) return nil }) diff --git a/filebeat/input/filestream/internal/input-logfile/input.go b/filebeat/input/filestream/internal/input-logfile/input.go index 3ba754ccd739..7b1e323c9154 100644 --- a/filebeat/input/filestream/internal/input-logfile/input.go +++ b/filebeat/input/filestream/internal/input-logfile/input.go @@ -70,7 +70,9 @@ func (inp *managedInput) Run( store: groupStore, ackCH: inp.ackCH, identifier: inp.sourceIdentifier, - tg: unison.TaskGroup{}, + tg: unison.TaskGroup{ + OnQuit: unison.ContinueOnErrors, // harvester should keep running if a single harvester errored + }, } prospectorStore := inp.manager.getRetainedStore() diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index 74261e1a685f..2f5317270858 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -18,6 +18,7 @@ package input_logfile import ( + "context" "errors" "fmt" "strings" @@ -128,7 +129,7 @@ func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { store := cim.getRetainedStore() cleaner := &cleaner{log: log} - err := group.Go(func(canceler unison.Canceler) error { + err := group.Go(func(canceler context.Context) error { defer cim.shutdown() defer store.Release() interval := cim.StateStore.CleanupInterval() diff --git a/filebeat/input/filestream/internal/input-logfile/update_writer.go b/filebeat/input/filestream/internal/input-logfile/update_writer.go index 159684e5d2e6..2c6d9c5f4fea 100644 --- a/filebeat/input/filestream/internal/input-logfile/update_writer.go +++ b/filebeat/input/filestream/internal/input-logfile/update_writer.go @@ -21,7 +21,6 @@ import ( "context" "sync" - "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" ) @@ -62,8 +61,8 @@ func newUpdateWriter(store *store, ch *updateChan) *updateWriter { store: store, ch: ch, } - w.tg.Go(func(ctx unison.Canceler) error { - w.run(ctxtool.FromCanceller(ctx)) + w.tg.Go(func(ctx context.Context) error { + w.run(ctx) return nil }) diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index 766d6f17fa00..8f70e17c2b23 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -18,6 +18,7 @@ package cursor import ( + "context" "errors" "sync" "time" @@ -119,7 +120,7 @@ func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error { store := cim.store cleaner := &cleaner{log: log} store.Retain() - err := group.Go(func(canceler unison.Canceler) error { + err := group.Go(func(canceler context.Context) error { defer cim.shutdown() defer store.Release() interval := cim.StateStore.CleanupInterval() diff --git a/filebeat/inputsource/common/dgram/server.go b/filebeat/inputsource/common/dgram/server.go index 41a028f7d33a..2f2e98864bb2 100644 --- a/filebeat/inputsource/common/dgram/server.go +++ b/filebeat/inputsource/common/dgram/server.go @@ -103,7 +103,7 @@ func (l *Listener) Start() error { return err } - l.tg.Go(func(ctx unison.Canceler) error { + l.tg.Go(func(ctx context.Context) error { connCtx, connCancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx), func() { conn.Close() }) diff --git a/go.mod b/go.mod index b84896317c87..683643f5f77e 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2 github.com/elastic/ecs v1.10.0 github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 - github.com/elastic/go-concert v0.1.0 + github.com/elastic/go-concert v0.2.0 github.com/elastic/go-libaudit/v2 v2.2.0 github.com/elastic/go-licenser v0.3.1 github.com/elastic/go-lookslike v0.3.0 @@ -152,7 +152,7 @@ require ( github.com/tsg/go-daemon v0.0.0-20200207173439-e704b93fd89b github.com/tsg/gopacket v0.0.0-20200626092518-2ab8e397a786 github.com/ugorji/go/codec v1.1.8 - github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec + github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71 github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect diff --git a/go.sum b/go.sum index 9f720f7066ca..dede3be284c6 100644 --- a/go.sum +++ b/go.sum @@ -251,8 +251,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4= github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng= -github.com/elastic/go-concert v0.1.0 h1:gz/yvA3bseuHzoF/lNMltkL30XdPqMo+bg5o2mBx2EE= -github.com/elastic/go-concert v0.1.0/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM= +github.com/elastic/go-concert v0.2.0 h1:GAQrhRVXprnNjtvTP9pWJ1d4ToEA4cU5ci7TwTa20xg= +github.com/elastic/go-concert v0.2.0/go.mod h1:HWjpO3IAEJUxOeaJOWXWEp7imKd27foxz9V5vegC/38= github.com/elastic/go-libaudit/v2 v2.2.0 h1:TY3FDpG4Zr9Qnv6KYW6olYr/U+nfu0rD2QAbv75VxMQ= github.com/elastic/go-libaudit/v2 v2.2.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg= github.com/elastic/go-licenser v0.3.1 h1:RmRukU/JUmts+rpexAw0Fvt2ly7VVu6mw8z4HrEzObU= @@ -717,8 +717,8 @@ github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230 h1:Ft1EJ6JL0F/RV6o2 github.com/urso/magetools v0.0.0-20190919040553-290c89e0c230/go.mod h1:DFxTNgS/ExCGmmjVjSOgS2WjtfjKXgCyDzAFgbtovSA= github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4 h1:hhA8EBThzz9PztawVTycKvfETVuBqxAQ5keFlAVtbAw= github.com/urso/qcgen v0.0.0-20180131103024-0b059e7db4f4/go.mod h1:RspW+E2Yb7Fs7HclB2tiDaiu6Rp41BiIG4Wo1YaoXGc= -github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec h1:HkZIDJrMKZHPsYhmH2XjTTSk1pbMCFfpxSnyzZUFm+k= -github.com/urso/sderr v0.0.0-20200210124243-c2a16f3d43ec/go.mod h1:Wp40HwmjM59FkDIVFfcCb9LzBbnc0XAMp8++hJuWvSU= +github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71 h1:CehQeKbysHV8J2V7AD0w8NL2x1h04kmmo/Ft5su4lU0= +github.com/urso/sderr v0.0.0-20210525210834-52b04e8f5c71/go.mod h1:Wp40HwmjM59FkDIVFfcCb9LzBbnc0XAMp8++hJuWvSU= github.com/vbatts/tar-split v0.11.1/go.mod h1:LEuURwDEiWjRjwu46yU3KVGuUdVv/dcnpcEPSzR8z6g= github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41 h1:NeNpIvfvaFOh0BH7nMEljE5Rk/VJlxhm58M41SeOD20= github.com/vmware/govmomi v0.0.0-20170802214208-2cad15190b41/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU=