Skip to content

Commit

Permalink
fix(key): only count a key as an ancestor or prefix if there is a sep…
Browse files Browse the repository at this point in the history
…arator

Also make sure to clean and normalize keys before using them as prefixes.

BREAKING CHANGES:

* `myds.Query(Query{Prefix:"/foo"})` will no longer match "/foobar" (or even
  "/foo"). This is usually what the user expects, we had a tendency to normalize
  "/foo/" to "/foo" (when we clean keys), and many datastores can't efficiently
  search for prefixes that aren't on path-boundaries anyways.
* Given a mount datastore with mounts `["/foo", "/"]`, `myds.Put("/foo", "bar")`
  will put the value to the datastore mounted at "/", not "/foo", as the key "/"
  and "" usually doesn't make sense.

While technically breaking, these changes are much more likely to fix bugs than
they are to break things.
  • Loading branch information
Stebalien committed Feb 5, 2020
1 parent e9f6023 commit 5598edf
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 70 deletions.
19 changes: 13 additions & 6 deletions key.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,27 @@ func (k Key) ChildString(s string) Key {
// NewKey("/Comedy").IsAncestorOf("/Comedy/MontyPython")
// true
func (k Key) IsAncestorOf(other Key) bool {
if other.string == k.string {
// equivalent to HasPrefix(other, k.string + "/")

if len(other.string) <= len(k.string) {
// We're not long enough to be a child.
return false
}
return strings.HasPrefix(other.string, k.string)

if k.string == "/" {
// We're the root and the other key is longer.
return true
}

// "other" starts with /k.string/
return other.string[len(k.string)] == '/' && other.string[:len(k.string)] == k.string
}

// IsDescendantOf returns whether this key contains another as a prefix.
// NewKey("/Comedy/MontyPython").IsDescendantOf("/Comedy")
// true
func (k Key) IsDescendantOf(other Key) bool {
if other.string == k.string {
return false
}
return strings.HasPrefix(k.string, other.string)
return other.IsAncestorOf(k)
}

// IsTopLevel returns whether this key has only one namespace.
Expand Down
20 changes: 12 additions & 8 deletions key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,23 @@ func CheckTrue(c *C, cond bool) {
func (ks *KeySuite) TestKeyAncestry(c *C) {
k1 := NewKey("/A/B/C")
k2 := NewKey("/A/B/C/D")
k3 := NewKey("/AB")
k4 := NewKey("/A")

c.Check(k1.String(), Equals, "/A/B/C")
c.Check(k2.String(), Equals, "/A/B/C/D")
CheckTrue(c, k1.IsAncestorOf(k2))
CheckTrue(c, k2.IsDescendantOf(k1))
CheckTrue(c, NewKey("/A").IsAncestorOf(k2))
CheckTrue(c, NewKey("/A").IsAncestorOf(k1))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k2))
CheckTrue(c, !NewKey("/A").IsDescendantOf(k1))
CheckTrue(c, k2.IsDescendantOf(NewKey("/A")))
CheckTrue(c, k1.IsDescendantOf(NewKey("/A")))
CheckTrue(c, !k2.IsAncestorOf(NewKey("/A")))
CheckTrue(c, !k1.IsAncestorOf(NewKey("/A")))
CheckTrue(c, k4.IsAncestorOf(k2))
CheckTrue(c, k4.IsAncestorOf(k1))
CheckTrue(c, !k4.IsDescendantOf(k2))
CheckTrue(c, !k4.IsDescendantOf(k1))
CheckTrue(c, !k3.IsDescendantOf(k4))
CheckTrue(c, !k4.IsAncestorOf(k3))
CheckTrue(c, k2.IsDescendantOf(k4))
CheckTrue(c, k1.IsDescendantOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k4))
CheckTrue(c, !k1.IsAncestorOf(k4))
CheckTrue(c, !k2.IsAncestorOf(k2))
CheckTrue(c, !k1.IsAncestorOf(k1))
c.Check(k1.Child(NewKey("D")).String(), Equals, k2.String())
Expand Down
22 changes: 9 additions & 13 deletions mount/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ ds.Datastore = (*Datastore)(nil)

func (d *Datastore) lookup(key ds.Key) (ds.Datastore, ds.Key, ds.Key) {
for _, m := range d.mounts {
if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
if m.Prefix.IsAncestorOf(key) {
s := strings.TrimPrefix(key.String(), m.Prefix.String())
k := ds.NewKey(s)
return m.Datastore, m.Prefix, k
Expand Down Expand Up @@ -159,21 +159,21 @@ func (h *querySet) next() (query.Result, bool) {
// /aoe/ not matching
func (d *Datastore) lookupAll(key ds.Key) (dst []ds.Datastore, mountpoint, rest []ds.Key) {
for _, m := range d.mounts {
p := m.Prefix.String()
if len(p) > 1 {
p = p + "/"
}

if strings.HasPrefix(p, key.String()) {
if m.Prefix.IsDescendantOf(key) {
dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey("/"))
} else if strings.HasPrefix(key.String(), p) {
} else if m.Prefix.Equal(key) || m.Prefix.IsAncestorOf(key) {
r := strings.TrimPrefix(key.String(), m.Prefix.String())

dst = append(dst, m.Datastore)
mountpoint = append(mountpoint, m.Prefix)
rest = append(rest, ds.NewKey(r))

// We've found an ancestor (or equal) key. We might have
// more general datastores, but they won't contain keys
// with this prefix so there's no point in searching them.
break
}
}
return dst, mountpoint, rest
Expand All @@ -191,15 +191,11 @@ func (d *Datastore) Put(key ds.Key, value []byte) error {
func (d *Datastore) Sync(prefix ds.Key) error {
// Sync all mount points below the prefix
// Sync the mount point right at (or above) the prefix
dstores, mountPts, rest := d.lookupAll(prefix)
dstores, _, rest := d.lookupAll(prefix)
for i, suffix := range rest {
if err := dstores[i].Sync(suffix); err != nil {
return err
}

if mountPts[i].Equal(prefix) || suffix.String() != "/" {
return nil
}
}

return nil
Expand Down
78 changes: 43 additions & 35 deletions mount/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,49 +268,57 @@ func TestQueryAcrossMounts(t *testing.T) {
t.Fatal(err)
}

res, err := m.Query(query.Query{Prefix: "/ba"})
if err != nil {
t.Fatalf("Query fail: %v\n", err)
}
entries, err := res.Rest()
if err != nil {
err = res.Close()
expect := func(prefix string, values map[string]string) {
t.Helper()
res, err := m.Query(query.Query{Prefix: prefix})
if err != nil {
t.Errorf("result.Close failed %d", err)
t.Fatalf("Query fail: %v\n", err)
}
t.Fatalf("Query Results.Rest fail: %v\n", err)
}
seen := 0

expect := map[string]string{
"/foo/lorem": "y u here",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
}
for _, e := range entries {
v := expect[e.Key]
if v == "" {
t.Errorf("unexpected key %s", e.Key)
entries, err := res.Rest()
if err != nil {
err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
t.Fatalf("Query Results.Rest fail: %v\n", err)
}

if v != string(e.Value) {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
if len(entries) != len(values) {
t.Errorf("expected %d results, got %d", len(values), len(entries))
}
for _, e := range entries {
v, ok := values[e.Key]
if !ok {
t.Errorf("unexpected key %s", e.Key)
continue
}

expect[e.Key] = "seen"
seen++
}
if v != string(e.Value) {
t.Errorf("key value didn't match expected %s: '%s' - '%s'", e.Key, v, e.Value)
}

if seen != 4 {
t.Errorf("expected to see 3 values, saw %d", seen)
values[e.Key] = "seen"
}
}

err = res.Close()
if err != nil {
t.Errorf("result.Close failed %d", err)
}
expect("/ba", nil)
expect("/bar", map[string]string{
"/bar/ipsum": "234",
"/bar/dolor": "345",
})
expect("/baz/", map[string]string{
"/baz/sit": "456",
})
expect("/foo", map[string]string{
"/foo/lorem": "123",
})
expect("/", map[string]string{
"/foo/lorem": "123",
"/bar/ipsum": "234",
"/bar/dolor": "345",
"/baz/sit": "456",
"/banana": "567",
})
expect("/banana", nil)
}

func TestQueryAcrossMountsWithSort(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion namespace/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ func (ks *DSSuite) TestQuery(c *C) {
c.Check(err, Equals, nil)

expect = []dsq.Entry{
{Key: "/bar", Size: len([]byte("/foo/bar")), Value: []byte("/foo/bar")},
{Key: "/bar/baz", Size: len([]byte("/foo/bar/baz")), Value: []byte("/foo/bar/baz")},
}

Expand Down
20 changes: 19 additions & 1 deletion query/query_impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package query

import (
"path"

goprocess "github.com/jbenet/goprocess"
)

Expand Down Expand Up @@ -116,7 +118,23 @@ func NaiveOrder(qr Results, orders ...Order) Results {

func NaiveQueryApply(q Query, qr Results) Results {
if q.Prefix != "" {
qr = NaiveFilter(qr, FilterKeyPrefix{q.Prefix})
// Clean the prefix as a key and append / so a prefix of /bar
// only finds /bar/baz, not /barbaz.
prefix := q.Prefix
if len(prefix) == 0 {
prefix = "/"
} else {
if prefix[0] != '/' {
prefix = "/" + prefix
}
prefix = path.Clean(prefix)
}
// If the prefix isn't "/", end it in a "/" so we only find keys
// _under_ the prefix.
if prefix != "/" {
prefix += "/"
}
qr = NaiveFilter(qr, FilterKeyPrefix{prefix})
}
for _, f := range q.Filters {
qr = NaiveFilter(qr, f)
Expand Down
10 changes: 4 additions & 6 deletions query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ var sampleKeys = []string{
}

func testResults(t *testing.T, res Results, expect []string) {
t.Helper()

actualE, err := res.Rest()
if err != nil {
t.Fatal(err)
Expand All @@ -37,6 +39,7 @@ func testResults(t *testing.T, res Results, expect []string) {

func TestNaiveQueryApply(t *testing.T) {
testNaiveQueryApply := func(t *testing.T, query Query, keys []string, expect []string) {
t.Helper()
e := make([]Entry, len(keys))
for i, k := range keys {
e[i] = Entry{Key: k}
Expand Down Expand Up @@ -71,9 +74,6 @@ func TestNaiveQueryApply(t *testing.T) {
testNaiveQueryApply(t, q, sampleKeys, []string{
"/ab/c",
"/ab/cd",
"/abce",
"/abcf",
"/ab",
})

q = Query{Orders: []Order{OrderByKeyDescending{}}}
Expand All @@ -88,14 +88,12 @@ func TestNaiveQueryApply(t *testing.T) {

q = Query{
Limit: 3,
Offset: 2,
Offset: 1,
Prefix: "/ab",
Orders: []Order{OrderByKey{}},
}
testNaiveQueryApply(t, q, sampleKeys, []string{
"/ab/cd",
"/abce",
"/abcf",
})
}

Expand Down

0 comments on commit 5598edf

Please sign in to comment.