Implement memlog on-disk handling #19408
Pinging @elastic/integrations-services (Team:Services)
💚 Build Succeeded
Doing this in chunks because of the size -- I finished about half so far, should get to the remainder on Monday
})

enc := newJSONEncoder(counting)
if err := enc.Encode(logAction{Op: op.name(), ID: s.txid + 1}); err != nil {
I notice that most (all?) of the actual references to `diskstore.txid` use `txid + 1`. Would it make sense to name it `nextTxID` instead, and initialize it to 1, so it's clear which value should be assigned to new transactions? If the current approach is preferred due to other context, please add a comment to the definition of `txid` clarifying that it refers to the most recently completed transaction id rather than the next available one (I would have left this note there but github doesn't let me comment on code the PR doesn't modify :-) )
`diskstore.txid` is indeed always used as `txid+1`. Will rename to `nextTxID` as proposed.
return err
}

fileTxID := s.txid + 1
transaction ids seem natural for operations but their role for files is less obvious. Could there be a comment here to describe the intention? E.g. the way it looks to me is something like:
// The checkpoint is assigned the next available transaction id. The first operation
// after a successful checkpoint will be (fileTxID + 1).
Added comment:
// The checkpoint is assigned the next available transaction id. This
// guarantees that all existing log entries are 'older' than the checkpoint
// file and subsequent operations. The first operation after a successful
// checkpoint will be (fileTxID + 1).
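The two threads above (renaming `txid` to `nextTxID`, and giving the checkpoint file the next available id) can be sketched together. This is a minimal illustration, not the PR's code: the `store` type and the `logOperation` and `checkpoint` methods are hypothetical stand-ins, and the actual writes to disk are left out.

```go
package main

import "fmt"

// store is a hypothetical, stripped-down stand-in for the diskstore type
// discussed in the review. Only the transaction counter is modeled.
type store struct {
	// nextTxID is the id that will be assigned to the next operation or
	// checkpoint. It is initialized to 1, as proposed in the review.
	nextTxID uint64
}

func newStore() *store { return &store{nextTxID: 1} }

// logOperation returns the id assigned to a new log entry and advances the
// counter. A real implementation would advance only after a successful write.
func (s *store) logOperation() uint64 {
	id := s.nextTxID
	s.nextTxID++
	return id
}

// checkpoint assigns the next available id to the checkpoint file. All
// existing log entries are therefore older than the checkpoint, and the first
// operation afterwards gets fileTxID + 1.
func (s *store) checkpoint() uint64 {
	fileTxID := s.nextTxID
	s.nextTxID++
	return fileTxID
}

func main() {
	s := newStore()
	fmt.Println("op:", s.logOperation())
	fmt.Println("op:", s.logOperation())
	fmt.Println("checkpoint:", s.checkpoint())
	fmt.Println("op:", s.logOperation())
}
```

With a counter that always holds the next id, no call site needs a `+ 1`, which is the readability gain the reviewer was after.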
checkpointPath := filepath.Join(s.home, fileName)

if err := os.Rename(tmpPath, checkpointPath); err != nil {
return err
If there's an error here, should we delete the temporary file?
Related, it looks like `checkpointTmpFile` always uses the file name `"checkpoint.new"`. This means that if a checkpoint is incomplete (whether because of an explicit error or just incomplete cleanup), it might obstruct the creation of new ones (this will usually not be the case since you use `O_TRUNC` etc., but it could happen, e.g., if the beat is restarted by a different user than previously, or if file permissions are otherwise changed). It would be a bit more robust to include the txid in the temporary file as well, so there will ~never be a conflict.
Restarting as another user that doesn't have access should trigger an error early when opening the store.
Changing permissions live can indeed be a risk, even for this file. I opted not to include the txid in the temporary file name, as we would continuously need to try to clean up if there was some error in the meantime. Messing with permissions on existing files or the directory can lead to other problems as well, so I didn't really account for that. In order to prevent a potential attack on the temporary file I wouldn't use the `txid` for the temporary file, but really create a fully random name.
I will add logic to delete an existing `checkpoint.new` at init time (this is in `openStore`, which is not shown here), so we can fail with permission errors on Beats start. Plus I will add an `os.Remove` that tries to delete the `checkpoint.new` file if it still exists after the renaming. Any concerns with that?
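The cleanup plan described in this reply could look roughly like the sketch below. It is an illustration under assumptions, not the PR's implementation: `removeStaleTmp`, `writeCheckpoint`, and `demo` are hypothetical names, and syncing of the parent directory is left out.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

const checkpointTmpFile = "checkpoint.new"

// removeStaleTmp deletes a leftover temporary checkpoint file at init time.
// A missing file is fine; any other error (for example a permission problem)
// is returned so that startup can fail early.
func removeStaleTmp(home string) error {
	err := os.Remove(filepath.Join(home, checkpointTmpFile))
	if err != nil && !os.IsNotExist(err) {
		return err
	}
	return nil
}

// writeCheckpoint writes data to the temporary file, syncs it, and renames it
// to fileName. The deferred Remove deletes the temporary file if it still
// exists; after a successful rename the file is gone and the error is ignored.
func writeCheckpoint(home, fileName string, data []byte) error {
	tmpPath := filepath.Join(home, checkpointTmpFile)
	defer os.Remove(tmpPath)

	f, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmpPath, filepath.Join(home, fileName))
}

// demo runs the sketch in a throwaway directory and checks the result.
func demo() error {
	home, err := os.MkdirTemp("", "memlog-sketch")
	if err != nil {
		return err
	}
	defer os.RemoveAll(home)

	// Simulate a leftover from an interrupted run.
	stale := filepath.Join(home, checkpointTmpFile)
	if err := os.WriteFile(stale, []byte("stale"), 0600); err != nil {
		return err
	}
	if err := removeStaleTmp(home); err != nil {
		return err
	}
	if err := writeCheckpoint(home, "3.json", []byte(`{"k":"v"}`)); err != nil {
		return err
	}
	got, err := os.ReadFile(filepath.Join(home, "3.json"))
	if err != nil {
		return err
	}
	if string(got) != `{"k":"v"}` {
		return fmt.Errorf("unexpected checkpoint content %q", got)
	}
	if _, err := os.Stat(stale); !os.IsNotExist(err) {
		return fmt.Errorf("temporary file still present")
	}
	return nil
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("checkpoint written, temporary file cleaned up")
}
```

Keeping a single fixed temporary name means there is only one file to clean up, which is the trade-off the author argues for against per-txid names.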
s.logFileSize = 0
}

func updateActiveMarker(log *logp.Logger, home, active string) error {
please document how `home` and `active` are used by this function
renamed `home` to `homePath` and `active` to `checkpointFilePath` + add godoc.
// updateActiveMarker overwrites the active.dat file in the home directory with
// the path of the most recent checkpoint file.
// The active file will be written to `<homePath>`/active.dat.
func updateActiveMarker(log *logp.Logger, homePath, checkpointFilePath string) error {

func updateActiveMarker(log *logp.Logger, home, active string) error {
activeLink := filepath.Join(home, "active.dat")
tmpLink := filepath.Join(home, "active.dat")
`activeLink` and `tmpLink` are both assigned the same value, which is probably not intended? It also doesn't match the filenames listed in the comments below.
This is an error. They should be different.
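For illustration, a simplified version of the marker update with two distinct paths might look as follows. This is only a sketch: the logger and fsync calls are omitted, and the temporary file name `active.dat.tmp` is an assumption, since the name used in the PR is not visible in this excerpt.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// updateActiveMarker is a simplified sketch of the function under review.
// It writes the checkpoint path to a temporary file and renames that file to
// <homePath>/active.dat. An empty checkpointFilePath removes the marker.
func updateActiveMarker(homePath, checkpointFilePath string) error {
	activeLink := filepath.Join(homePath, "active.dat")
	// Assumed name; the important part is that it differs from activeLink.
	tmpLink := filepath.Join(homePath, "active.dat.tmp")

	if checkpointFilePath == "" {
		// No checkpoint available: remove the marker if it is present.
		if err := os.Remove(activeLink); err != nil && !os.IsNotExist(err) {
			return err
		}
		return nil
	}

	if err := os.WriteFile(tmpLink, []byte(checkpointFilePath), 0600); err != nil {
		return err
	}
	if err := os.Rename(tmpLink, activeLink); err != nil {
		os.Remove(tmpLink) // best-effort cleanup
		return err
	}
	return nil
}

// demo exercises both branches in a throwaway directory.
func demo() error {
	home, err := os.MkdirTemp("", "active-sketch")
	if err != nil {
		return err
	}
	defer os.RemoveAll(home)

	if err := updateActiveMarker(home, "3.json"); err != nil {
		return err
	}
	got, err := os.ReadFile(filepath.Join(home, "active.dat"))
	if err != nil {
		return err
	}
	if string(got) != "3.json" {
		return fmt.Errorf("unexpected marker content %q", got)
	}
	if err := updateActiveMarker(home, ""); err != nil {
		return err
	}
	if _, err := os.Stat(filepath.Join(home, "active.dat")); !os.IsNotExist(err) {
		return fmt.Errorf("active.dat still present")
	}
	return nil
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("marker updated and removed")
}
```

If both variables pointed at `active.dat`, the write would go straight to the live marker and the rename would add nothing, so a crash mid-write could leave a truncated marker behind.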
return nil
}

// removeOldDataFiles sorts the data files by their update sequence number and
The comment says this sorts the data files, but I don't see any sorting happen. It looks more like it expects an already-sorted data files list. In fact it looks like `diskstore.dataFiles` is always required to be sorted by txid, but I'm not sure of that -- can the definition of `dataFiles` document whether / when it is required to be sorted?
The sort happens in `listDataFiles`. We keep the sequence ordered by always trying to delete all but the last entry, and appending new entries to the end. Will add a comment to the `diskstore` type's `dataFiles` field that it requires the list to be sorted by ID via `isTxIDLessEqual`.
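A rough sketch of listing and ordering data files by transaction id is shown below. The real `listDataFiles` and `isTxIDLessEqual` are not part of this excerpt, so everything here is an assumption: the `<txid>.json` naming is inferred from the directory listings further down, and `txIDLess` is a hypothetical strict comparison that stays correct if the counter wraps around.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

type dataFileInfo struct {
	path string
	txid uint64
}

// txIDLess reports whether id a was assigned before id b. Comparing the
// signed difference keeps the order correct across a counter wrap-around.
// This is an assumption, not a quote of isTxIDLessEqual.
func txIDLess(a, b uint64) bool {
	return int64(a-b) < 0
}

// listDataFiles keeps names of the form "<txid>.json" and returns them
// ordered from oldest to newest. Other files are skipped.
func listDataFiles(names []string) []dataFileInfo {
	var infos []dataFileInfo
	for _, name := range names {
		base := strings.TrimSuffix(name, ".json")
		if base == name {
			continue // no .json suffix, e.g. active.dat
		}
		id, err := strconv.ParseUint(base, 10, 64)
		if err != nil {
			continue // not numeric, e.g. log.json or meta.json
		}
		infos = append(infos, dataFileInfo{path: name, txid: id})
	}
	sort.Slice(infos, func(i, j int) bool {
		return txIDLess(infos[i].txid, infos[j].txid)
	})
	return infos
}

func main() {
	names := []string{"24027145.json", "log.json", "active.dat", "23973819.json", "meta.json"}
	for _, f := range listDataFiles(names) {
		fmt.Println(f.txid, f.path)
	}
}
```

Sorting once when the directory is listed is enough if new checkpoints are only ever appended, which matches the invariant described in the reply.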
return
}

removable, keep := s.dataFiles[:L-1], s.dataFiles[L-1:]
It looks like `keep` will always be length 1 in this function, can we just make it a `dataFileInfo` and assign it from `s.dataFiles[L-1]` instead?
It is correct that the length is 1. We advance through the slice by finally assigning `s.dataFiles = keep` (assuming we haven't had an error). If `len(dataFiles) == 1`, this operation becomes a no-op. If `len(dataFiles) > 1`, the active data file is the last one, while the others are all considered old. I think I will rather split the slice into two variables: `oldDataFiles []dataFileInfo` and `activeDataFile dataFileInfo`. Then it is clearer which is which, and we can always attempt to remove all entries in `oldDataFiles`. This also removes the need to keep the list sorted.
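The proposed split can be sketched as below. The field names `oldDataFiles` and `activeDataFile` come from the reply; the rest is hypothetical. In particular, `addCheckpoint`, the injected `remove` callback, and the choice to keep files whose removal failed for a later retry are assumptions made so the sketch runs without touching the disk.

```go
package main

import (
	"errors"
	"fmt"
)

// errDenied simulates a failing delete in the example.
var errDenied = errors.New("permission denied")

type dataFileInfo struct {
	path string
	txid uint64
}

// store models only the two fields proposed in the review.
type store struct {
	oldDataFiles   []dataFileInfo // candidates for deletion, in no particular order
	activeDataFile dataFileInfo   // the most recent checkpoint
}

// addCheckpoint makes file the active data file and moves the previous
// active file, if any, to the list of old files.
func (s *store) addCheckpoint(file dataFileInfo) {
	if s.activeDataFile.path != "" {
		s.oldDataFiles = append(s.oldDataFiles, s.activeDataFile)
	}
	s.activeDataFile = file
}

// removeOldDataFiles attempts to delete every old file. Files that could not
// be removed stay in the list so that a later call can retry.
func (s *store) removeOldDataFiles(remove func(path string) error) {
	var failed []dataFileInfo
	for _, f := range s.oldDataFiles {
		if err := remove(f.path); err != nil {
			failed = append(failed, f)
		}
	}
	s.oldDataFiles = failed
}

func main() {
	s := &store{}
	s.addCheckpoint(dataFileInfo{path: "1.json", txid: 1})
	s.addCheckpoint(dataFileInfo{path: "5.json", txid: 5})
	s.addCheckpoint(dataFileInfo{path: "9.json", txid: 9})

	// Pretend that deleting 5.json fails.
	s.removeOldDataFiles(func(path string) error {
		if path == "5.json" {
			return errDenied
		}
		return nil
	})
	fmt.Println("active:", s.activeDataFile.path, "still old:", len(s.oldDataFiles))
}
```

Because the active file is never part of `oldDataFiles`, removal no longer depends on the slice order, which is why the sort only matters when the directory is first listed.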
jenkins run the tests please
Looks good! Found a few comments that need updating but otherwise I am content :-) I don't see the commented top-level design summary anymore, I hope something like it will still be checked in at some point? Otherwise approved.
log = log.With("temporary", tmpLink, "data_file", checkpointFilePath, "link_file", activeLink)

if checkpointFilePath == "" {
if err := os.Remove(activeLink); err != nil { // try, remove active symlink if present.
the comment above `WriteCheckpoint` says active link isn't a symlink, which seems accurate, but the comments / error message in here still say "symlink" -- update?
yeah, it's missing an update.
// loadLogFile returns the last committed txid in logTxid and the total number
// of operations in logCount.
// An incomplete transaction is recorded at the end of the log file, if
// complete is false.
I'm not sure what "complete is false" is referring to here, is this an outdated comment or is it referring to a value this PR doesn't reference explicitly?
this can be removed. It's an artifact of an older change
This change introduces the implementation of the on-disk state handling. Tests are currently not included due to placeholders that still will panic.
- rename txid => nextTxID
- introduce consts for file names
- try to delete old temporary files and return error if we encounter problems
- fix active.dat not correctly creating a temporary file
- split dataFiles array into oldDataFiles and extra activeDataFile fields (sorting is not required to be maintained, but on init)
This change introduces the implementation of the on-disk state handling. Tests are currently not included due to placeholders that still will panic. The final state of the current implementation can be found here: https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore/backend/memlog
The addition of the statestore package is split up into multiple changesets to ease review. The final version of the package can be found [here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore).
Once finalized, the libbeat/statestore package contains:
- The statestore frontend and interface for use within Beats
- Interfaces for the store backend
- A common set of tests store backends need to support
- a storetest package for testing new features that require a store. The testing helpers use map[string]interface{} that can be initialized or queried after the test run for validation purposes.
- The default memlog backend + tests
(cherry picked from commit 30f0799)
Hi. We are running version 7.12.1 and often see abnormal disk write I/O. Is there a switch to turn off this behavior?
filebeat version
filebeat version 7.12.1 (amd64), libbeat 7.12.1 [651a2ad1225f3d4420a22eba847de385b71f711d built 2021-04-20 20:58:32 +0000 UTC]
[root@hostname filebeat]# tree
.
|-- filebeat.lock
|-- meta.json
`-- registry
    `-- filebeat
        |-- 23973819.json
        |-- active.dat
        |-- checkpoint.new
        |-- log.json
        `-- meta.json
2 directories, 7 files
[root@hostname filebeat]# cd registry/filebeat/
[root@hostname filebeat]# ll
total 39904
-rw-------. 1 root root 15419626 Jul 5 20:42 24027145.json
-rw-------. 1 root root 49 Jul 5 20:42 active.dat
-rw-------. 1 root root 14880768 Jul 5 20:42 checkpoint.new
-rw-------. 1 root root 10539326 Jul 5 20:42 log.json
-rw-------. 1 root root 15 Jun 2 20:33 meta.json
[root@hostname filebeat]# du -sh *
15M 24053808.json
4.0K active.dat
2.4M checkpoint.new
11M log.json
4.0K meta.json
What does this PR do?
This change introduces the implementation of the on-disk state handling.
Tests are currently not included due to placeholders that still will
panic. The final state of the current implementation can be found here: https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore/backend/memlog
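The addition of the statestore package is split up into multiple
changesets to ease review. The final version of the package can be found
[here](https://github.com/urso/beats/tree/fb-input-v2-combined/libbeat/statestore).
Once finalized, the libbeat/statestore package contains:
- The statestore frontend and interface for use within Beats
- Interfaces for the store backend
- A common set of tests store backends need to support
- a storetest package for testing new features that require a store. The
testing helpers use map[string]interface{} that can be initialized or
queried after the test run for validation purposes.
- The default memlog backend + tests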
Why is it important?
The statestore introduces a simple key-value store to Beats. The statestore will be used to replace the registry in filebeat in the future.
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.
Related issues