Skip to content

Commit

Permalink
sstable: add meta block for range keys; support writing range keys
Browse files Browse the repository at this point in the history
Range keys (see cockroachdb#1341) will be stored in their own, single block of an
sstable. Add a new, optional meta block, indexed as "pebble.range_key"
in the metablock index, to the sstable structure. This block is only
present when at least one range key has been written to the sstable.

Add the ability to add range keys to an sstable via
`(*sstable.Writer).Write`.

Update existing data-driven tests to support printing of the range key
summary. Add additional test coverage demonstrating writing of range
keys with an `sstable.Writer`.

Add minimal functionality to `sstable.Reader` to support writing the
data-driven test cases for the writer. Additional read-oriented
functionality will be added in a subsequent patch.

Related to cockroachdb#1339.
  • Loading branch information
nicktrav committed Dec 8, 2021
1 parent b10ca3a commit f2d0bb1
Show file tree
Hide file tree
Showing 14 changed files with 395 additions and 29 deletions.
24 changes: 24 additions & 0 deletions internal/rangekey/rangekey.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,30 @@ func Parse(s string) (key base.InternalKey, value []byte) {
}
}

// RecombinedValueLen returns the length of the byte slice that results from
// re-encoding the end key and the user-value as a physical range key value.
func RecombinedValueLen(kind base.InternalKeyKind, endKey, userValue []byte) int {
n := len(endKey)
if kind == base.InternalKeyKindRangeKeyDelete {
// RANGEKEYDELs are not varint encoded.
return n
}
return lenVarint(len(endKey)) + len(endKey) + len(userValue)
}

// RecombineValue re-encodes the end key and user-value as a physical range key
// value into the destination byte slice.
func RecombineValue(kind base.InternalKeyKind, dst, endKey, userValue []byte) int {
if kind == base.InternalKeyKindRangeKeyDelete {
// RANGEKEYDELs are not varint encoded.
return copy(dst, endKey)
}
n := binary.PutUvarint(dst, uint64(len(endKey)))
n += copy(dst[n:], endKey)
n += copy(dst[n:], userValue)
return n
}

func lenVarint(v int) (n int) {
x := uint32(v)
n++
Expand Down
25 changes: 25 additions & 0 deletions internal/rangekey/rangekey_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rangekey

import (
"fmt"
"strconv"
"testing"

"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -146,3 +147,27 @@ func TestParseFormatRoundtrip(t *testing.T) {
}
}
}

func TestRecombinedValueLen_RoundTrip(t *testing.T) {
testCases := []string{
"a.RANGEKEYSET.1: [(@t22=foo),(@t1=bar)]",
"a.RANGEKEYSET.1: [(@t1=bar)]",
"a.RANGEKEYUNSET.1: [@t9,@t8,@t7,@t6,@t5]",
"a.RANGEKEYDEL.5: foo",
}
for i, in := range testCases {
t.Run(strconv.Itoa(i), func(t *testing.T) {
k, v := Parse(in)

// Split the value into an end key and a user-value.
endKey, restValue, ok := DecodeEndKey(k.Kind(), v)
require.True(t, ok)

// Re-encode the end key and user-value.
dst := make([]byte, RecombinedValueLen(k.Kind(), endKey, restValue))
RecombineValue(k.Kind(), dst, endKey, restValue)

require.Equal(t, v, dst)
})
}
}
67 changes: 59 additions & 8 deletions sstable/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
"github.com/cockroachdb/pebble/vfs"
)

Expand Down Expand Up @@ -68,12 +69,20 @@ func runBuildCmd(
}

