Skip to content

Commit

Permalink
storage/disk: Add new disk-based storage implementation
Browse files Browse the repository at this point in the history
This commit adds library support for a disk-based storage
implementation that persists policies and data using an embedded
key-value store (badger).

Signed-off-by: Torin Sandall <torinsandall@gmail.com>
  • Loading branch information
tsandall committed Aug 30, 2021
1 parent b704668 commit 1eb0220
Show file tree
Hide file tree
Showing 8 changed files with 2,251 additions and 0 deletions.
423 changes: 423 additions & 0 deletions storage/disk/disk.go

Large diffs are not rendered by default.

1,041 changes: 1,041 additions & 0 deletions storage/disk/disk_test.go

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions storage/disk/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk

import (
"github.com/open-policy-agent/opa/storage"
)

// errNotFound is a shared package-level storage error that carries the
// storage.NotFoundErr code and no message.
var errNotFound = &storage.Error{Code: storage.NotFoundErr}

// wrapError normalizes err into a *storage.Error. A nil error stays nil, an
// error that already is a *storage.Error is returned untouched, and anything
// else is wrapped as an InternalErr whose message is the original error text.
func wrapError(err error) error {
	switch err.(type) {
	case nil:
		return nil
	case *storage.Error:
		return err
	}
	// NOTE(tsandall): we intentionally do not convert badger.ErrKeyNotFound to
	// NotFoundErr code here because the former may not always need to be
	// represented as a NotFoundErr (i.e., it may depend on the call-site.)
	wrapped := &storage.Error{
		Code:    storage.InternalErr,
		Message: err.Error(),
	}
	return wrapped
}
83 changes: 83 additions & 0 deletions storage/disk/example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk_test

import (
"context"
"fmt"
"io/ioutil"
"os"

"github.com/open-policy-agent/opa/storage"
"github.com/open-policy-agent/opa/storage/disk"
"github.com/open-policy-agent/opa/util"
)

