Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-3849 Remove inline cas handling for xattr config persistence #6850

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 13 additions & 24 deletions base/config_persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,12 @@ type XattrBootstrapPersistence struct {

const cfgXattrKey = "_sync"
const cfgXattrConfigPath = cfgXattrKey + ".config"
const cfgXattrCasPath = cfgXattrKey + ".cas"
const cfgXattrBody = `{"cfgVersion": 1}`

func (xbp *XattrBootstrapPersistence) insertConfig(c *gocb.Collection, key string, value interface{}) (cas uint64, err error) {

mutateOps := []gocb.MutateInSpec{
gocb.UpsertSpec(cfgXattrConfigPath, value, UpsertSpecXattr),
gocb.UpsertSpec(cfgXattrCasPath, gocb.MutationMacroCAS, UpsertSpecXattr),
gocb.ReplaceSpec("", json.RawMessage(cfgXattrBody), nil),
}
options := &gocb.MutateInOptions{
Expand Down Expand Up @@ -91,7 +89,7 @@ func (xbp *XattrBootstrapPersistence) touchConfigRollback(c *gocb.Collection, ke
return result.Cas(), nil
}

// loadRawConfig returns the config and document cas (not cfgCas). Does not restore deleted documents,
// loadRawConfig returns the config and document cas. Does not restore deleted documents,
// to avoid cas collisions with concurrent updates
func (xbp *XattrBootstrapPersistence) loadRawConfig(ctx context.Context, c *gocb.Collection, key string) ([]byte, gocb.Cas, error) {

Expand Down Expand Up @@ -150,9 +148,9 @@ func (xbp *XattrBootstrapPersistence) removeRawConfig(c *gocb.Collection, key st
}

func (xbp *XattrBootstrapPersistence) replaceRawConfig(c *gocb.Collection, key string, value []byte, cas gocb.Cas) (gocb.Cas, error) {

mutateOps := []gocb.MutateInSpec{
gocb.UpsertSpec(cfgXattrConfigPath, bytesToRawMessage(value), UpsertSpecXattr),
gocb.UpsertSpec(cfgXattrCasPath, gocb.MutationMacroCAS, UpsertSpecXattr),
}
options := &gocb.MutateInOptions{
StoreSemantic: gocb.StoreSemanticsReplace,
Expand All @@ -171,7 +169,6 @@ func (xbp *XattrBootstrapPersistence) loadConfig(ctx context.Context, c *gocb.Co

ops := []gocb.LookupInSpec{
gocb.GetSpec(cfgXattrConfigPath, GetSpecXattr),
gocb.GetSpec(cfgXattrCasPath, GetSpecXattr),
gocb.GetSpec("", &gocb.GetSpecOptions{}),
}
lookupOpts := &gocb.LookupInOptions{
Expand All @@ -187,26 +184,19 @@ func (xbp *XattrBootstrapPersistence) loadConfig(ctx context.Context, c *gocb.Co
DebugfCtx(ctx, KeyCRUD, "No xattr config found for key=%s, path=%s: %v", key, cfgXattrConfigPath, xattrContErr)
return 0, ErrNotFound
}

// cas
var strCas string
xattrCasErr := res.ContentAt(1, &strCas)
if xattrCasErr != nil {
DebugfCtx(ctx, KeyCRUD, "No xattr cas found for key=%s, path=%s: %v", key, cfgXattrCasPath, xattrContErr)
return 0, ErrNotFound
}
cfgCas := HexCasToUint64(strCas)
casOut := res.Cas()

// deleted document check - if deleted, restore
var body map[string]interface{}
bodyErr := res.ContentAt(2, &body)
bodyErr := res.ContentAt(1, &body)
if bodyErr != nil {
restoreErr := xbp.restoreDocumentBody(c, key, valuePtr, strCas)
var restoreErr error
casOut, restoreErr = xbp.restoreDocumentBody(c, key, valuePtr, res.Cas())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's an error here, I think casOut will be updated to 0, which might not be correct? The tombstone document will have a cas.

OTOH, the cas isn't super useful

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can also see how if there's no document, then it should have a zero cas. I'm not sure what to do here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're correct - if the restore fails we should return the cas of the current version of the document (the tombstone), since we're returning the tombstone's xattr.

if restoreErr != nil {
WarnfCtx(ctx, "Error attempting to restore unexpected deletion of config: %v", restoreErr)
}
}
return cfgCas, nil
return uint64(casOut), nil
} else if errors.Is(lookupErr, gocbcore.ErrDocumentNotFound) {
DebugfCtx(ctx, KeyCRUD, "No config document found for key=%s", key)
return 0, ErrNotFound
Expand All @@ -215,24 +205,23 @@ func (xbp *XattrBootstrapPersistence) loadConfig(ctx context.Context, c *gocb.Co
}
}

// Restore a deleted document's body. Rewrites metadata, but preserves previous cfgCas
func (xbp *XattrBootstrapPersistence) restoreDocumentBody(c *gocb.Collection, key string, value interface{}, cfgCas string) error {
// Restore a deleted document's body. Rewrites metadata
func (xbp *XattrBootstrapPersistence) restoreDocumentBody(c *gocb.Collection, key string, value interface{}, cas gocb.Cas) (casOut gocb.Cas, err error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't using a cas check here because there aren't cas checks on insert. I think you can remove cas from an argument here.

We are sure that there is no document existing because on gocb.StoreSemanticsInsert

mutateOps := []gocb.MutateInSpec{
gocb.UpsertSpec(cfgXattrConfigPath, value, UpsertSpecXattr),
gocb.UpsertSpec(cfgXattrCasPath, cfgCas, UpsertSpecXattr),
gocb.ReplaceSpec("", json.RawMessage(cfgXattrBody), nil),
}
options := &gocb.MutateInOptions{
StoreSemantic: gocb.StoreSemanticsInsert,
}
_, mutateErr := c.MutateIn(key, mutateOps, options)
result, mutateErr := c.MutateIn(key, mutateOps, options)
if isKVError(mutateErr, memd.StatusKeyExists) {
return ErrAlreadyExists
return 0, ErrAlreadyExists
}
if mutateErr != nil {
return mutateErr
return 0, mutateErr
}
return nil
return result.Cas(), nil
}

// Document Body persistence stores config in the document body.
Expand Down
70 changes: 9 additions & 61 deletions base/config_persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestXattrConfigPersistence(t *testing.T) {
configBody["sampleConfig"] = "value"
configKey := "testConfigKey"

insertCas, insertErr := cp.insertConfig(c, configKey, configBody)
_, insertErr := cp.insertConfig(c, configKey, configBody)
require.NoError(t, insertErr)

// modify the document body directly in the bucket
Expand All @@ -152,21 +152,19 @@ func TestXattrConfigPersistence(t *testing.T) {
_, reinsertErr := cp.insertConfig(c, configKey, configBody)
require.ErrorIs(t, reinsertErr, ErrAlreadyExists)

// Retrieve the config, cas should still match insertCas
// Retrieve the config
var loadedConfig map[string]interface{}
loadCas, loadErr := cp.loadConfig(ctx, c, configKey, &loadedConfig)
_, loadErr := cp.loadConfig(ctx, c, configKey, &loadedConfig)
require.NoError(t, loadErr)
assert.Equal(t, insertCas, loadCas)
assert.Equal(t, configBody["sampleConfig"], loadedConfig["sampleConfig"])

// set the document to an empty body, shouldn't be treated as delete
err = dataStore.Set(configKey, 0, nil, nil)
require.NoError(t, err)

// Retrieve the config, cas should still match insertCas
loadCas, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
// Retrieve the config
_, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
require.NoError(t, loadErr)
assert.Equal(t, insertCas, loadCas)
assert.Equal(t, configBody["sampleConfig"], loadedConfig["sampleConfig"])

// Fetch the document directly from the bucket to verify resurrect handling didn't occur
Expand All @@ -179,69 +177,19 @@ func TestXattrConfigPersistence(t *testing.T) {
deleteErr := dataStore.Delete(configKey)
require.NoError(t, deleteErr)

// Retrieve the config, cas should still match insertCas
loadCas, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
// Retrieve the config
_, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
require.NoError(t, loadErr)
assert.Equal(t, insertCas, loadCas)
assert.Equal(t, configBody["sampleConfig"], loadedConfig["sampleConfig"])

// Fetch the document directly from the bucket to verify resurrect handling DID occur
_, err = dataStore.Get(configKey, &docBody)
assert.NoError(t, err)
require.NotNil(t, docBody)

// Retrieve the config, cas should still match insertCas
loadCas, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
// Retrieve the config
_, loadErr = cp.loadConfig(ctx, c, configKey, &loadedConfig)
require.NoError(t, loadErr)
assert.Equal(t, insertCas, loadCas)
assert.Equal(t, configBody["sampleConfig"], loadedConfig["sampleConfig"])
/*
rawConfig, rawCas, rawErr := cp.loadRawConfig(c, configKey)
require.NoError(t, rawErr)
assert.Equal(t, insertCas, uint64(rawCas))
assert.Equal(t, rawConfigBody, rawConfig)

configBody["updated"] = true
updatedRawBody, marshalErr := JSONMarshal(configBody)
require.NoError(t, marshalErr)

// update with incorrect cas
_, _, updateErr := cp.replaceRawConfig(c, configKey, updatedRawBody, 1234)
require.Error(t, updateErr)

// update with correct cas
updateCas, _, updateErr := cp.replaceRawConfig(c, configKey, updatedRawBody, gocb.Cas(insertCas))
require.NoError(t, updateErr)

// retrieve config, validate updated value
var updatedConfig map[string]interface{}
loadCas, loadErr = cp.loadConfig(c, configKey, &updatedConfig)
require.NoError(t, loadErr)
assert.Equal(t, updateCas, gocb.Cas(loadCas))
assert.Equal(t, configBody["updated"], updatedConfig["updated"])

// retrieve raw config, validate updated value
rawConfig, rawCas, rawErr = cp.loadRawConfig(c, configKey)
require.NoError(t, rawErr)
assert.Equal(t, updateCas, rawCas)
assert.Equal(t, updatedRawBody, rawConfig)

// delete with incorrect cas
_, removeErr := cp.removeRawConfig(c, configKey, gocb.Cas(insertCas))
require.Error(t, removeErr)

// delete with correct cas
_, removeErr = cp.removeRawConfig(c, configKey, updateCas)
require.NoError(t, removeErr)

// attempt to retrieve config, validate not found
var deletedConfig map[string]interface{}
loadCas, loadErr = cp.loadConfig(c, configKey, &deletedConfig)
assert.Equal(t, ErrNotFound, loadErr)

// attempt to retrieve raw config, validate updated value
rawConfig, rawCas, rawErr = cp.loadRawConfig(c, configKey)
assert.Equal(t, ErrNotFound, loadErr)
*/

}
Loading
Loading