w := NewWriter(f0, *writerOpts)
var tombstones []keyspan.Span
f := keyspan.Fragmenter{
var rangeDels []keyspan.Span
rangeDelFrag := keyspan.Fragmenter{
Cmp: DefaultComparer.Compare,
Format: DefaultComparer.FormatKey,
Emit: func(fragmented []keyspan.Span) {
tombstones = append(tombstones, fragmented...)
rangeDels = append(rangeDels, fragmented...)
},
}
var rangeKeys []keyspan.Span
rangeKeyFrag := keyspan.Fragmenter{
Cmp: DefaultComparer.Compare,
Format: DefaultComparer.FormatKey,
Emit: func(fragmented []keyspan.Span) {
rangeKeys = append(rangeKeys, fragmented...)
},
}
for _, data := range strings.Split(td.Input, "\n") {
Expand All @@ -89,7 +98,28 @@ func runBuildCmd(
err = errors.Errorf("%v", r)
}
}()
f.Add(keyspan.Span{Start: key, End: value})
rangeDelFrag.Add(keyspan.Span{Start: key, End: value})
}()
if err != nil {
return nil, nil, err
}
case base.InternalKeyKindRangeKeyDelete,
base.InternalKeyKindRangeKeyUnset,
base.InternalKeyKindRangeKeySet:
var err error
func() {
defer func() {
if r := recover(); r != nil {
err = errors.Errorf("%v", r)
}
}()
key, value := rangekey.Parse(data)
endKey, value, ok := rangekey.DecodeEndKey(key.Kind(), value)
if !ok {
err = errors.New("could not decode end key")
return
}
rangeKeyFrag.Add(keyspan.Span{Start: key, End: endKey, Value: value})
}()
if err != nil {
return nil, nil, err
Expand All @@ -98,15 +128,24 @@ func runBuildCmd(
if err := w.Add(key, value); err != nil {
return nil, nil, err
}

}
}
f.Finish()
for _, v := range tombstones {
rangeDelFrag.Finish()
for _, v := range rangeDels {
if err := w.Add(v.Start, v.End); err != nil {
return nil, nil, err
}
}
rangeKeyFrag.Finish()
for _, v := range rangeKeys {
// Reconstitute the value from the end key and the user value.
n := rangekey.RecombinedValueLen(v.Start.Kind(), v.End, v.Value)
b := make([]byte, n)
_ = rangekey.RecombineValue(v.Start.Kind(), b, v.End, v.Value)
if err := w.Add(v.Start, b); err != nil {
return nil, nil, err
}
}
if err := w.Close(); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -151,7 +190,19 @@ func runBuildRawCmd(td *datadriven.TestData) (*WriterMetadata, *Reader, error) {
for _, data := range strings.Split(td.Input, "\n") {
j := strings.Index(data, ":")
key := base.ParseInternalKey(data[:j])
value := []byte(data[j+1:])

var value []byte
switch key.Kind() {
case base.InternalKeyKindRangeKeyDelete,
base.InternalKeyKindRangeKeyUnset,
base.InternalKeyKindRangeKeySet:
// Values for range keys must be converted into their "packed" form before
// being added to the Writer.
_, value = rangekey.Parse(data)
default:
value = []byte(data[j+1:])
}

if err := w.Add(key, value); err != nil {
return nil, nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions sstable/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type Properties struct {
NumMergeOperands uint64 `prop:"rocksdb.merge.operands"`
// The number of range deletions in this table.
NumRangeDeletions uint64 `prop:"rocksdb.num.range-deletions"`
// The number of range keys in this table.
NumRangeKeys uint64 `prop:"pebble.num.range-keys"`
// Timestamp of the earliest key. 0 if unknown.
OldestKeyTime uint64 `prop:"rocksdb.oldest.key.time"`
// The name of the prefix extractor used in this table. Empty if no prefix
Expand Down Expand Up @@ -318,6 +320,9 @@ func (p *Properties) save(w *rawBlockWriter) {
p.saveUvarint(m, unsafe.Offsetof(p.NumDeletions), p.NumDeletions)
p.saveUvarint(m, unsafe.Offsetof(p.NumMergeOperands), p.NumMergeOperands)
p.saveUvarint(m, unsafe.Offsetof(p.NumRangeDeletions), p.NumRangeDeletions)
if p.NumRangeKeys > 0 {
p.saveUvarint(m, unsafe.Offsetof(p.NumRangeKeys), p.NumRangeKeys)
}
p.saveUvarint(m, unsafe.Offsetof(p.OldestKeyTime), p.OldestKeyTime)
if p.PrefixExtractorName != "" {
p.saveString(m, unsafe.Offsetof(p.PrefixExtractorName), p.PrefixExtractorName)
Expand Down
9 changes: 5 additions & 4 deletions sstable/properties_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,14 @@ func TestPropertiesSave(t *testing.T) {
NumEntries: 16,
NumMergeOperands: 17,
NumRangeDeletions: 18,
OldestKeyTime: 19,
NumRangeKeys: 19,
OldestKeyTime: 20,
PrefixExtractorName: "prefix extractor name",
PrefixFiltering: true,
PropertyCollectorNames: "prefix collector names",
RawKeySize: 20,
RawValueSize: 21,
TopLevelIndexSize: 22,
RawKeySize: 21,
RawValueSize: 22,
TopLevelIndexSize: 23,
WholeKeyFiltering: true,
UserProperties: map[string]string{
"user-prop-a": "1",
Expand Down
27 changes: 27 additions & 0 deletions sstable/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,7 @@ type Reader struct {
indexBH BlockHandle
filterBH BlockHandle
rangeDelBH BlockHandle
rangeKeyBH BlockHandle
rangeDelTransform blockTransform
propertiesBH BlockHandle
metaIndexBH BlockHandle
Expand Down Expand Up @@ -2133,6 +2134,24 @@ func (r *Reader) NewRawRangeDelIter() (base.InternalIterator, error) {
return i, nil
}

// NewRawRangeKeyIter returns an internal iterator for the contents of the
// range-key block for the table. Returns nil if the table does not contain any
// range keys.
func (r *Reader) NewRawRangeKeyIter() (base.InternalIterator, error) {
if r.rangeKeyBH.Length == 0 {
return nil, nil
}
h, err := r.readRangeKey()
if err != nil {
return nil, err
}
i := &blockIter{}
if err := i.initHandle(r.Compare, h, r.Properties.GlobalSeqNum); err != nil {
return nil, err
}
return i, nil
}

func (r *Reader) readIndex() (cache.Handle, error) {
return r.readBlock(r.indexBH, nil /* transform */, nil /* readaheadState */)
}
Expand All @@ -2145,6 +2164,10 @@ func (r *Reader) readRangeDel() (cache.Handle, error) {
return r.readBlock(r.rangeDelBH, r.rangeDelTransform, nil /* readaheadState */)
}

func (r *Reader) readRangeKey() (cache.Handle, error) {
return r.readBlock(r.rangeKeyBH, nil /* transform */, nil /* readaheadState */)
}

// readBlock reads and decompresses a block from disk into memory.
func (r *Reader) readBlock(
bh BlockHandle, transform blockTransform, raState *readaheadState,
Expand Down Expand Up @@ -2347,6 +2370,10 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error {
}
}

if bh, ok := meta[metaRangeKeyName]; ok {
r.rangeKeyBH = bh
}

for name, fp := range r.opts.Filters {
types := []struct {
ftype FilterType
Expand Down
1 change: 1 addition & 0 deletions sstable/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ const (
levelDBFormatVersion = 0
rocksDBFormatVersion2 = 2

metaRangeKeyName = "pebble.range_key"
metaPropertiesName = "rocksdb.properties"
metaRangeDelName = "rocksdb.range_del"
metaRangeDelV2Name = "rocksdb.range_del2"
Expand Down
7 changes: 7 additions & 0 deletions sstable/testdata/rewriter
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ c_xyz.SET.1:c
----
point: [a_xyz#1,1,c_xyz#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

# rewrite from=xyz to=123 block-size=1 index-block-size=1 filter
Expand All @@ -22,6 +23,7 @@ ca_xyz.SET.1:c
----
point: [aa_xyz#1,1,ca_xyz#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

rewrite from=yz to=23 block-size=1 index-block-size=1 filter comparer-split-4b-suffix
Expand All @@ -39,6 +41,7 @@ c_xyz.SET.1:c
----
point: [a_xyz#1,1,c_xyz#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

layout
Expand Down Expand Up @@ -75,6 +78,7 @@ rewrite from=_xyz to=_123 block-size=1 index-block-size=1 filter comparer-split-
----
point: [a_123#1,1,c_123#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

layout
Expand Down Expand Up @@ -111,6 +115,7 @@ rewrite from=_123 to=_456 block-size=1 index-block-size=1 filter comparer-split-
----
point: [a_456#1,1,c_456#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

layout
Expand Down Expand Up @@ -147,6 +152,7 @@ rewrite from=_456 to=_xyz block-size=1 index-block-size=1 filter comparer-split-
----
point: [a_xyz#1,1,c_xyz#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

layout
Expand Down Expand Up @@ -184,6 +190,7 @@ rewrite from=_xyz to=_123 block-size=1 index-block-size=1 filter comparer-split-
----
point: [a_123#1,1,c_123#1,1]
rangedel: [#0,0,#0,0]
rangekey: [#0,0,#0,0]
seqnums: [1,1]

layout
Expand Down
Loading

0 comments on commit f2d0bb1

Please sign in to comment.