Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User Defined Timestamp APIs #77

Merged
merged 10 commits into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions cf_ts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package grocksdb

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestColumnFamilyPutGetDeleteWithTS(t *testing.T) {
dir := t.TempDir()

givenNames := []string{"default", "guide"}
opts := NewDefaultOptions()
opts.SetCreateIfMissingColumnFamilies(true)
opts.SetCreateIfMissing(true)
opts.SetCompression(SnappyCompression)
opts.SetComparator(newDefaultComparatorWithTS())
db, cfh, err := OpenDbColumnFamilies(opts, dir, givenNames, []*Options{opts, opts})
require.Nil(t, err)
defer db.Close()
require.EqualValues(t, len(cfh), 2)
defer cfh[0].Destroy()
defer cfh[1].Destroy()

wo := NewDefaultWriteOptions()
defer wo.Destroy()
ro := NewDefaultReadOptions()
defer ro.Destroy()

givenKey0 := []byte("hello0")
givenKey1 := []byte("hello1")
givenVal0 := []byte("world0")
givenVal1 := []byte("world1")
givenTs0 := marshalTimestamp(1)
givenTs1 := marshalTimestamp(2)
givenTs2 := marshalTimestamp(3)

{
ro.SetTimestamp(givenTs2)

require.Nil(t, db.PutCFWithTS(wo, cfh[0], givenKey0, givenTs0, givenVal0))
actualVal0, actualTs0, err := db.GetCFWithTS(ro, cfh[0], givenKey0)
defer actualVal0.Free()
defer actualTs0.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal0.Data(), givenVal0)
require.EqualValues(t, actualTs0.Data(), givenTs0)

require.Nil(t, db.PutCFWithTS(wo, cfh[1], givenKey1, givenTs1, givenVal1))
actualVal1, actualTs1, err := db.GetCFWithTS(ro, cfh[1], givenKey1)
defer actualVal1.Free()
defer actualTs1.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal1.Data(), givenVal1)
require.EqualValues(t, actualTs1.Data(), givenTs1)

actualVal, actualTs, err := db.GetCFWithTS(ro, cfh[0], givenKey1)
defer actualVal.Free()
defer actualTs.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal.Size(), 0)
require.EqualValues(t, actualTs.Size(), 0)

actualVal, actualTs, err = db.GetCFWithTS(ro, cfh[1], givenKey0)
defer actualVal.Free()
defer actualTs.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal.Size(), 0)
require.EqualValues(t, actualTs.Size(), 0)

require.Nil(t, db.DeleteCFWithTS(wo, cfh[0], givenKey0, givenTs2))
actualVal, actualTs, err = db.GetCFWithTS(ro, cfh[0], givenKey0)
defer actualVal.Free()
defer actualTs.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal.Size(), 0)
require.EqualValues(t, actualTs.Size(), 0)
}

{
require.Nil(t, db.PutCFWithTS(wo, cfh[0], givenKey0, givenTs2, givenVal0))
actualVal0, actualTs0, err := db.GetCFWithTS(ro, cfh[0], givenKey0)
defer actualVal0.Free()
defer actualTs0.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal0.Data(), givenVal0)
require.EqualValues(t, actualTs0.Data(), givenTs2)

actualVal1, actualTs1, err := db.GetCFWithTS(ro, cfh[1], givenKey1)
defer actualVal1.Free()
defer actualTs1.Free()
require.Nil(t, err)
require.EqualValues(t, actualVal1.Data(), givenVal1)
require.EqualValues(t, actualTs1.Data(), givenTs1)
}
}

