Skip to content

Commit

Permalink
CBG-3835 catch error in case there is a valid json but not a JSON obj…
Browse files Browse the repository at this point in the history
…ect (#6756)

* Remove logging for invalid JSON in sync attribute

log in the case where there is _sync xattr but the body has become non
json

* Update log messages

* Update db/import_listener.go

Co-authored-by: Adam Fraser <adam.fraser@couchbase.com>

---------

Co-authored-by: Adam Fraser <adam.fraser@couchbase.com>
  • Loading branch information
torcolvin and adamcfraser authored Apr 17, 2024
1 parent 1d15ac7 commit 64bad65
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 12 deletions.
4 changes: 2 additions & 2 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error)
root.SyncData = &SyncData{History: make(RevTree)}
}
if err := base.JSONUnmarshal(data, &root); err != nil {
return nil, err
return nil, fmt.Errorf("Could not unmarshal _sync out of document body: %w", err)
}
if root.SyncData != nil && root.SyncData.Deleted_OLD {
root.SyncData.Deleted_OLD = false
Expand Down Expand Up @@ -454,7 +454,7 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
}
err = base.JSONUnmarshal(syncXattr, result)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err)
}
return result, body, syncXattr, rawUserXattr, nil
}
Expand Down
1 change: 1 addition & 0 deletions db/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (db *DatabaseCollectionWithUser) ImportDocRaw(ctx context.Context, docid st
} else {
err := body.Unmarshal(value)
if err != nil {
db.dbStats().SharedBucketImport().ImportErrorCount.Add(1)
base.InfofCtx(ctx, base.KeyImport, "Unmarshal error during importDoc %v", err)
return nil, base.HTTPErrorf(http.StatusNotFound, "Error unmarshalling %s: %s", base.UD(docid).Redact(), err)
}
Expand Down
8 changes: 4 additions & 4 deletions db/import_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (il *importListener) ProcessFeedEvent(event sgbucket.FeedEvent) (shouldPers

// If this is a binary document we can ignore, but update checkpoint to avoid reprocessing upon restart
if event.DataType == base.MemcachedDataTypeRaw {
base.InfofCtx(ctx, base.KeyImport, "Ignoring binary mutation event for %s.", base.UD(docID))
base.DebugfCtx(ctx, base.KeyImport, "Ignoring binary mutation event for %s.", base.UD(docID))
return true
}

Expand All @@ -171,10 +171,10 @@ func (il *importListener) ImportFeedEvent(ctx context.Context, collection *Datab
if err != nil {
if err == base.ErrEmptyMetadata {
base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
} else {
base.WarnfCtx(ctx, "Found sync metadata, but unable to unmarshal for feed document %q. Will not be imported. Error: %v", base.UD(event.Key), err)
il.importStats.ImportErrorCount.Add(1)
return
}
il.importStats.ImportErrorCount.Add(1)
base.DebugfCtx(ctx, base.KeyImport, "%s will not be imported: %v", base.UD(event.Key), err)
return
}

Expand Down
129 changes: 123 additions & 6 deletions db/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,9 @@ func TestImportNonZeroStart(t *testing.T) {
require.Equal(t, revID1, doc.SyncData.CurrentRev)
}

// TestImportInvalidMetadata tests triggering an import error if the metadata is unmarshalable
func TestImportInvalidMetadata(t *testing.T) {
// TestImportFeedInvalidInlineSyncMetadata tests avoiding an import error if the metadata is unmarshable
func TestImportFeedInvalidInlineSyncMetadata(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyMigrate, base.KeyImport)
base.SkipImportTestsIfNotEnabled(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(base.TestCtx(t))
Expand All @@ -575,13 +576,129 @@ func TestImportInvalidMetadata(t *testing.T) {
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportCount.Value())
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())

// write a document with inline sync metadata that is unmarshalable, triggering an import error
// can't write a document with invalid sync metadata as an xattr, so rely on legacy behavior
_, err := bucket.GetSingleDataStore().Add("doc1", 0, `{"foo" : "bar", "_sync" : 1 }`)
// docs named so they will both be on vBucket 1 in both 64 and 1024 vbuckets
const (
doc1 = "bookstand"
doc2 = "chipchop"
)
// write a document with inline sync metadata that not unmarshalable into SyncData. This document will be ignored and logged at debug level.
// [DBG] .. col:sg_test_0 <ud>bookstand</ud> not able to be imported. Error: Could not unmarshal _sync out of document body: json: cannot unmarshal number into Go struct field documentRoot._sync of type db.SyncData
_, err := bucket.GetSingleDataStore().Add(doc1, 0, []byte(`{"foo" : "bar", "_sync" : 1 }`))
require.NoError(t, err)

// this will be imported
err = bucket.GetSingleDataStore().Set(doc2, 0, nil, []byte(`{"foo" : "bar"}`))
require.NoError(t, err)

base.RequireWaitForStat(t, func() int64 {
return db.DbStats.SharedBucketImport().ImportErrorCount.Value()
return db.DbStats.SharedBucketImport().ImportCount.Value()
}, 1)
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())
}

func TestImportFeedInvalidSyncMetadata(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyMigrate, base.KeyImport)
base.SkipImportTestsIfNotEnabled(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(base.TestCtx(t))

db, ctx := setupTestDBWithOptionsAndImport(t, bucket, DatabaseContextOptions{})
defer db.Close(ctx)

// make sure no documents are imported
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportCount.Value())
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())

// docs named so they will both be on vBucket 1 in both 64 and 1024 vbuckets
const (
doc1 = "bookstand"
doc2 = "chipchop"
)

// this document will be ignored for input with debug logging as follows:
// [DBG] .. col:sg_test_0 <ud>bookstand</ud> not able to be imported. Error: Found _sync xattr ("1"), but could not unmarshal: json: cannot unmarshal number into Go value of type db.SyncData
_, err := bucket.GetSingleDataStore().WriteWithXattrs(ctx, doc1, 0, 0, []byte(`{"foo" : "bar"}`), map[string][]byte{base.SyncXattrName: []byte(`1`)}, nil)
require.NoError(t, err)

// fix xattrs, and the document is able to be imported
_, err = bucket.GetSingleDataStore().WriteWithXattrs(ctx, doc2, 0, 0, []byte(`{"foo" : "bar"}`), map[string][]byte{base.SyncXattrName: []byte(`{}`)}, nil)
require.NoError(t, err)

base.RequireWaitForStat(t, func() int64 {
return db.DbStats.SharedBucketImport().ImportCount.Value()
}, 1)
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())
}

