Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to combine update operations #25976

Merged
merged 9 commits into from
Jul 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update `fortinet` ingest pipelines. {issue}22136[22136] {issue}25254[25254] {pull}24816[24816]
- Use default add_locale for fortinet.firewall {issue}20300[20300] {pull}26524[26524]
- Add new template functions and `value_type` parameter to `httpjson` transforms. {pull}26847[26847]
- Add support to merge registry updates in the filestream input across multiple ACKed batches in case of backpressure in the registry or disk. {pull}25976[25976]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestGCStore(t *testing.T) {

// create pending update operation
res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
op, err := createUpdateOp(res, "test-state-update")
require.NoError(t, err)
res.Release()

Expand Down
5 changes: 2 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ package input_logfile
// Cursor allows the input to check if cursor status has been stored
// in the past and unpack the status into a custom structure.
type Cursor struct {
store *store
resource *resource
}

func makeCursor(store *store, res *resource) Cursor {
return Cursor{store: store, resource: res}
func makeCursor(res *resource) Cursor {
return Cursor{resource: res}
}

// IsNew returns true if no cursor information has been stored
Expand Down
20 changes: 10 additions & 10 deletions filebeat/input/filestream/internal/input-logfile/cursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestCursor_IsNew(t *testing.T) {
store := testOpenStore(t, "test", createSampleStore(t, nil))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))
require.True(t, cursor.IsNew())
})

Expand All @@ -38,7 +38,7 @@ func TestCursor_IsNew(t *testing.T) {
}))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))
require.True(t, cursor.IsNew())
})

Expand All @@ -48,7 +48,7 @@ func TestCursor_IsNew(t *testing.T) {
}))
defer store.Release()

cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))
require.False(t, cursor.IsNew())
})

Expand All @@ -59,11 +59,11 @@ func TestCursor_IsNew(t *testing.T) {
defer store.Release()

res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
op, err := createUpdateOp(res, "test-state-update")
require.NoError(t, err)
defer op.done(1)

cursor := makeCursor(store, res)
cursor := makeCursor(res)
require.False(t, cursor.IsNew())
})
}
Expand All @@ -74,7 +74,7 @@ func TestCursor_Unpack(t *testing.T) {
defer store.Release()

var st string
cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "", st)
Expand All @@ -87,7 +87,7 @@ func TestCursor_Unpack(t *testing.T) {
defer store.Release()

var st struct{ A uint }
cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))
require.Error(t, cursor.Unpack(&st))
})

Expand All @@ -98,7 +98,7 @@ func TestCursor_Unpack(t *testing.T) {
defer store.Release()

var st string
cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "test", st)
Expand All @@ -111,12 +111,12 @@ func TestCursor_Unpack(t *testing.T) {
defer store.Release()

res := store.Get("test::key")
op, err := createUpdateOp(store, res, "test-state-update")
op, err := createUpdateOp(res, "test-state-update")
require.NoError(t, err)
defer op.done(1)

var st string
cursor := makeCursor(store, store.Get("test::key"))
cursor := makeCursor(store.Get("test::key"))

require.NoError(t, cursor.Unpack(&st))
require.Equal(t, "test-state-update", st)
Expand Down
5 changes: 3 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ type defaultHarvesterGroup struct {
harvester Harvester
cleanTimeout time.Duration
store *store
ackCH *updateChan
identifier *sourceIdentifier
tg unison.TaskGroup
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest

client, err := hg.pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKHandler: newInputACKHandler(ctx.Logger),
ACKHandler: newInputACKHandler(hg.ackCH, ctx.Logger),
})
if err != nil {
hg.readers.remove(srcID)
Expand All @@ -200,7 +201,7 @@ func startHarvester(ctx input.Context, hg *defaultHarvesterGroup, s Source, rest
defer client.Close()

hg.store.UpdateTTL(resource, hg.cleanTimeout)
cursor := makeCursor(hg.store, resource)
cursor := makeCursor(resource)
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}

err = hg.harvester.Run(ctx, s, cursor, publisher)
Expand Down
8 changes: 6 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
type managedInput struct {
userID string
manager *InputManager
ackCH *updateChan
sourceIdentifier *sourceIdentifier
prospector Prospector
harvester Harvester
Expand Down Expand Up @@ -67,6 +68,7 @@ func (inp *managedInput) Run(
cleanTimeout: inp.cleanTimeout,
harvester: inp.harvester,
store: groupStore,
ackCH: inp.ackCH,
identifier: inp.sourceIdentifier,
tg: unison.TaskGroup{},
}
Expand All @@ -80,7 +82,7 @@ func (inp *managedInput) Run(
return nil
}

func newInputACKHandler(log *logp.Logger) beat.ACKer {
func newInputACKHandler(ch *updateChan, log *logp.Logger) beat.ACKer {
return acker.EventPrivateReporter(func(acked int, private []interface{}) {
var n uint
var last int
Expand All @@ -101,6 +103,8 @@ func newInputACKHandler(log *logp.Logger) beat.ACKer {
if n == 0 {
return
}
private[last].(*updateOp).Execute(n)

op := private[last].(*updateOp)
ch.Send(scheduledUpdate{op: op, n: n})
})
}
13 changes: 10 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *common.Config) (Prospector, Harvester, error)

initOnce sync.Once
initErr error
store *store
initOnce sync.Once
initErr error
store *store
ackUpdater *updateWriter
ackCH *updateChan
}

// Source describes a source the input can collect data from.
Expand Down Expand Up @@ -104,6 +106,8 @@ func (cim *InputManager) init() error {
}

cim.store = store
cim.ackCH = newUpdateChan()
cim.ackUpdater = newUpdateWriter(store, cim.ackCH)
})