func TestColumnFamilyMultiGetWithTS(t *testing.T) {
db, cfh, cleanup := newTestDBMultiCF(t, []string{"default", "custom"}, func(opts *Options) {
opts.SetComparator(newDefaultComparatorWithTS())
})
defer cleanup()

var (
givenKey1 = []byte("hello1")
givenKey2 = []byte("hello2")
givenKey3 = []byte("hello3")
givenVal1 = []byte("world1")
givenVal2 = []byte("world2")
givenVal3 = []byte("world3")
givenTs1 = marshalTimestamp(1)
givenTs2 = marshalTimestamp(2)
givenTs3 = marshalTimestamp(3)
)

wo := NewDefaultWriteOptions()
defer wo.Destroy()

ro := NewDefaultReadOptions()
ro.SetTimestamp(givenTs3)
defer ro.Destroy()

// create
require.Nil(t, db.PutCFWithTS(wo, cfh[0], givenKey1, givenTs1, givenVal1))
require.Nil(t, db.PutCFWithTS(wo, cfh[1], givenKey2, givenTs2, givenVal2))
require.Nil(t, db.PutCFWithTS(wo, cfh[1], givenKey3, givenTs3, givenVal3))

// column family 0 only has givenKey1
values, times, err := db.MultiGetCFWithTS(ro, cfh[0], []byte("noexist"), givenKey1, givenKey2, givenKey3)
defer values.Destroy()
defer times.Destroy()
require.Nil(t, err)
require.EqualValues(t, len(values), 4)

require.EqualValues(t, values[0].Data(), []byte(nil))
require.EqualValues(t, values[1].Data(), givenVal1)
require.EqualValues(t, values[2].Data(), []byte(nil))
require.EqualValues(t, values[3].Data(), []byte(nil))

require.EqualValues(t, times[0].Data(), []byte(nil))
require.EqualValues(t, times[1].Data(), givenTs1)
require.EqualValues(t, times[2].Data(), []byte(nil))
require.EqualValues(t, times[3].Data(), []byte(nil))

// column family 1 only has givenKey2 and givenKey3
values, times, err = db.MultiGetCFWithTS(ro, cfh[1], []byte("noexist"), givenKey1, givenKey2, givenKey3)
defer values.Destroy()
defer times.Destroy()
require.Nil(t, err)
require.EqualValues(t, len(values), 4)

require.EqualValues(t, values[0].Data(), []byte(nil))
require.EqualValues(t, values[1].Data(), []byte(nil))
require.EqualValues(t, values[2].Data(), givenVal2)
require.EqualValues(t, values[3].Data(), givenVal3)

require.EqualValues(t, times[0].Data(), []byte(nil))
require.EqualValues(t, times[1].Data(), []byte(nil))
require.EqualValues(t, times[2].Data(), givenTs2)
require.EqualValues(t, times[3].Data(), givenTs3)

// getting them all from the right CF should return them all
values, times, err = db.MultiGetMultiCFWithTS(ro,
ColumnFamilyHandles{cfh[0], cfh[1], cfh[1]},
[][]byte{givenKey1, givenKey2, givenKey3},
)
defer values.Destroy()
defer times.Destroy()
require.Nil(t, err)
require.EqualValues(t, len(values), 3)

require.EqualValues(t, values[0].Data(), givenVal1)
require.EqualValues(t, values[1].Data(), givenVal2)
require.EqualValues(t, values[2].Data(), givenVal3)

require.EqualValues(t, times[0].Data(), givenTs1)
require.EqualValues(t, times[1].Data(), givenTs2)
require.EqualValues(t, times[2].Data(), givenTs3)
}
66 changes: 60 additions & 6 deletions comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,63 @@ import (
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
// Note that Compare(a, b) also compares timestamp if timestamp size is
// non-zero. For the same user key with different timestamps, larger (newer)
// timestamp comes first.
type Comparing = func(a, b []byte) int

// ComparingWithoutTimestamp functor.
//
// Three-way comparison. Returns value:
// < 0 if "a" < "b",
// == 0 if "a" == "b",
// > 0 if "a" > "b"
type ComparingWithoutTimestamp = func(a []byte, aHasTs bool, b []byte, bHasTs bool) int

// NewComparator creates a Comparator object which contains native c-comparator pointer.
func NewComparator(name string, compare Comparing) *Comparator {
cmp := &Comparator{name: name, compare: compare}
cmp := &Comparator{
name: name,
compare: compare,
}
idx := registerComperator(cmp)
cmp.c = C.gorocksdb_comparator_create(C.uintptr_t(idx))
return cmp
}

// NewComparatorWithTimestamp creates a Timestamp Aware Comparator object which contains native c-comparator pointer.
func NewComparatorWithTimestamp(name string, tsSize uint64, compare, compareTs Comparing, compareWithoutTs ComparingWithoutTimestamp) *Comparator {
cmp := &Comparator{
name: name,
tsSize: tsSize,
compare: compare,
compareTs: compareTs,
compareWithoutTs: compareWithoutTs,
}
idx := registerComperator(cmp)
cmp.c = C.gorocksdb_comparator_with_ts_create(C.uintptr_t(idx), C.size_t(tsSize))
return cmp
}

// NativeComparator wraps c-comparator pointer.
type Comparator struct {
c *C.rocksdb_comparator_t
compare Comparing
name string
c *C.rocksdb_comparator_t

name string
tsSize uint64

compare Comparing
compareTs Comparing
compareWithoutTs ComparingWithoutTimestamp
}

func (c *Comparator) Compare(a, b []byte) int { return c.compare(a, b) }
func (c *Comparator) Name() string { return c.name }
func (c *Comparator) Compare(a, b []byte) int { return c.compare(a, b) }
func (c *Comparator) CompareTimestamp(a, b []byte) int { return c.compareTs(a, b) }
func (c *Comparator) CompareWithoutTimestamp(a []byte, aHasTs bool, b []byte, bHasTs bool) int {
return c.compareWithoutTs(a, aHasTs, b, bHasTs)
}
func (c *Comparator) Name() string { return c.name }
func (c *Comparator) TimestampSize() uint64 { return c.tsSize }
func (c *Comparator) Destroy() {
C.rocksdb_comparator_destroy(c.c)
c.c = nil
Expand All @@ -57,6 +95,22 @@ func gorocksdb_comparator_compare(idx int, cKeyA *C.char, cKeyALen C.size_t, cKe
return C.int(comperators.Get(idx).(comperatorWrapper).comparator.Compare(keyA, keyB))
}

//export gorocksdb_comparator_compare_ts
func gorocksdb_comparator_compare_ts(idx int, cTsA *C.char, cTsALen C.size_t, cTsB *C.char, cTsBLen C.size_t) C.int {
tsA := charToByte(cTsA, cTsALen)
tsB := charToByte(cTsB, cTsBLen)
return C.int(comperators.Get(idx).(comperatorWrapper).comparator.CompareTimestamp(tsA, tsB))
}

//export gorocksdb_comparator_compare_without_ts
func gorocksdb_comparator_compare_without_ts(idx int, cKeyA *C.char, cKeyALen C.size_t, cAHasTs C.uchar, cKeyB *C.char, cKeyBLen C.size_t, cBHasTs C.uchar) C.int {
keyA := charToByte(cKeyA, cKeyALen)
keyB := charToByte(cKeyB, cKeyBLen)
keyAHasTs := charToBool(cAHasTs)
keyBHasTs := charToBool(cBHasTs)
return C.int(comperators.Get(idx).(comperatorWrapper).comparator.CompareWithoutTimestamp(keyA, keyAHasTs, keyB, keyBHasTs))
}

//export gorocksdb_comparator_name
func gorocksdb_comparator_name(idx int) *C.char {
return comperators.Get(idx).(comperatorWrapper).name
Expand Down
61 changes: 61 additions & 0 deletions comparator_ts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package grocksdb

import (
"bytes"
"runtime"
"testing"

"github.com/stretchr/testify/require"
)

func TestComparatorWithTS(t *testing.T) {
db, opts := newTestDBAndOpts(t, func(opts *Options) {
comp := newComparatorWithTimeStamp(
"rev",
func(a, b []byte) int {
return bytes.Compare(a, b) * -1
},
)
opts.SetComparator(comp)
})
defer func() {
db.Close()
opts.Destroy()
}()

runtime.GC()

// insert keys
givenKeys := [][]byte{[]byte("key1"), []byte("key2"), []byte("key3")}
givenTimes := [][]byte{marshalTimestamp(1), marshalTimestamp(2), marshalTimestamp(3)}

wo := NewDefaultWriteOptions()
for i, k := range givenKeys {
require.Nil(t, db.PutWithTS(wo, k, givenTimes[i], []byte("val")))
runtime.GC()
}

// create a iterator to collect the keys
ro := NewDefaultReadOptions()
ro.SetTimestamp(marshalTimestamp(4))
iter := db.NewIterator(ro)
defer iter.Close()

// we seek to the last key and iterate in reverse order
// to match given keys
var actualKeys, actualTimes [][]byte
for iter.SeekToLast(); iter.Valid(); iter.Prev() {
key := make([]byte, 4)
ts := make([]byte, timestampSize)
copy(key, iter.Key().Data())
copy(ts, iter.Timestamp().Data())
actualKeys = append(actualKeys, key)
actualTimes = append(actualTimes, ts)
runtime.GC()
}
require.Nil(t, iter.Err())

// ensure that the order is correct
require.EqualValues(t, actualKeys, givenKeys)
require.EqualValues(t, actualTimes, givenTimes)
}
Loading