Skip to content

Commit

Permalink
CBG-3913 replace parseStreamWithXattr with sg-bucket function (#6812)
Browse files Browse the repository at this point in the history
* CBG-3913 replace parseStreamWithXattr with sg-bucket function

* Update with changed API

* Restore previous behavior
  • Loading branch information
torcolvin authored May 14, 2024
1 parent f771d8c commit 89637f4
Show file tree
Hide file tree
Showing 10 changed files with 119 additions and 223 deletions.
4 changes: 0 additions & 4 deletions base/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var (
ErrDocumentMigrated = &sgError{"Document migrated"}
ErrFatalBucketConnection = &sgError{"Fatal error connecting to bucket"}
ErrAuthError = &sgError{"Authentication failure"}
ErrEmptyMetadata = &sgError{"Empty Sync Gateway metadata"}
ErrCasFailureShouldRetry = sgbucket.ErrCasFailureShouldRetry
ErrIndexerError = &sgError{"Indexer error"}
ErrAlreadyExists = &sgError{"Already exists"}
Expand All @@ -53,9 +52,6 @@ var (
// ErrXattrPartialFound is returned if only a subset of requested xattrs are found
ErrXattrPartialFound = &sgError{"Some Requested Xattrs Not Found"}

// ErrXattrInvalidLen is returned if the xattr is corrupt.
ErrXattrInvalidLen = &sgError{"Xattr stream length"}

// ErrPartialViewErrors is returned if the view call contains any partial errors.
// This is more of a warning, and inspecting ViewResult.Errors is required for detail.
ErrPartialViewErrors = &sgError{"Partial errors in view"}
Expand Down
27 changes: 18 additions & 9 deletions db/attachment_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,15 +211,14 @@ func getAttachmentSyncData(dataType uint8, data []byte) (*AttachmentCompactionDa
var documentBody []byte

if dataType&base.MemcachedDataTypeXattr != 0 {
body, xattr, _, err := parseXattrStreamData(base.SyncXattrName, "", data)
body, xattrs, err := sgbucket.DecodeValueWithXattrs([]string{base.SyncXattrName}, data)
if err != nil {
if errors.Is(err, base.ErrXattrNotFound) || errors.Is(err, base.ErrXattrInvalidLen) {
if errors.Is(err, sgbucket.ErrXattrInvalidLen) {
return nil, nil
}
return nil, err
return nil, fmt.Errorf("Could not parse DCP attachment sync data: %w", err)
}

err = base.JSONUnmarshal(xattr, &attachmentData)
err = base.JSONUnmarshal(xattrs[base.SyncXattrName], &attachmentData)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -312,8 +311,9 @@ func attachmentCompactSweepPhase(ctx context.Context, dataStore base.DataStore,

// If the data contains an xattr then the attachment likely has a compaction ID, need to check this value
if event.DataType&base.MemcachedDataTypeXattr != 0 {
_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
if err != nil && !errors.Is(err, base.ErrXattrNotFound) {

xattr, err := getAttachmentCompactionXattr(event.Value)
if err != nil {
base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
return true
}
Expand Down Expand Up @@ -424,8 +424,8 @@ func attachmentCompactCleanupPhase(ctx context.Context, dataStore base.DataStore
return true
}

_, xattr, _, err := parseXattrStreamData(base.AttachmentCompactionXattrName, "", event.Value)
if err != nil && !errors.Is(err, base.ErrXattrNotFound) {
xattr, err := getAttachmentCompactionXattr(event.Value)
if err != nil {
base.WarnfCtx(ctx, "[%s] Unexpected error occurred attempting to parse attachment xattr: %v", compactionLoggingID, err)
return true
}
Expand Down Expand Up @@ -575,3 +575,12 @@ func GenerateCompactionDCPStreamName(compactionID, compactionAction string) stri
compactionAction,
)
}

// getAttachmentCompactionXattr extracts the attachment compaction xattr from a
// raw DCP stream value. A nil slice (with a nil error) is returned when the
// xattr is not present in the decoded value.
func getAttachmentCompactionXattr(data []byte) ([]byte, error) {
	_, decodedXattrs, decodeErr := sgbucket.DecodeValueWithXattrs([]string{base.AttachmentCompactionXattrName}, data)
	if decodeErr != nil {
		return nil, decodeErr
	}
	return decodedXattrs[base.AttachmentCompactionXattrName], nil
}
2 changes: 1 addition & 1 deletion db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
if event.DataType != base.MemcachedDataTypeRaw {
base.DebugfCtx(ctx, base.KeyCache, "Unable to unmarshal sync metadata for feed document %q. Will not be included in channel cache. Error: %v", base.UD(docID), err)
}
if err == base.ErrEmptyMetadata {
if errors.Is(err, sgbucket.ErrEmptyMetadata) {
base.WarnfCtx(ctx, "Unexpected empty metadata when processing feed event. docid: %s opcode: %v datatype:%v", base.UD(event.Key), event.Opcode, event.DataType)
}
return
Expand Down
49 changes: 1 addition & 48 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package db

import (
"context"
"encoding/binary"
"errors"
"fmt"
"log"
Expand Down Expand Up @@ -1952,42 +1951,6 @@ func (f *testDocChangedFeed) reset() {
}
}

// makeFeedBytes creates a DCP mutation message w/ xattr (reverse of parseXattrStreamData).
//
// Layout, per the DCP xattr encoding: a uint32 (network byte order) holding the
// total length of the xattr section, then for each xattr a uint32 length of the
// pair payload followed by `key 0x00 value 0x00`. The per-pair length covers
// only the payload (including both 0x00 terminators) and NOT its own 4 length
// bytes; the total-section length covers the per-pair length field(s) plus
// payload(s). The document body follows immediately after the xattr section.
func makeFeedBytes(xattrKey, xattrValue, docValue string) []byte {
	const separator = byte(0)

	// Pair payload: key 0x00 value 0x00
	pair := make([]byte, 0, len(xattrKey)+len(xattrValue)+2)
	pair = append(pair, xattrKey...)
	pair = append(pair, separator)
	pair = append(pair, xattrValue...)
	pair = append(pair, separator)

	// Total xattr section length: the pair's 4 length bytes plus its payload.
	totalXattrLengthBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(totalXattrLengthBytes, uint32(len(pair)+4))

	// Per-pair length: payload only. (Previously written as len(pair)+4, which
	// overstated the pair length and was only tolerated because the parser
	// returned early once the requested xattr was found.)
	pairLengthBytes := make([]byte, 4)
	binary.BigEndian.PutUint32(pairLengthBytes, uint32(len(pair)))

	feedBytes := make([]byte, 0, 8+len(pair)+len(docValue))
	feedBytes = append(feedBytes, totalXattrLengthBytes...)
	feedBytes = append(feedBytes, pairLengthBytes...)
	feedBytes = append(feedBytes, pair...)
	feedBytes = append(feedBytes, docValue...)
	return feedBytes
}

// TestMakeFeedBytes round-trips a synthetic DCP mutation through
// parseXattrStreamData and verifies the body and sync xattr lengths.
func TestMakeFeedBytes(t *testing.T) {

	rawBytes := makeFeedBytes(base.SyncPropertyName, `{"rev":"foo"}`, `{"k":"val"}`)

	body, xattr, _, err := parseXattrStreamData(base.SyncXattrName, "", rawBytes)
	// require (not assert) so a parse failure stops before the length checks,
	// consistent with the require calls below.
	require.NoError(t, err)
	require.Len(t, body, 11)  // `{"k":"val"}`
	require.Len(t, xattr, 13) // `{"rev":"foo"}`
}

func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {

// Select the next sequence from a source at random. Simulates unordered global sequences arriving over DCP
Expand All @@ -2005,8 +1968,7 @@ func (f *testDocChangedFeed) Next() sgbucket.FeedEvent {
channelName,
)
docBody := fmt.Sprintf(`{"channels":["%s"]}`, channelName)
// docBody := fmt.Sprintf(feedDoc1kFormat, channelName)
value := makeFeedBytes(base.SyncXattrName, xattrValue, docBody)
value := sgbucket.EncodeValueWithXattrs([]byte(docBody), sgbucket.Xattr{Name: base.SyncXattrName, Value: []byte(xattrValue)})

return sgbucket.FeedEvent{
Opcode: sgbucket.FeedOpMutation,
Expand Down Expand Up @@ -2121,15 +2083,6 @@ func BenchmarkDocChanged(b *testing.B) {
}
}

// TestInvalidXattrStream verifies that parseXattrStreamData fails cleanly on a
// corrupt payload, returning nil for every output.
func TestInvalidXattrStream(t *testing.T) {
	// "abcde" declares an xattr section length (0x61626364) far larger than
	// the payload itself, so parsing must error out.
	gotBody, gotXattr, gotUserXattr, parseErr := parseXattrStreamData(base.SyncXattrName, "", []byte("abcde"))
	require.Error(t, parseErr)
	for _, out := range [][]byte{gotBody, gotXattr, gotUserXattr} {
		require.Nil(t, out)
	}
}

// TestProcessSkippedEntry:
// - Creates change cache with minimal pending seq wait time to push sequences to skipped quick
// - Push a sequence higher than expected to cache
Expand Down
116 changes: 23 additions & 93 deletions db/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -433,30 +432,36 @@ func UnmarshalDocumentSyncData(data []byte, needHistory bool) (*SyncData, error)
// Returns the raw body, in case it's needed for import.

// TODO: Using a pool of unmarshal workers may help prevent memory spikes under load
func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawXattr []byte, rawUserXattr []byte, err error) {
func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey string, needHistory bool) (result *SyncData, rawBody []byte, rawSyncXattr []byte, rawUserXattr []byte, err error) {

var body []byte

// If attr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
// If xattr datatype flag is set, data includes both xattrs and document body. Check for presence of sync xattr.
// Note that there could be a non-sync xattr present
if dataType&base.MemcachedDataTypeXattr != 0 {
var syncXattr []byte
body, syncXattr, rawUserXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
var xattrs map[string][]byte
xattrKeys := []string{base.SyncXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}
body, xattrs, err = sgbucket.DecodeValueWithXattrs(xattrKeys, data)
if err != nil {
return nil, nil, nil, nil, err
}
rawSyncXattr = xattrs[base.SyncXattrName]
rawUserXattr = xattrs[userXattrKey]

// If the sync xattr is present, use that to build SyncData
if syncXattr != nil && len(syncXattr) > 0 {
if len(rawSyncXattr) > 0 {
result = &SyncData{}
if needHistory {
result.History = make(RevTree)
}
err = base.JSONUnmarshal(syncXattr, result)
err = base.JSONUnmarshal(rawSyncXattr, result)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", string(syncXattr), err)
return nil, nil, nil, nil, fmt.Errorf("Found _sync xattr (%q), but could not unmarshal: %w", syncXattr, err)
}
return result, body, syncXattr, rawUserXattr, nil
return result, body, rawSyncXattr, rawUserXattr, nil
}
} else {
// Xattr flag not set - data is just the document body
Expand All @@ -469,93 +474,18 @@ func UnmarshalDocumentSyncDataFromFeed(data []byte, dataType uint8, userXattrKey
}

func UnmarshalDocumentFromFeed(ctx context.Context, docid string, cas uint64, data []byte, dataType uint8, userXattrKey string) (doc *Document, err error) {
var body []byte

if dataType&base.MemcachedDataTypeXattr != 0 {
var syncXattr []byte
var userXattr []byte
body, syncXattr, userXattr, err = parseXattrStreamData(base.SyncXattrName, userXattrKey, data)
if err != nil {
return nil, err
}
return unmarshalDocumentWithXattr(ctx, docid, body, syncXattr, userXattr, cas, DocUnmarshalAll)
if dataType&base.MemcachedDataTypeXattr == 0 {
return unmarshalDocument(docid, data)
}

return unmarshalDocument(docid, data)
}

// parseXattrStreamData returns the raw bytes of the body and the requested xattr (when present) from the raw DCP data bytes.
// Details on format (taken from https://docs.google.com/document/d/18UVa5j8KyufnLLy29VObbWRtoBn9vs8pcxttuMt6rz8/edit#heading=h.caqiui1pmmmb.):
/*
When the XATTR bit is set the first uint32_t in the body contains the size of the entire XATTR section.
Byte/ 0 | 1 | 2 | 3 |
/ | | | |
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
+---------------+---------------+---------------+---------------+
0| Total xattr length in network byte order |
+---------------+---------------+---------------+---------------+
Following the length you'll find an iovector-style encoding of all of the XATTR key-value pairs with the following encoding:
uint32_t length of next xattr pair (network order)
xattr key in modified UTF-8
0x00
xattr value in modified UTF-8
0x00
The 0x00 byte after the key saves us from storing a key length, and the trailing 0x00 is just for convenience to allow us to use string functions to search in them.
*/

func parseXattrStreamData(xattrName string, userXattrName string, data []byte) (body []byte, xattr []byte, userXattr []byte, err error) {

if len(data) < 4 {
return nil, nil, nil, base.ErrEmptyMetadata
}

xattrsLen := binary.BigEndian.Uint32(data[0:4])
if int(xattrsLen+4) > len(data) {
return nil, nil, nil, fmt.Errorf("%w (%d) from bytes %+v", base.ErrXattrInvalidLen, xattrsLen, data[0:4])
}
body = data[xattrsLen+4:]
if xattrsLen == 0 {
return body, nil, nil, nil
xattrKeys := []string{base.SyncXattrName}
if userXattrKey != "" {
xattrKeys = append(xattrKeys, userXattrKey)
}

// In the xattr key/value pairs, key and value are both terminated by 0x00 (byte(0)). Use this as a separator to split the byte slice
separator := []byte("\x00")

// Iterate over xattr key/value pairs
pos := uint32(4)
for pos < xattrsLen {
pairLen := binary.BigEndian.Uint32(data[pos : pos+4])
if pairLen == 0 || int(pos+pairLen) > len(data) {
return nil, nil, nil, fmt.Errorf("Unexpected xattr pair length (%d) - unable to parse xattrs", pairLen)
}
pos += 4
pairBytes := data[pos : pos+pairLen]
components := bytes.Split(pairBytes, separator)
// xattr pair has the format [key]0x00[value]0x00, and so should split into three components
if len(components) != 3 {
return nil, nil, nil, fmt.Errorf("Unexpected number of components found in xattr pair: %s", pairBytes)
}
xattrKey := string(components[0])
if xattrName == xattrKey {
xattr = components[1]
} else if userXattrName != "" && userXattrName == xattrKey {
userXattr = components[1]
}

// Exit if we have xattrs we want (either both or one if the latter is disabled)
if len(xattr) > 0 && (len(userXattr) > 0 || userXattrName == "") {
return body, xattr, userXattr, nil
}

pos += pairLen
body, xattrs, err := sgbucket.DecodeValueWithXattrs(xattrKeys, data)
if err != nil {
return nil, err
}

return body, xattr, userXattr, nil
return unmarshalDocumentWithXattr(ctx, docid, body, xattrs[base.SyncXattrName], xattrs[userXattrKey], cas, DocUnmarshalAll)
}

func (doc *SyncData) HasValidSyncData() bool {
Expand Down
Loading

0 comments on commit 89637f4

Please sign in to comment.