func TestImportFeedNonJSONNewDoc(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyMigrate, base.KeyImport)
base.SkipImportTestsIfNotEnabled(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(base.TestCtx(t))

db, ctx := setupTestDBWithOptionsAndImport(t, bucket, DatabaseContextOptions{})
defer db.Close(ctx)

// make sure no documents are imported
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportCount.Value())
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())

// docs named so they will both be on vBucket 1 in both 64 and 1024 vbuckets
const (
doc1 = "bookstand"
doc2 = "chipchop"
)

// logs because a JSON number is not a JSON object
// [DBG] .. col:sg_test_0 <ud>bookstand</ud> not able to be imported. Error: Could not unmarshal _sync out of document body: json: cannot unmarshal number into Go value of type db.documentRoot
_, err := bucket.GetSingleDataStore().Add(doc1, 0, []byte(`1`))
require.NoError(t, err)

_, err = bucket.GetSingleDataStore().Add(doc2, 0, []byte(`{"foo" : "bar"}`))
require.NoError(t, err)

base.RequireWaitForStat(t, func() int64 {
return db.DbStats.SharedBucketImport().ImportCount.Value()
}, 1)
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())
}

func TestImportFeedNonJSONExistingDoc(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyMigrate, base.KeyImport)
base.SkipImportTestsIfNotEnabled(t)
bucket := base.GetTestBucket(t)
defer bucket.Close(base.TestCtx(t))

db, ctx := setupTestDBWithOptionsAndImport(t, bucket, DatabaseContextOptions{})
defer db.Close(ctx)

// make sure no documents are imported
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportCount.Value())
require.Equal(t, int64(0), db.DbStats.SharedBucketImport().ImportErrorCount.Value())

// docs named so they will both be on vBucket 1 in both 64 and 1024 vbuckets
const (
doc1 = "bookstand"
doc2 = "chipchop"
)

_, err := bucket.GetSingleDataStore().Add(doc1, 0, []byte(`{"foo": "bar"}`))
require.NoError(t, err)

base.RequireWaitForStat(t, func() int64 {
return db.DbStats.SharedBucketImport().ImportCount.Value()
}, 1)

// logs and increments ImportErrorCount
// [INF] .. col:sg_test_0 Unmarshal error during importDoc json: cannot unmarshal number into Go value of type db.Body
err = bucket.GetSingleDataStore().Set(doc1, 0, nil, []byte(`1`))
require.NoError(t, err)

_, err = bucket.GetSingleDataStore().Add(doc2, 0, []byte(`{"foo" : "bar"}`))
require.NoError(t, err)

base.RequireWaitForStat(t, func() int64 {
return db.DbStats.SharedBucketImport().ImportCount.Value()
}, 2)
require.Equal(t, int64(1), db.DbStats.SharedBucketImport().ImportErrorCount.Value())
}

0 comments on commit 64bad65

Please sign in to comment.