// check aborts the example by panicking with err when err is non-nil; a nil
// error is a no-op.
func check(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// Example_store demonstrates persisting data with the disk-based store:
// it writes a document, closes the store, reopens it from the same directory
// with the same partitions, and reads the persisted value back.
func Example_store() {

	ctx := context.Background()

	// Create a temporary directory for store.
	dir, err := ioutil.TempDir("", "opa_disk_example")
	check(err)

	// Cleanup temporary directory after finishing.
	defer os.RemoveAll(dir)

	// Create a new disk-based store.
	store, err := disk.New(ctx, disk.Options{
		Dir: dir,
		Partitions: []storage.Path{
			storage.MustParsePath("/authz/tenants"),
		},
	})
	check(err)

	// Insert data into the store. The `storage.WriteOne` function automatically
	// opens a write transaction, applies the operation, and commits the
	// transaction in one shot.
	err = storage.WriteOne(ctx, store, storage.AddOp, storage.MustParsePath("/"), util.MustUnmarshalJSON([]byte(`{
		"authz": {
			"tenants": {
				"acmecorp.openpolicyagent.org": {
					"tier": "gold"
				},
				"globex.openpolicyagent.org" :{
					"tier": "silver"
				}
			}
		}
	}`)))
	check(err)

	// Close the store so that it can be reopened.
	err = store.Close(ctx)
	check(err)

	// Re-create the disk-based store using the same options.
	store2, err := disk.New(ctx, disk.Options{
		Dir: dir,
		Partitions: []storage.Path{
			storage.MustParsePath("/authz/tenants"),
		},
	})
	check(err)

	// Read value persisted above and inspect the result.
	value, err := storage.ReadOne(ctx, store2, storage.MustParsePath("/authz/tenants/acmecorp.openpolicyagent.org"))
	check(err)

	// Close the reopened store as well so that it is no longer open when the
	// deferred directory cleanup runs.
	err = store2.Close(ctx)
	check(err)

	fmt.Println(value)

	// Output:
	//
	// map[tier:gold]
}
55 changes: 55 additions & 0 deletions storage/disk/partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk

import (
"github.com/open-policy-agent/opa/storage"
)

// partitionTrie is a trie node keyed by path segment. Each entry in
// partitions maps the next path segment to the child node for that segment.
type partitionTrie struct {
	partitions map[string]*partitionTrie
}

// buildPartitionTrie returns a new trie containing every path in paths.
func buildPartitionTrie(paths []storage.Path) *partitionTrie {
	trie := newPartitionTrie()
	for _, path := range paths {
		trie.insert(path)
	}
	return trie
}

// newPartitionTrie returns an empty trie node with an initialized child map.
func newPartitionTrie() *partitionTrie {
	node := new(partitionTrie)
	node.partitions = map[string]*partitionTrie{}
	return node
}

// Find walks the trie along path, one segment at a time.
//
// If every segment has a matching child, it returns len(path) and the node
// reached (the receiver itself for an empty path). Otherwise it returns the
// 1-based position of the first segment that has no matching child, together
// with a nil node.
func (p *partitionTrie) Find(path storage.Path) (int, *partitionTrie) {
	cur := p
	for depth := 0; depth < len(path); depth++ {
		child, found := cur.partitions[path[depth]]
		if !found {
			return depth + 1, nil
		}
		cur = child
	}
	return len(path), cur
}

// insert adds path to the trie rooted at p, creating any child nodes that do
// not exist yet. An empty path leaves the trie unchanged.
func (p *partitionTrie) insert(path storage.Path) {
	node := p
	for _, segment := range path {
		next, exists := node.partitions[segment]
		if !exists {
			next = newPartitionTrie()
			node.partitions[segment] = next
		}
		node = next
	}
}
97 changes: 97 additions & 0 deletions storage/disk/partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk

import (
"strings"
"testing"

"github.com/open-policy-agent/opa/storage"
)

// TestPartitionTrie builds a small trie and verifies both its shape and the
// (index, node) pair returned by Find for paths that match fully, match only
// a prefix, or do not match at all.
func TestPartitionTrie(t *testing.T) {

	// Build simple trie
	root := buildPartitionTrie([]storage.Path{
		storage.MustParsePath("/foo/bar"),
		storage.MustParsePath("/foo/baz/qux"),
		storage.MustParsePath("/corge"),
	})

	// Assert on counts...
	if got := len(root.partitions); got != 2 {
		t.Fatal("expected root to contain two partitions")
	}

	foo := root.partitions["foo"]
	if got := len(foo.partitions); got != 2 {
		t.Fatal("expected foo to contain two partitions")
	}

	baz := foo.partitions["baz"]
	if got := len(baz.partitions); got != 1 {
		t.Fatal("expected baz to contain one child")
	}

	// findCase describes one Find lookup and the result it must produce.
	type findCase struct {
		path    string
		wantIdx int
		wantPtr *partitionTrie
	}

	cases := []findCase{
		{path: "/", wantIdx: 0, wantPtr: root},
		{path: "/foo", wantIdx: 1, wantPtr: foo},
		{path: "/foo/bar", wantIdx: 2, wantPtr: foo.partitions["bar"]},
		{path: "/foo/bar/baz", wantIdx: 3, wantPtr: nil},
		{path: "/foo/bar/baz/qux", wantIdx: 3, wantPtr: nil},
		{path: "/foo/baz", wantIdx: 2, wantPtr: baz},
		{path: "/foo/baz/deadbeef", wantIdx: 3, wantPtr: nil},
		{path: "/foo/baz/qux", wantIdx: 3, wantPtr: baz.partitions["qux"]},
		{path: "/foo/baz/qux/deadbeef", wantIdx: 4, wantPtr: nil},
		{path: "/foo/corge", wantIdx: 2, wantPtr: nil},
		{path: "/deadbeef", wantIdx: 1, wantPtr: nil},
	}

	for i := range cases {
		tt := cases[i]
		name := strings.TrimPrefix(tt.path, "/")
		t.Run(name, func(t *testing.T) {
			idx, node := root.Find(storage.MustParsePath(tt.path))
			if idx == tt.wantIdx && node == tt.wantPtr {
				return
			}
			t.Fatalf("expected (%d, %v) but got (%d, %v)", tt.wantIdx, tt.wantPtr, idx, node)
		})
	}

}
104 changes: 104 additions & 0 deletions storage/disk/paths.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021 The OPA Authors. All rights reserved.
// Use of this source code is governed by an Apache2
// license that can be found in the LICENSE file.

package disk

import (
"fmt"
"sort"

"github.com/open-policy-agent/opa/storage"
)

// pathMapper holds the key prefixes used to translate between storage
// paths/policy IDs and keys.
type pathMapper struct {
	// dataPrefix is "/<schema_version>/<partition_version>/data/".
	dataPrefix string
	// dataPrefixNoTrailingSlash is dataPrefix without its final "/".
	dataPrefixNoTrailingSlash string
	// policiesPrefix is "/<schema_version>/<partition_version>/policies/".
	policiesPrefix string
}

// newPathMapper returns a pathMapper whose prefixes are all rooted at
// "/<schemaVersion>/<partitionVersion>".
func newPathMapper(schemaVersion, partitionVersion int64) *pathMapper {
	root := fmt.Sprintf("/%v/%v", schemaVersion, partitionVersion)
	dataNoSlash := root + "/data"
	return &pathMapper{
		dataPrefix:                dataNoSlash + "/",
		dataPrefixNoTrailingSlash: dataNoSlash,
		policiesPrefix:            root + "/policies/",
	}
}

// PolicyKey2ID returns the policy ID encoded in key by dropping the leading
// len(policiesPrefix) bytes. The key must be at least as long as the policies
// prefix; the prefix content itself is not checked.
func (pm *pathMapper) PolicyKey2ID(key []byte) string {
	prefixLen := len(pm.policiesPrefix)
	id := key[prefixLen:]
	return string(id)
}

// PolicyIDPrefix returns the policies key prefix as a freshly allocated byte
// slice.
func (pm *pathMapper) PolicyIDPrefix() []byte {
	prefix := []byte(pm.policiesPrefix)
	return prefix
}

// PolicyID2Key returns the key for the policy with the given id: the policies
// prefix followed by the id.
func (pm *pathMapper) PolicyID2Key(id string) []byte {
	key := make([]byte, 0, len(pm.policiesPrefix)+len(id))
	key = append(key, pm.policiesPrefix...)
	key = append(key, id...)
	return key
}

// DataKey2Path converts a data key back into the storage path it encodes.
//
// A data key has the form /<schema_version>/<partition_version>/data/<path...>,
// so the first three segments are stripped from the parsed key. A key that
// cannot be parsed as a path, or that has fewer than three segments, is
// reported as a corrupt key via a storage.InternalErr error instead of
// panicking on the slice operation.
func (pm *pathMapper) DataKey2Path(key []byte) (storage.Path, error) {
	// Number of leading segments: schema version, partition version, "data".
	const prefixSegments = 3

	p, ok := storage.ParsePath(string(key))
	if !ok || len(p) < prefixSegments {
		return nil, &storage.Error{Code: storage.InternalErr, Message: fmt.Sprintf("corrupt key: %s", key)}
	}
	// skip /<schema_version>/<partition_version>/<data prefix>
	return p[prefixSegments:], nil
}

// DataPrefix2Key returns the key prefix for the given path: the data prefix
// itself for an empty path, otherwise the data prefix followed by the path and
// a trailing slash. The returned error is always nil.
func (pm *pathMapper) DataPrefix2Key(path storage.Path) ([]byte, error) {
	key := pm.dataPrefix
	if len(path) > 0 {
		key = pm.dataPrefixNoTrailingSlash + path.String() + "/"
	}
	return []byte(key), nil
}

// DataPath2Key returns the key for the value at path: the data prefix (without
// its trailing slash) followed by the path's string form. An empty path has no
// key and yields a storage.InternalErr error.
func (pm *pathMapper) DataPath2Key(path storage.Path) ([]byte, error) {
	if len(path) > 0 {
		key := pm.dataPrefixNoTrailingSlash + path.String()
		return []byte(key), nil
	}
	return nil, &storage.Error{Code: storage.InternalErr, Message: "empty path"}
}

// pathSet is a slice of storage paths with set-style helper methods
// (IsDisjoint, Diff, Contains, Sorted). Uniqueness of elements is not enforced
// by the type itself.
type pathSet []storage.Path

// IsDisjoint reports whether no path in the set is a prefix of another path in
// the set. The check at each pair is symmetric, so every unordered pair of
// elements is examined exactly once. Empty and single-element sets are
// disjoint.
func (ps pathSet) IsDisjoint() bool {
	for i := range ps {
		// Only compare against later elements; earlier pairs were already
		// covered when the outer loop visited them.
		for j := i + 1; j < len(ps); j++ {
			if ps[i].HasPrefix(ps[j]) || ps[j].HasPrefix(ps[i]) {
				return false
			}
		}
	}
	return true
}

// Diff returns the paths of ps that are not contained in other, in their
// original order. The result is never nil, even when it is empty.
func (ps pathSet) Diff(other pathSet) pathSet {
	missing := pathSet{}
	for i := range ps {
		if other.Contains(ps[i]) {
			continue
		}
		missing = append(missing, ps[i])
	}
	return missing
}

// Contains reports whether some element of the set is equal to x.
func (ps pathSet) Contains(x storage.Path) bool {
	for i := 0; i < len(ps); i++ {
		if x.Equal(ps[i]) {
			return true
		}
	}
	return false
}

// Sorted returns a copy of the set's paths in ascending order according to
// storage.Path.Compare. The receiver is left unmodified.
func (ps pathSet) Sorted() []storage.Path {
	sorted := make(pathSet, len(ps))
	copy(sorted, ps)
	ascending := func(a, b int) bool {
		return sorted[a].Compare(sorted[b]) < 0
	}
	sort.Slice(sorted, ascending)
	return sorted
}
Loading

0 comments on commit 1eb0220

Please sign in to comment.