return cim.initErr
Expand Down Expand Up @@ -144,6 +148,7 @@ func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
}

func (cim *InputManager) shutdown() {
cim.ackUpdater.Close()
cim.store.Release()
}

Expand Down Expand Up @@ -178,6 +183,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {

pStore := cim.getRetainedStore()
defer pStore.Release()

prospectorStore := newSourceStore(pStore, sourceIdentifier)
err = prospector.Init(prospectorStore)
if err != nil {
Expand All @@ -186,6 +192,7 @@ func (cim *InputManager) Create(config *common.Config) (input.Input, error) {

return &managedInput{
manager: cim,
ackCH: cim.ackCH,
userID: settings.ID,
prospector: prospector,
harvester: harvester,
Expand Down
48 changes: 21 additions & 27 deletions filebeat/input/filestream/internal/input-logfile/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ type cursorPublisher struct {
// instances can add update operations to be executed after already pending
// update operations from older input instances that have been shut down.
type updateOp struct {
store *store
resource *resource

// state updates to persist
Expand All @@ -59,6 +58,18 @@ type updateOp struct {
delta interface{}
}

// newUpdateOp builds an updateOp for the given resource. The operation
// carries the timestamp ts of the update and the state delta that is to be
// persisted.
func newUpdateOp(resource *resource, ts time.Time, delta interface{}) *updateOp {
	op := new(updateOp)
	op.resource = resource
	op.timestamp = ts
	op.delta = delta
	return op
}

// Key returns the key of the resource this update operation refers to.
func (op *updateOp) Key() string {
	res := op.resource
	return res.key
}

// Publish publishes an event. Publish returns false if the input's cancellation context has been marked as done.
// If cursorUpdate is not nil, Publish updates the in-memory state and creates an updateOp for the pending update.
// It overwrites event.Private with the update operation, before finally sending the event.
Expand All @@ -69,7 +80,7 @@ func (c *cursorPublisher) Publish(event beat.Event, cursorUpdate interface{}) er
return c.forward(event)
}

op, err := createUpdateOp(c.cursor.store, c.cursor.resource, cursorUpdate)
op, err := createUpdateOp(c.cursor.resource, cursorUpdate)
if err != nil {
return err
}
Expand All @@ -86,43 +97,27 @@ func (c *cursorPublisher) forward(event beat.Event) error {
return c.canceler.Err()
}

func createUpdateOp(store *store, resource *resource, updates interface{}) (*updateOp, error) {
func createUpdateOp(resource *resource, updates interface{}) (*updateOp, error) {
ts := time.Now()

resource.stateMutex.Lock()
defer resource.stateMutex.Unlock()

cursor := resource.pendingCursor
if resource.activeCursorOperations == 0 {
var tmp interface{}
typeconv.Convert(&tmp, cursor)
resource.pendingCursor = tmp
cursor = tmp
}
if err := typeconv.Convert(&cursor, updates); err != nil {
return nil, err
}
resource.pendingCursor = cursor
resource.pendingUpdate = updates
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer apply the update directly. Instead, we store the most recent delta with the resource. The delta is applied lazily, only when the current pendingCursor state is actually required. This removes an 'expensive' operation from the hot path.


resource.Retain()
resource.activeCursorOperations++
return &updateOp{
resource: resource,
store: store,
timestamp: ts,
delta: updates,
}, nil
return newUpdateOp(resource, ts, updates), nil
}

// done releases resources held by the last N updateOps.
func (op *updateOp) done(n uint) {
op.resource.UpdatesReleaseN(n)
op.resource = nil
*op = updateOp{}
}

// Execute updates the persistent store with the scheduled changes and releases the resource.
func (op *updateOp) Execute(n uint) {
func (op *updateOp) Execute(store *store, n uint) {
resource := op.resource

resource.stateMutex.Lock()
Expand All @@ -135,8 +130,8 @@ func (op *updateOp) Execute(n uint) {
defer op.done(n)
resource.activeCursorOperations -= n
if resource.activeCursorOperations == 0 {
resource.cursor = resource.pendingCursor
resource.pendingCursor = nil
resource.cursor = resource.pendingCursor()
resource.pendingCursorValue = nil
} else {
typeconv.Convert(&resource.cursor, op.delta)
}
Expand All @@ -145,13 +140,12 @@ func (op *updateOp) Execute(n uint) {
resource.internalState.Updated = op.timestamp
}

err := op.store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
err := store.persistentStore.Set(resource.key, resource.inSyncStateSnapshot())
if err != nil {
if !statestore.IsClosed(err) {
op.store.log.Errorf("Failed to update state in the registry for '%v'", resource.key)
store.log.Errorf("Failed to update state in the registry for '%v'", resource.key)
}
} else {
resource.internalInSync = true
resource.stored = true
}
}
Loading