diff --git a/.golangci.yml b/.golangci.yml index 8efae9f8..0810c6e5 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,6 +1,6 @@ # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2020-08-28T20:43:11Z by kres 292ed36-dirty. +# Generated on 2020-12-30T20:27:21Z by kres latest. # options for analysis running @@ -125,6 +125,9 @@ linters: - gomnd - goerr113 - nestif + - wrapcheck + - paralleltest + - exhaustivestruct disable-all: false fast: false diff --git a/Dockerfile b/Dockerfile index 58525ea8..84756f28 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ -# syntax = docker/dockerfile-upstream:1.1.7-experimental +# syntax = docker/dockerfile-upstream:1.2.0-labs # THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT. # -# Generated on 2020-08-28T20:50:10Z by kres 292ed36-dirty. +# Generated on 2020-12-30T20:27:21Z by kres latest. ARG TOOLCHAIN @@ -24,7 +24,7 @@ FROM toolchain AS tools ENV GO111MODULE on ENV CGO_ENABLED 0 ENV GOPATH /go -RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /bin v1.30.0 +RUN curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b /bin v1.33.0 ARG GOFUMPT_VERSION RUN cd $(mktemp -d) \ && go mod init tmp \ diff --git a/cmd/directory-fun/main.go b/cmd/directory-fun/main.go index e69f3a78..b29ebc07 100644 --- a/cmd/directory-fun/main.go +++ b/cmd/directory-fun/main.go @@ -13,13 +13,13 @@ import ( "github.com/talos-systems/os-runtime/pkg/resource" "github.com/talos-systems/os-runtime/pkg/state" - "github.com/talos-systems/os-runtime/pkg/state/impl/local" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" ) // DirectoryTask implements simple process attached to the state. // // DirectoryTask attempts to create path when the parent path got created. -// DirectoryTask watches for parent to be torn down, starts tear down, waits for children +// DirectoryTask watches for parent to be torn down, starts tear down process, waits for children // to be destroyed, and removes the path. // // DirectoryTask is a model of task in some OS sequencer. @@ -34,7 +34,9 @@ func DirectoryTask(world state.State, path string) { err error ) - if parent, err = world.WatchFor(ctx, resource.NewMetadata(defaultNs, PathResourceType, base, resource.VersionUndefined), state.WithEventTypes(state.Created, state.Updated)); err != nil { + if parent, err = world.WatchFor(ctx, + resource.NewMetadata(defaultNs, PathResourceType, base, resource.VersionUndefined), + state.WithEventTypes(state.Created, state.Updated)); err != nil { log.Fatal(err) } @@ -52,38 +54,39 @@ func DirectoryTask(world state.State, path string) { log.Printf("%q: created %q", path, path) - if parent, err = world.UpdateWithConflicts(ctx, parent, func(r resource.Resource) error { - r.(*PathResource).AddDependent(self) + if err = world.AddFinalizer(ctx, parent.Metadata(), self.String()); err != nil { + log.Fatal(err) + } - return nil - }); err != nil { + parent, err = world.Get(ctx, parent.Metadata()) + if err != nil { log.Fatal(err) } - log.Printf("%q: %q.dependents = %q", path, parent.Metadata().ID(), parent.(*PathResource).spec.dependents) + log.Printf("%q: parent %q finalizers are %q", path, base, parent.Metadata().Finalizers()) // doing something useful here <> log.Printf("%q: watching for teardown %q", path, base) - if parent, err = world.WatchFor(ctx, resource.NewMetadata(defaultNs, PathResourceType, base, resource.VersionUndefined), state.WithEventTypes(state.Destroyed, state.Torndown)); err != nil { + if parent, err = world.WatchFor(ctx, + parent.Metadata(), + state.WithEventTypes(state.Created, state.Updated), + state.WithPhases(resource.PhaseTearingDown)); err != nil { log.Fatal(err) } log.Printf("%q: teardown self", path) - if err = world.Teardown(ctx, self.Metadata()); err != nil { + if _, err = world.Teardown(ctx, self.Metadata()); err != nil { log.Fatal(err) } log.Printf("%q: watching for dependents to vanish %q", path, path) if _, err = world.WatchFor(ctx, - resource.NewMetadata(defaultNs, PathResourceType, path, resource.VersionUndefined), - state.WithEventTypes(state.Created, state.Updated, state.Torndown), - state.WithCondition(func(r resource.Resource) (bool, error) { - return len(r.(*PathResource).spec.dependents) == 0, nil - })); err != nil { + self.Metadata(), + state.WithFinalizerEmpty()); err != nil { log.Fatal(err) } @@ -93,14 +96,17 @@ func DirectoryTask(world state.State, path string) { log.Fatal(err) } - if _, err = world.UpdateWithConflicts(ctx, parent, func(r resource.Resource) error { - r.(*PathResource).DropDependent(self) + if err = world.RemoveFinalizer(ctx, parent.Metadata(), self.String()); err != nil { + log.Fatal(err) + } - return nil - }); err != nil { + parent, err = world.Get(ctx, parent.Metadata()) + if err != nil { log.Fatal(err) } + log.Printf("%q: parent %q finalizers are %q", path, base, parent.Metadata().Finalizers()) + if err = world.Destroy(ctx, self.Metadata()); err != nil { log.Fatal(err) } @@ -110,7 +116,7 @@ const defaultNs = "default" func main() { ctx := context.Background() - world := state.WrapCore(local.NewState(defaultNs)) + world := state.WrapCore(inmem.NewState(defaultNs)) root := NewPathResource(defaultNs, ".") if err := world.Create(ctx, root); err != nil { @@ -137,9 +143,22 @@ func main() { time.Sleep(2 * time.Second) - if err := world.Teardown(ctx, root.Metadata()); err != nil { + if _, err := world.Teardown(ctx, root.Metadata()); err != nil { log.Fatal(err) } - time.Sleep(10 * time.Second) + if _, err := world.WatchFor(ctx, + root.Metadata(), + state.WithFinalizerEmpty()); err != nil { + log.Fatal(err) + } + + rootRes, err := world.Get(ctx, root.Metadata()) + if err != nil { + log.Fatal(err) + } + + if err := world.Destroy(ctx, rootRes.Metadata()); err != nil { + log.Fatal(err) + } } diff --git a/cmd/directory-fun/path.go b/cmd/directory-fun/path.go index 5298f9c5..453a7e58 100644 --- a/cmd/directory-fun/path.go +++ b/cmd/directory-fun/path.go @@ -13,17 +13,12 @@ import ( const PathResourceType = resource.Type("os/path") -type pathResourceSpec struct { - dependents []string -} - // PathResource represents a path in the filesystem. // // Resource ID is the path, and dependents are all the immediate // children on the path. type PathResource struct { - md resource.Metadata - spec pathResourceSpec + md resource.Metadata } func NewPathResource(ns resource.Namespace, path string) *PathResource { @@ -35,40 +30,20 @@ func NewPathResource(ns resource.Namespace, path string) *PathResource { return r } -func (path *PathResource) Metadata() resource.Metadata { - return path.md +func (path *PathResource) Metadata() *resource.Metadata { + return &path.md } func (path *PathResource) Spec() interface{} { - return path.spec + return nil } func (path *PathResource) String() string { return fmt.Sprintf("PathResource(%q)", path.md.ID()) } -func (path *PathResource) Copy() resource.Resource { +func (path *PathResource) DeepCopy() resource.Resource { return &PathResource{ md: path.md, - spec: pathResourceSpec{ - dependents: append([]string(nil), path.spec.dependents...), - }, } } - -func (path *PathResource) AddDependent(dependent *PathResource) { - path.spec.dependents = append(path.spec.dependents, dependent.md.ID()) - path.md.BumpVersion() -} - -func (path *PathResource) DropDependent(dependent *PathResource) { - for i, p := range path.spec.dependents { - if p == dependent.md.ID() { - path.spec.dependents = append(path.spec.dependents[:i], path.spec.dependents[i+1:]...) - - break - } - } - - path.md.BumpVersion() -} diff --git a/go.mod b/go.mod index 201dc628..379d14a3 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,13 @@ module github.com/talos-systems/os-runtime go 1.14 -require github.com/stretchr/testify v1.6.1 +require ( + github.com/AlekSi/pointer v1.1.0 + github.com/hashicorp/go-memdb v1.3.0 + github.com/hashicorp/go-multierror v1.1.0 + github.com/stretchr/testify v1.6.1 + github.com/talos-systems/go-retry v0.2.0 + go.uber.org/goleak v1.1.10 + golang.org/x/sync v0.0.0-20190423024810-112230192c58 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c +) diff --git a/go.sum b/go.sum index afe7890c..4ee43424 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,52 @@ +github.com/AlekSi/pointer v1.1.0 h1:SSDMPcXD9jSl8FPy9cRzoRaMJtm9g9ggGTxecRUbQoI= +github.com/AlekSi/pointer v1.1.0/go.mod h1:y7BvfRI3wXPWKXEBhU71nbnIEEZX0QTSB2Bj48UJIZE= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.3.0 h1:xdXq34gBOMEloa9rlGStLxmfX/dyIK8htOv36dQUwHU= +github.com/hashicorp/go-memdb v1.3.0/go.mod h1:Mluclgwib3R93Hk5fxEfiRhB+6Dar64wWh71LpNSe3g= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= +github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/talos-systems/go-retry v0.2.0 h1:YpQHmtTZ2k0i/bBYRIasdVmF0XaiISVJUOrmZ6FzgLU= +github.com/talos-systems/go-retry v0.2.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go new file mode 100644 index 00000000..9f95347b --- /dev/null +++ b/pkg/controller/controller.go @@ -0,0 +1,21 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package controller defines common interfaces to be implemented by the controllers and controller runtime. +package controller + +import ( + "context" + "log" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// Controller interface should be implemented by Controllers. +type Controller interface { + Name() string + ManagedResources() (resource.Namespace, resource.Type) + + Run(context.Context, Runtime, *log.Logger) error +} diff --git a/pkg/controller/runtime.go b/pkg/controller/runtime.go new file mode 100644 index 00000000..67265618 --- /dev/null +++ b/pkg/controller/runtime.go @@ -0,0 +1,69 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package controller + +import ( + "context" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// ReconcileEvent is a signal for controller to reconcile its resources. +type ReconcileEvent struct{} + +// Runtime interface as presented to the controller. +type Runtime interface { + EventCh() <-chan ReconcileEvent + QueueReconcile() + + UpdateDependencies([]Dependency) error + + Reader + Writer +} + +// DependencyKind for dependencies. +type DependencyKind = int + +// Dependency kinds. +const ( + DependencyWeak int = iota + DependencyHard +) + +// Dependency of controller on some resource(s). +// +// Each controller might have multiple dependencies, it might depend on +// all the objects of some type under namespace, or on specific object by ID. +// +// Dependency might be either Weak or Hard. Any kind of dependency triggers +// cascading reconcile on changes, hard dependencies in addition block deletion of +// parent object until all the dependencies are torn down. +type Dependency struct { + Namespace resource.Namespace + Type resource.Type + ID *resource.ID + Kind DependencyKind +} + +// Reader provides read-only access to the state. +type Reader interface { + Get(context.Context, resource.Pointer) (resource.Resource, error) + List(context.Context, resource.Kind) (resource.List, error) + WatchFor(context.Context, resource.Pointer, ...state.WatchForConditionFunc) (resource.Resource, error) +} + +// Writer provides write access to the state. +// +// Only managed objects can be written to by the controller. +type Writer interface { + Update(context.Context, resource.Resource, func(resource.Resource) error) error + Teardown(context.Context, resource.Pointer) (bool, error) + Destroy(context.Context, resource.Pointer) error + + AddFinalizer(context.Context, resource.Pointer, ...resource.Finalizer) error + RemoveFinalizer(context.Context, resource.Pointer, ...resource.Finalizer) error +} diff --git a/pkg/controller/runtime/adapter.go b/pkg/controller/runtime/adapter.go new file mode 100644 index 00000000..8aa45173 --- /dev/null +++ b/pkg/controller/runtime/adapter.go @@ -0,0 +1,286 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime + +import ( + "context" + "fmt" + "log" + "sort" + + "github.com/AlekSi/pointer" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency" + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// adapter is presented to the Controller as Runtime interface implementation. +type adapter struct { + runtime *Runtime + + ctrl controller.Controller + ch chan controller.ReconcileEvent + + name string + + managedNamespace resource.Namespace + managedType resource.Type + + dependencies []controller.Dependency +} + +// EventCh implements controller.Runtime interface. +func (adapter *adapter) EventCh() <-chan controller.ReconcileEvent { + return adapter.ch +} + +// QueueReconcile implements controller.Runtime interface. +func (adapter *adapter) QueueReconcile() { + adapter.triggerReconcile() +} + +// UpdateDependencies implements controller.Runtime interface. +func (adapter *adapter) UpdateDependencies(deps []controller.Dependency) error { + sort.Slice(deps, func(i, j int) bool { + return dependency.Less(&deps[i], &deps[j]) + }) + + dbDeps, err := adapter.runtime.depDB.GetControllerDependencies(adapter.name) + if err != nil { + return fmt.Errorf("error fetching controller dependencies: %w", err) + } + + i, j := 0, 0 + + for { + if i >= len(deps) && j >= len(dbDeps) { + break + } + + add := false + delete := false + + switch { + case i >= len(deps): + delete = true + case j >= len(dbDeps): + add = true + default: + dI := deps[i] + dJ := dbDeps[j] + + switch { + case dependency.Equal(&dI, &dJ): + i++ + j++ + case dependency.EqualKeys(&dI, &dJ): + add, delete = true, true + case dependency.Less(&dI, &dJ): + add = true + default: + delete = true + } + } + + if add { + if err := adapter.runtime.depDB.AddControllerDependency(adapter.name, deps[i]); err != nil { + return fmt.Errorf("error adding controller dependency: %w", err) + } + + if err := adapter.runtime.watch(deps[i].Namespace, deps[i].Type); err != nil { + return fmt.Errorf("error watching resources: %w", err) + } + + i++ + } + + if delete { + if err := adapter.runtime.depDB.DeleteControllerDependency(adapter.name, dbDeps[j]); err != nil { + return fmt.Errorf("error deleting controller dependency: %w", err) + } + + j++ + } + } + + adapter.dependencies = append([]controller.Dependency(nil), deps...) + + return nil +} + +func (adapter *adapter) checkReadAccess(resourceNamespace resource.Namespace, resourceType resource.Type, resourceID *resource.ID) error { + if adapter.managedNamespace == resourceNamespace && adapter.managedType == resourceType { + return nil + } + + // go over cached dependencies here + for _, dep := range adapter.dependencies { + if dep.Namespace == resourceNamespace && dep.Type == resourceType { + // any ID is allowed + if dep.ID == nil { + return nil + } + + // list request, but only ID-specific dependency found + if resourceID == nil { + continue + } + + if *dep.ID == *resourceID { + return nil + } + } + } + + return fmt.Errorf("attempt to query resource %q/%q, not watched or managed by controller %q", resourceNamespace, resourceType, adapter.name) +} + +func (adapter *adapter) checkFinalizerAccess(resourceNamespace resource.Namespace, resourceType resource.Type, resourceID resource.ID) error { + // go over cached dependencies here + for _, dep := range adapter.dependencies { + if dep.Namespace == resourceNamespace && dep.Type == resourceType && dep.Kind == controller.DependencyHard { + // any ID is allowed + if dep.ID == nil { + return nil + } + + if *dep.ID == resourceID { + return nil + } + } + } + + return fmt.Errorf("attempt to change finalizers for resource %q/%q, not watched with hard dependency by controller %q", resourceNamespace, resourceType, adapter.name) +} + +// Get implements controller.Runtime interface. +func (adapter *adapter) Get(ctx context.Context, resourcePointer resource.Pointer) (resource.Resource, error) { + if err := adapter.checkReadAccess(resourcePointer.Namespace(), resourcePointer.Type(), pointer.ToString(resourcePointer.ID())); err != nil { + return nil, err + } + + return adapter.runtime.state.Get(ctx, resourcePointer) +} + +// List implements controller.Runtime interface. +func (adapter *adapter) List(ctx context.Context, resourceKind resource.Kind) (resource.List, error) { + if err := adapter.checkReadAccess(resourceKind.Namespace(), resourceKind.Type(), nil); err != nil { + return resource.List{}, err + } + + return adapter.runtime.state.List(ctx, resourceKind) +} + +// WatchFor implements controller.Runtime interface. +func (adapter *adapter) WatchFor(ctx context.Context, resourcePointer resource.Pointer, opts ...state.WatchForConditionFunc) (resource.Resource, error) { + if err := adapter.checkReadAccess(resourcePointer.Namespace(), resourcePointer.Type(), nil); err != nil { + return nil, err + } + + return adapter.runtime.state.WatchFor(ctx, resourcePointer, opts...) +} + +// Update implements controller.Runtime interface. +func (adapter *adapter) Update(ctx context.Context, emptyResource resource.Resource, updateFunc func(resource.Resource) error) error { + if emptyResource.Metadata().Namespace() != adapter.managedNamespace || emptyResource.Metadata().Type() != adapter.managedType { + return fmt.Errorf("resource %q/%q is not managed by controller %q, update attempted on %q", + emptyResource.Metadata().Namespace(), emptyResource.Metadata().Type(), adapter.name, emptyResource.Metadata().ID()) + } + + _, err := adapter.runtime.state.Get(ctx, emptyResource.Metadata()) + if err != nil { + if state.IsNotFoundError(err) { + err = updateFunc(emptyResource) + if err != nil { + return err + } + + return adapter.runtime.state.Create(ctx, emptyResource) + } + + return fmt.Errorf("error querying current object state: %w", err) + } + + _, err = adapter.runtime.state.UpdateWithConflicts(ctx, emptyResource.Metadata(), updateFunc) + + return err +} + +// AddFinalizer implements controller.Runtime interface. +func (adapter *adapter) AddFinalizer(ctx context.Context, resourcePointer resource.Pointer, fins ...resource.Finalizer) error { + if err := adapter.checkFinalizerAccess(resourcePointer.Namespace(), resourcePointer.Type(), resourcePointer.ID()); err != nil { + return err + } + + return adapter.runtime.state.AddFinalizer(ctx, resourcePointer, fins...) +} + +// RemoveFinalizer impleemnts controller.Runtime interface. +func (adapter *adapter) RemoveFinalizer(ctx context.Context, resourcePointer resource.Pointer, fins ...resource.Finalizer) error { + if err := adapter.checkFinalizerAccess(resourcePointer.Namespace(), resourcePointer.Type(), resourcePointer.ID()); err != nil { + return err + } + + err := adapter.runtime.state.RemoveFinalizer(ctx, resourcePointer, fins...) + if state.IsNotFoundError(err) { + err = nil + } + + return err +} + +// Teardown implements controller.Runtime interface. +func (adapter *adapter) Teardown(ctx context.Context, resourcePointer resource.Pointer) (bool, error) { + if resourcePointer.Namespace() != adapter.managedNamespace || resourcePointer.Type() != adapter.managedType { + return false, fmt.Errorf("resource %q/%q is not managed by controller %q, teardown attempted on %q", resourcePointer.Namespace(), resourcePointer.Type(), adapter.name, resourcePointer.ID()) + } + + return adapter.runtime.state.Teardown(ctx, resourcePointer) +} + +// Destroy implements controller.Runtime interface. +func (adapter *adapter) Destroy(ctx context.Context, resourcePointer resource.Pointer) error { + if resourcePointer.Namespace() != adapter.managedNamespace || resourcePointer.Type() != adapter.managedType { + return fmt.Errorf("resource %q/%q is not managed by controller %q, destroy attempted on %q", resourcePointer.Namespace(), resourcePointer.Type(), adapter.name, resourcePointer.ID()) + } + + return adapter.runtime.state.Destroy(ctx, resourcePointer) +} + +func (adapter *adapter) initialize() error { + adapter.name = adapter.ctrl.Name() + adapter.managedNamespace, adapter.managedType = adapter.ctrl.ManagedResources() + + if err := adapter.runtime.depDB.AddControllerManaged(adapter.name, adapter.managedNamespace, adapter.managedType); err != nil { + return fmt.Errorf("error registering in dependency database: %w", err) + } + + return nil +} + +func (adapter *adapter) triggerReconcile() { + // schedule reconcile if channel is empty + // otherwise channel is not empty, and reconcile is anyway scheduled + select { + case adapter.ch <- controller.ReconcileEvent{}: + default: + } +} + +func (adapter *adapter) run(ctx context.Context) (err error) { + defer func() { + if p := recover(); p != nil { + err = fmt.Errorf("controller %q panicked: %s", adapter.name, p) + } + }() + + logger := log.New(adapter.runtime.logger.Writer(), fmt.Sprintf("%s %s: ", adapter.runtime.logger.Prefix(), adapter.name), adapter.runtime.logger.Flags()) + + err = adapter.ctrl.Run(ctx, adapter, logger) + + return +} diff --git a/pkg/controller/runtime/dependency/database.go b/pkg/controller/runtime/dependency/database.go new file mode 100644 index 00000000..97a145f7 --- /dev/null +++ b/pkg/controller/runtime/dependency/database.go @@ -0,0 +1,292 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package dependency + +import ( + "fmt" + + "github.com/AlekSi/pointer" + "github.com/hashicorp/go-memdb" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// Database tracks dependencies between resources and controllers (and vice versa). +type Database struct { + db *memdb.MemDB +} + +const ( + tableManagedResources = "managed_resources" + tableControllerDependency = "controller_dependency" +) + +// NewDatabase creates new Database. +func NewDatabase() (*Database, error) { + db := &Database{} + + var err error + + db.db, err = memdb.NewMemDB(&memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + tableManagedResources: { + Name: tableManagedResources, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.StringFieldIndex{ + Field: "Type", + }, + }, + }, + }, + "controller": { + Name: "controller", + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ControllerName", + }, + }, + }, + }, + tableControllerDependency: { + Name: tableControllerDependency, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "ControllerName", + }, + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.StringFieldIndex{ + Field: "Type", + }, + &memdb.StringFieldIndex{ + Field: "ID", + }, + }, + }, + }, + "controller": { + Name: "controller", + Indexer: &memdb.StringFieldIndex{ + Field: "ControllerName", + }, + }, + "resource": { + Name: "resource", + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Namespace", + }, + &memdb.StringFieldIndex{ + Field: "Type", + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + return nil, fmt.Errorf("error creating memory db: %w", err) + } + + return db, nil +} + +// AddControllerManaged tracks which resource is managed by which controller. +func (db *Database) AddControllerManaged(controllerName string, resourceNamespace resource.Namespace, resourceType resource.Type) error { + txn := db.db.Txn(true) + defer txn.Abort() + + obj, err := txn.First(tableManagedResources, "id", resourceNamespace, resourceType) + if err != nil { + return fmt.Errorf("error quering controller managed: %w", err) + } + + if obj != nil { + dep := obj.(*ManagedResource) //nolint: errcheck + + return fmt.Errorf("duplicate controller managed link: (%q, %q) -> %q", dep.Namespace, dep.Type, dep.ControllerName) + } + + obj, err = txn.First(tableManagedResources, "controller", controllerName) + if err != nil { + return fmt.Errorf("error quering controller managed: %w", err) + } + + if obj != nil { + dep := obj.(*ManagedResource) //nolint: errcheck + + return fmt.Errorf("duplicate controller managed link: (%q, %q) -> %q", dep.Namespace, dep.Type, dep.ControllerName) + } + + if err = txn.Insert(tableManagedResources, &ManagedResource{ + Namespace: resourceNamespace, + Type: resourceType, + ControllerName: controllerName, + }); err != nil { + return fmt.Errorf("error adding controller managed resource: %w", err) + } + + txn.Commit() + + return nil +} + +// GetControllerResource returns resource managed by controller. +func (db *Database) GetControllerResource(controllerName string) (resource.Namespace, resource.Type, error) { + txn := db.db.Txn(false) + defer txn.Abort() + + obj, err := txn.First(tableManagedResources, "controller", controllerName) + if err != nil { + return "", "", fmt.Errorf("error quering controller managed: %w", err) + } + + if obj == nil { + return "", "", fmt.Errorf("controller %q is not registered", controllerName) + } + + dep := obj.(*ManagedResource) //nolint: errcheck + + return dep.Namespace, dep.Type, nil +} + +// GetResourceController returns controller which manages a resource. +// +// If no controller manages a resource, empty string is returned. +func (db *Database) GetResourceController(resourceNamespace resource.Namespace, resourceType resource.Type) (string, error) { + txn := db.db.Txn(false) + defer txn.Abort() + + obj, err := txn.First(tableManagedResources, "id", resourceNamespace, resourceType) + if err != nil { + return "", fmt.Errorf("error quering controller managed: %w", err) + } + + if obj == nil { + return "", nil + } + + dep := obj.(*ManagedResource) //nolint: errcheck + + return dep.ControllerName, nil +} + +// AddControllerDependency adds a dependency of controller on a resource. +func (db *Database) AddControllerDependency(controllerName string, dep controller.Dependency) error { + txn := db.db.Txn(true) + defer txn.Abort() + + model := ControllerDependency{ + ControllerName: controllerName, + Namespace: dep.Namespace, + Type: dep.Type, + Kind: dep.Kind, + } + + if dep.ID != nil { + model.ID = *dep.ID + } else { + model.ID = StarID + } + + if err := txn.Insert(tableControllerDependency, &model); err != nil { + return fmt.Errorf("error adding controller managed resource: %w", err) + } + + txn.Commit() + + return nil +} + +// DeleteControllerDependency adds a dependency of controller on a resource. +func (db *Database) DeleteControllerDependency(controllerName string, dep controller.Dependency) error { + txn := db.db.Txn(true) + defer txn.Abort() + + resourceID := StarID + if dep.ID != nil { + resourceID = *dep.ID + } + + if _, err := txn.DeleteAll(tableControllerDependency, "id", controllerName, dep.Namespace, dep.Type, resourceID); err != nil { + return fmt.Errorf("error deleting controller managed resource: %w", err) + } + + txn.Commit() + + return nil +} + +// GetControllerDependencies returns a list of controller dependencies. +func (db *Database) GetControllerDependencies(controllerName string) ([]controller.Dependency, error) { + txn := db.db.Txn(false) + defer txn.Abort() + + iter, err := txn.Get(tableControllerDependency, "controller", controllerName) + if err != nil { + return nil, fmt.Errorf("error fetching controller dependencies: %w", err) + } + + var result []controller.Dependency + + for obj := iter.Next(); obj != nil; obj = iter.Next() { + model := obj.(*ControllerDependency) //nolint: errcheck + + dep := controller.Dependency{ + Namespace: model.Namespace, + Type: model.Type, + Kind: model.Kind, + } + + if model.ID != StarID { + dep.ID = pointer.ToString(model.ID) + } + + result = append(result, dep) + } + + return result, nil +} + +// GetDependentControllers returns a list of controllers which depend on resource change. +func (db *Database) GetDependentControllers(dep controller.Dependency) ([]string, error) { + txn := db.db.Txn(false) + defer txn.Abort() + + iter, err := txn.Get(tableControllerDependency, "resource", dep.Namespace, dep.Type) + if err != nil { + return nil, fmt.Errorf("error fetching dependent resources: %w", err) + } + + var result []string + + for obj := iter.Next(); obj != nil; obj = iter.Next() { + model := obj.(*ControllerDependency) //nolint: errcheck + + if dep.ID == nil || model.ID == StarID || model.ID == *dep.ID { + result = append(result, model.ControllerName) + } + } + + return result, nil +} diff --git a/pkg/controller/runtime/dependency/database_test.go b/pkg/controller/runtime/dependency/database_test.go new file mode 100644 index 00000000..5cd2d240 --- /dev/null +++ b/pkg/controller/runtime/dependency/database_test.go @@ -0,0 +1,155 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package dependency_test + +import ( + "testing" + + "github.com/AlekSi/pointer" + "github.com/stretchr/testify/suite" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency" +) + +type DatabaseSuite struct { + suite.Suite + + db *dependency.Database +} + +func (suite *DatabaseSuite) SetupTest() { + var err error + + suite.db, err = dependency.NewDatabase() + suite.Require().NoError(err) +} + +func (suite *DatabaseSuite) TestControllerManaged() { + suite.Require().NoError(suite.db.AddControllerManaged("ControllerBook", "default", "Book")) + suite.Require().NoError(suite.db.AddControllerManaged("ControllerTable", "default", "Table")) + + suite.Require().EqualError(suite.db.AddControllerManaged("ControllerTable", "default", "Book"), `duplicate controller managed link: ("default", "Book") -> "ControllerBook"`) + suite.Require().EqualError(suite.db.AddControllerManaged("ControllerTable", "default", "Desk"), `duplicate controller managed link: ("default", "Table") -> "ControllerTable"`) + + suite.Require().NoError(suite.db.AddControllerManaged("ControllerDesk", "desky", "Table")) + + controller, err := suite.db.GetResourceController("default", "Table") + suite.Require().NoError(err) + suite.Assert().Equal("ControllerTable", controller) + + controller, err = suite.db.GetResourceController("default", "Desk") + suite.Require().NoError(err) + suite.Assert().Empty(controller) + + namespace, typ, err := suite.db.GetControllerResource("ControllerBook") + suite.Require().NoError(err) + suite.Assert().Equal("default", namespace) + suite.Assert().Equal("Book", typ) + + _, _, err = suite.db.GetControllerResource("ControllerWardrobe") + suite.Require().EqualError(err, `controller "ControllerWardrobe" is not registered`) +} + +func (suite *DatabaseSuite) TestControllerDependency() { + suite.Require().NoError(suite.db.AddControllerDependency("ConfigController", controller.Dependency{ + Namespace: "user", + Type: "Config", + Kind: controller.DependencyWeak, + })) + + deps, err := suite.db.GetControllerDependencies("ConfigController") + suite.Require().NoError(err) + suite.Assert().Len(deps, 1) + suite.Assert().Equal("user", deps[0].Namespace) + suite.Assert().Equal("Config", deps[0].Type) + suite.Assert().Nil(deps[0].ID) + suite.Assert().Equal(controller.DependencyWeak, deps[0].Kind) + + suite.Require().NoError(suite.db.AddControllerDependency("ConfigController", controller.Dependency{ + Namespace: "state", + Type: "Machine", + ID: pointer.ToString("system"), + Kind: controller.DependencyHard, + })) + + deps, err = suite.db.GetControllerDependencies("ConfigController") + suite.Require().NoError(err) + suite.Assert().Len(deps, 2) + + suite.Assert().Equal("state", deps[0].Namespace) + suite.Assert().Equal("Machine", deps[0].Type) + suite.Assert().Equal("system", *deps[0].ID) + suite.Assert().Equal(controller.DependencyHard, deps[0].Kind) + + suite.Assert().Equal("user", deps[1].Namespace) + suite.Assert().Equal("Config", deps[1].Type) + suite.Assert().Nil(deps[1].ID) + suite.Assert().Equal(controller.DependencyWeak, deps[1].Kind) + + ctrls, err := suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "user", + Type: "Config", + ID: pointer.ToString("config"), + }) + suite.Require().NoError(err) + suite.Assert().Equal([]string{"ConfigController"}, ctrls) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "user", + Type: "Config", + }) + suite.Require().NoError(err) + suite.Assert().Equal([]string{"ConfigController"}, ctrls) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "user", + Type: "Spec", + }) + suite.Require().NoError(err) + suite.Assert().Empty(ctrls) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "state", + Type: "Machine", + ID: pointer.ToString("node"), + }) + suite.Require().NoError(err) + suite.Assert().Empty(ctrls) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "state", + Type: "Machine", + ID: pointer.ToString("system"), + }) + suite.Require().NoError(err) + suite.Assert().Equal([]string{"ConfigController"}, ctrls) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "state", + Type: "Machine", + }) + suite.Require().NoError(err) + suite.Assert().Equal([]string{"ConfigController"}, ctrls) + + suite.Require().NoError(suite.db.DeleteControllerDependency("ConfigController", controller.Dependency{ + Namespace: "state", + Type: "Machine", + ID: pointer.ToString("system"), + })) + + ctrls, err = suite.db.GetDependentControllers(controller.Dependency{ + Namespace: "state", + Type: "Machine", + }) + suite.Require().NoError(err) + suite.Assert().Empty(ctrls) +} + +func TestDabaseSuite(t *testing.T) { + t.Parallel() + + suite.Run(t, new(DatabaseSuite)) +} diff --git a/pkg/controller/runtime/dependency/dependency.go b/pkg/controller/runtime/dependency/dependency.go new file mode 100644 index 00000000..5734eb48 --- /dev/null +++ b/pkg/controller/runtime/dependency/dependency.go @@ -0,0 +1,53 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package dependency implements the dependency handling database. +package dependency + +import ( + "fmt" + "log" + + "github.com/talos-systems/os-runtime/pkg/controller" +) + +// Less compares two controller.Dependency objects. +// +// This sort order is compatible with the way memdb handles ordering. +func Less(a, b *controller.Dependency) bool { + aID := StarID + if a.ID != nil { + aID = *a.ID + } + + bID := StarID + if b.ID != nil { + bID = *b.ID + } + + aStr := fmt.Sprintf("%s\000%s\000%s", a.Namespace, a.Type, aID) + bStr := fmt.Sprintf("%s\000%s\000%s", b.Namespace, b.Type, bID) + + log.Printf("%q, %q", aStr, bStr) + + return aStr < bStr +} + +// Equal checks if two controller.Dependency objects are completely equivalent. +func Equal(a, b *controller.Dependency) bool { + return EqualKeys(a, b) && a.Kind == b.Kind +} + +// EqualKeys checks if two controller.Dependency objects have equal (conflicting) keys. +func EqualKeys(a, b *controller.Dependency) bool { + if a.Namespace != b.Namespace || a.Type != b.Type { + return false + } + + if a.ID == nil || b.ID == nil { + return a.ID == nil && b.ID == nil + } + + return *a.ID == *b.ID +} diff --git a/pkg/controller/runtime/dependency/dependency_test.go b/pkg/controller/runtime/dependency/dependency_test.go new file mode 100644 index 00000000..9e30417f --- /dev/null +++ b/pkg/controller/runtime/dependency/dependency_test.go @@ -0,0 +1,280 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package dependency_test + +import ( + "testing" + + "github.com/AlekSi/pointer" + "github.com/stretchr/testify/assert" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency" +) + +func TestLess(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + Name string + A, B controller.Dependency + Expected bool + }{ + { + Name: "equal", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + Expected: false, + }, + { + Name: "lessId", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("jd"), + Kind: controller.DependencyHard, + }, + Expected: true, + }, + { + Name: "moreType", + A: controller.Dependency{ + Namespace: "default", + Type: "Data", + ID: nil, + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + Expected: false, + }, + } { + testCase := testCase + + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, testCase.Expected, dependency.Less(&testCase.A, &testCase.B)) + }) + } +} + +func TestEqual(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + Name string + A, B controller.Dependency + Expected bool + }{ + { + Name: "equal", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + Expected: true, + }, + { + Name: "id", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("jd"), + Kind: controller.DependencyHard, + }, + Expected: false, + }, + { + Name: "idNil", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + Expected: false, + }, + { + Name: "idsNil", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + Expected: true, + }, + { + Name: "kind", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyWeak, + }, + Expected: false, + }, + { + Name: "ns", + A: controller.Dependency{ + Namespace: "user", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + Expected: false, + }, + } { + testCase := testCase + + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, testCase.Expected, dependency.Equal(&testCase.A, &testCase.B)) + }) + } +} + +func TestEqualKeys(t *testing.T) { + t.Parallel() + + for _, testCase := range []struct { + Name string + A, B controller.Dependency + Expected bool + }{ + { + Name: "equal", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + Expected: true, + }, + { + Name: "kind", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyWeak, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + Expected: true, + }, + { + Name: "id", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("jd"), + Kind: controller.DependencyHard, + }, + Expected: false, + }, + { + Name: "idNil", + A: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: pointer.ToString("id"), + Kind: controller.DependencyHard, + }, + B: controller.Dependency{ + Namespace: "default", + Type: "Config", + ID: nil, + Kind: controller.DependencyHard, + }, + Expected: false, + }, + } { + testCase := testCase + + t.Run(testCase.Name, func(t *testing.T) { + t.Parallel() + + assert.Equal(t, testCase.Expected, dependency.EqualKeys(&testCase.A, &testCase.B)) + }) + } +} diff --git a/pkg/controller/runtime/dependency/models.go b/pkg/controller/runtime/dependency/models.go new file mode 100644 index 00000000..844678bd --- /dev/null +++ b/pkg/controller/runtime/dependency/models.go @@ -0,0 +1,29 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package dependency + +import ( + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// ManagedResource tracks which objects are managed by controllers. +type ManagedResource struct { + Namespace resource.Namespace + Type resource.Type + ControllerName string +} + +// StarID denotes ID value which matches any other ID. +const StarID = "*" + +// ControllerDependency tracks dependencies of the controller. +type ControllerDependency struct { + ControllerName string + Namespace resource.Namespace + Type resource.Type + ID resource.ID + Kind controller.DependencyKind +} diff --git a/pkg/controller/runtime/runtime.go b/pkg/controller/runtime/runtime.go new file mode 100644 index 00000000..4c0d5b26 --- /dev/null +++ b/pkg/controller/runtime/runtime.go @@ -0,0 +1,170 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package runtime implements the controller runtime. +package runtime + +import ( + "context" + "fmt" + "log" + "sync" + + "github.com/AlekSi/pointer" + "github.com/hashicorp/go-multierror" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/controller/runtime/dependency" + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// Runtime implements controller runtime. +type Runtime struct { + depDB *dependency.Database + + state state.State + logger *log.Logger + + watchCh chan state.Event + watchedMu sync.Mutex + watched map[string]struct{} + + controllersMu sync.RWMutex + controllers map[string]*adapter + + runCtx context.Context + runCtxCancel context.CancelFunc +} + +// NewRuntime initializes controller runtime object. +func NewRuntime(st state.State, logger *log.Logger) (*Runtime, error) { + runtime := &Runtime{ + state: st, + logger: logger, + controllers: make(map[string]*adapter), + watchCh: make(chan state.Event), + watched: make(map[string]struct{}), + } + + var err error + + runtime.depDB, err = dependency.NewDatabase() + if err != nil { + return nil, fmt.Errorf("error creating dependency database: %w", err) + } + + return runtime, nil +} + +// RegisterController registers new controller. +func (runtime *Runtime) RegisterController(ctrl controller.Controller) error { + runtime.controllersMu.Lock() + defer runtime.controllersMu.Unlock() + + name := ctrl.Name() + + if _, exists := runtime.controllers[name]; exists { + return fmt.Errorf("controller %q already registered", name) + } + + adapter := &adapter{ + runtime: runtime, + + ctrl: ctrl, + ch: make(chan controller.ReconcileEvent, 1), + } + + if err := adapter.initialize(); err != nil { + return fmt.Errorf("error initializing controller %q adapter: %w", name, err) + } + + // initial reconcile + adapter.triggerReconcile() + + runtime.controllers[name] = adapter + + return nil +} + +// Run all the controller loops. +func (runtime *Runtime) Run(ctx context.Context) error { + runtime.runCtx, runtime.runCtxCancel = context.WithCancel(ctx) + defer runtime.runCtxCancel() + + go runtime.processWatched() + + errCh := make(chan error) + + runtime.controllersMu.RLock() + + for _, adapter := range runtime.controllers { + adapter := adapter + + go func() { + errCh <- adapter.run(runtime.runCtx) + }() + } + + n := len(runtime.controllers) + + runtime.controllersMu.RUnlock() + + var multiErr *multierror.Error + + for i := 0; i < n; i++ { + multiErr = multierror.Append(multiErr, <-errCh) + } + + return multiErr.ErrorOrNil() +} + +func (runtime *Runtime) watch(resourceNamespace resource.Namespace, resourceType resource.Type) error { + runtime.watchedMu.Lock() + defer runtime.watchedMu.Unlock() + + key := fmt.Sprintf("%s\000%s", resourceNamespace, resourceType) + + if _, exists := runtime.watched[key]; exists { + return nil + } + + runtime.watched[key] = struct{}{} + + kind := resource.NewMetadata(resourceNamespace, resourceType, "", resource.Version{}) + + return runtime.state.WatchKind(runtime.runCtx, kind, runtime.watchCh) +} + +func (runtime *Runtime) processWatched() { + for { + var e state.Event + + select { + case <-runtime.runCtx.Done(): + return + case e = <-runtime.watchCh: + } + + md := e.Resource.Metadata() + + controllers, err := runtime.depDB.GetDependentControllers(controller.Dependency{ + Namespace: md.Namespace(), + Type: md.Type(), + ID: pointer.ToString(md.ID()), + }) + if err != nil { + // TODO: no way to handle it here + continue + } + + runtime.controllersMu.RLock() + + for _, ctrl := range controllers { + runtime.controllers[ctrl].triggerReconcile() + } + + runtime.controllersMu.RUnlock() + } +} diff --git a/pkg/controller/runtime/runtime_controllers_test.go b/pkg/controller/runtime/runtime_controllers_test.go new file mode 100644 index 00000000..821c3bc5 --- /dev/null +++ b/pkg/controller/runtime/runtime_controllers_test.go @@ -0,0 +1,267 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime_test + +import ( + "context" + "fmt" + "log" + "strconv" + + "github.com/talos-systems/os-runtime/pkg/controller" + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// IntToStrController converts IntResource to StrResource. +type IntToStrController struct { + SourceNamespace resource.Namespace + TargetNamespace resource.Namespace +} + +// Name implements controller.Controller interface. +func (ctrl *IntToStrController) Name() string { + return "IntToStrController" +} + +// ManagedResources implements controller.Controller interface. +func (ctrl *IntToStrController) ManagedResources() (resource.Namespace, resource.Type) { + return ctrl.TargetNamespace, StrResourceType +} + +// Run implements controller.Controller interface. +// +//nolint: gocognit +func (ctrl *IntToStrController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + if err := r.UpdateDependencies([]controller.Dependency{ + { + Namespace: ctrl.SourceNamespace, + Type: IntResourceType, + Kind: controller.DependencyHard, + }, + }); err != nil { + return fmt.Errorf("error setting up dependencies: %w", err) + } + + sourceMd := resource.NewMetadata(ctrl.SourceNamespace, IntResourceType, "", resource.VersionUndefined) + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + intList, err := r.List(ctx, sourceMd) + if err != nil { + return fmt.Errorf("error listing objects: %w", err) + } + + for _, intRes := range intList.Items { + intRes := intRes + + strRes := NewStrResource(ctrl.TargetNamespace, intRes.Metadata().ID(), "") + + switch intRes.Metadata().Phase() { + case resource.PhaseRunning: + if err = r.AddFinalizer(ctx, intRes.Metadata(), strRes.String()); err != nil { + return fmt.Errorf("error adding finalizer: %w", err) + } + + if err = r.Update(ctx, strRes, + func(r resource.Resource) error { + r.(*StrResource).value = strconv.Itoa(intRes.Spec().(int)) + + return nil + }); err != nil { + return fmt.Errorf("error updating objects: %w", err) + } + case resource.PhaseTearingDown: + ready, err := r.Teardown(ctx, strRes.Metadata()) + if err != nil { + if state.IsNotFoundError(err) { + if err = r.RemoveFinalizer(ctx, intRes.Metadata(), strRes.String()); err != nil { + return fmt.Errorf("error removing finalizer (str controller): %w", err) + } + + continue + } + + return fmt.Errorf("error tearing down: %w", err) + } + + if !ready { + _, err = r.WatchFor(ctx, strRes.Metadata(), state.WithFinalizerEmpty()) + if err != nil { + return fmt.Errorf("error waiting for teardown ready: %w", err) + } + } + + if err = r.Destroy(ctx, strRes.Metadata()); err != nil { + return fmt.Errorf("error destroying: %w", err) + } + + if err = r.RemoveFinalizer(ctx, intRes.Metadata(), strRes.String()); err != nil { + if !state.IsNotFoundError(err) { + return fmt.Errorf("error removing finalizer (str controller): %w", err) + } + } + } + } + } +} + +// StrToSentenceController converts StrResource to SentenceResource. +type StrToSentenceController struct { + SourceNamespace resource.Namespace + TargetNamespace resource.Namespace +} + +// Name implements controller.Controller interface. +func (ctrl *StrToSentenceController) Name() string { + return "StrToSentenceController" +} + +// ManagedResources implements controller.Controller interface. +func (ctrl *StrToSentenceController) ManagedResources() (resource.Namespace, resource.Type) { + return ctrl.TargetNamespace, SententceResourceType +} + +// Run implements controller.Controller interface. +// +//nolint: gocognit +func (ctrl *StrToSentenceController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + if err := r.UpdateDependencies([]controller.Dependency{ + { + Namespace: ctrl.SourceNamespace, + Type: StrResourceType, + Kind: controller.DependencyHard, + }, + }); err != nil { + return fmt.Errorf("error setting up dependencies: %w", err) + } + + sourceMd := resource.NewMetadata(ctrl.SourceNamespace, StrResourceType, "", resource.VersionUndefined) + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + strList, err := r.List(ctx, sourceMd) + if err != nil { + return fmt.Errorf("error listing objects: %w", err) + } + + for _, strRes := range strList.Items { + strRes := strRes + + sentenceRes := NewSentenceResource(ctrl.TargetNamespace, strRes.Metadata().ID(), "") + + switch strRes.Metadata().Phase() { + case resource.PhaseRunning: + if err = r.AddFinalizer(ctx, strRes.Metadata(), sentenceRes.String()); err != nil { + return fmt.Errorf("error adding finalizer: %w", err) + } + + if err = r.Update(ctx, sentenceRes, func(r resource.Resource) error { + r.(*SentenceResource).value = strRes.(*StrResource).value + " sentence" + + return nil + }); err != nil { + return fmt.Errorf("error updating objects: %w", err) + } + case resource.PhaseTearingDown: + ready, err := r.Teardown(ctx, sentenceRes.Metadata()) + if err != nil { + if state.IsNotFoundError(err) { + if err = r.RemoveFinalizer(ctx, strRes.Metadata(), sentenceRes.String()); err != nil { + return fmt.Errorf("error removing finalizer (sentence controller): %w", err) + } + + continue + } + + return fmt.Errorf("error tearing down: %w", err) + } + + if !ready { + _, err = r.WatchFor(ctx, sentenceRes.Metadata(), state.WithFinalizerEmpty()) + if err != nil { + return fmt.Errorf("error waiting for teardown ready: %w", err) + } + } + + if err = r.Destroy(ctx, sentenceRes.Metadata()); err != nil { + return fmt.Errorf("error destroying: %w", err) + } + + if err = r.RemoveFinalizer(ctx, strRes.Metadata(), sentenceRes.String()); err != nil { + return fmt.Errorf("error removing finalizer (sentence controller): %w", err) + } + } + } + } +} + +// SumController calculates sum of IntResources into new IntResource. +type SumController struct { + SourceNamespace resource.Namespace + TargetNamespace resource.Namespace +} + +// Name implements controller.Controller interface. +func (ctrl *SumController) Name() string { + return "SumController" +} + +// ManagedResources implements controller.Controller interface. +func (ctrl *SumController) ManagedResources() (resource.Namespace, resource.Type) { + return ctrl.TargetNamespace, IntResourceType +} + +// Run implements controller.Controller interface. +func (ctrl *SumController) Run(ctx context.Context, r controller.Runtime, logger *log.Logger) error { + if err := r.UpdateDependencies([]controller.Dependency{ + { + Namespace: ctrl.SourceNamespace, + Type: IntResourceType, + Kind: controller.DependencyWeak, + }, + }); err != nil { + return fmt.Errorf("error setting up dependencies: %w", err) + } + + sourceMd := resource.NewMetadata(ctrl.SourceNamespace, IntResourceType, "", resource.VersionUndefined) + + for { + select { + case <-ctx.Done(): + return nil + case <-r.EventCh(): + } + + intList, err := r.List(ctx, sourceMd) + if err != nil { + return fmt.Errorf("error listing objects: %w", err) + } + + var sum int + + for _, intRes := range intList.Items { + sum += intRes.Spec().(int) //nolint: errcheck + } + + if err = r.Update(ctx, NewIntResource(ctrl.TargetNamespace, "sum", 0), func(r resource.Resource) error { + r.(*IntResource).value = sum + + return nil + }); err != nil { + return fmt.Errorf("error updating sum") + } + } +} diff --git a/pkg/controller/runtime/runtime_resources_test.go b/pkg/controller/runtime/runtime_resources_test.go new file mode 100644 index 00000000..d0eb9d85 --- /dev/null +++ b/pkg/controller/runtime/runtime_resources_test.go @@ -0,0 +1,137 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime_test + +import ( + "fmt" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// IntResourceType is the type of IntResource. +const IntResourceType = resource.Type("test/int") + +// IntResource represents some integer value. +type IntResource struct { + md resource.Metadata + value int +} + +// NewIntResource creates new IntResource. +func NewIntResource(ns resource.Namespace, id resource.ID, value int) *IntResource { + r := &IntResource{ + md: resource.NewMetadata(ns, IntResourceType, id, resource.VersionUndefined), + value: value, + } + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *IntResource) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *IntResource) Spec() interface{} { + return r.value +} + +func (r *IntResource) String() string { + return fmt.Sprintf("IntResource(%q -> %d)", r.md.ID(), r.value) +} + +// DeepCopy implements resource.Resource. +func (r *IntResource) DeepCopy() resource.Resource { + return &IntResource{ + md: r.md, + value: r.value, + } +} + +// StrResourceType is the type of StrResource. +const StrResourceType = resource.Type("test/str") + +// StrResource represents some string value. +type StrResource struct { + md resource.Metadata + value string +} + +// NewStrResource creates new StrResource. +func NewStrResource(ns resource.Namespace, id resource.ID, value string) *StrResource { + r := &StrResource{ + md: resource.NewMetadata(ns, StrResourceType, id, resource.VersionUndefined), + value: value, + } + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *StrResource) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *StrResource) Spec() interface{} { + return r.value +} + +func (r *StrResource) String() string { + return fmt.Sprintf("StrResource(%q -> %q)", r.md.ID(), r.value) +} + +// DeepCopy implements resource.Resource. +func (r *StrResource) DeepCopy() resource.Resource { + return &StrResource{ + md: r.md, + value: r.value, + } +} + +// SententceResourceType is the type of SentenceResource. +const SententceResourceType = resource.Type("test/sentence") + +// StrResource represents some string value. +type SentenceResource struct { + md resource.Metadata + value string +} + +// NewSentenceResource creates new SentenceResource. +func NewSentenceResource(ns resource.Namespace, id resource.ID, value string) *SentenceResource { + r := &SentenceResource{ + md: resource.NewMetadata(ns, SententceResourceType, id, resource.VersionUndefined), + value: value, + } + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *SentenceResource) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *SentenceResource) Spec() interface{} { + return r.value +} + +func (r *SentenceResource) String() string { + return fmt.Sprintf("SentenceResource(%q -> %q)", r.md.ID(), r.value) +} + +// DeepCopy implements resource.Resource. +func (r *SentenceResource) DeepCopy() resource.Resource { + return &SentenceResource{ + md: r.md, + value: r.value, + } +} diff --git a/pkg/controller/runtime/runtime_test.go b/pkg/controller/runtime/runtime_test.go new file mode 100644 index 00000000..5845e436 --- /dev/null +++ b/pkg/controller/runtime/runtime_test.go @@ -0,0 +1,268 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package runtime_test + +import ( + "context" + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" + "go.uber.org/goleak" + + "github.com/talos-systems/os-runtime/pkg/controller/runtime" + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" + "github.com/talos-systems/os-runtime/pkg/state/impl/namespaced" +) + +type RuntimeSuite struct { + suite.Suite + + state state.State + + runtime *runtime.Runtime + wg sync.WaitGroup + + ctx context.Context + ctxCancel context.CancelFunc +} + +func (suite *RuntimeSuite) SetupTest() { + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute) + + suite.state = state.WrapCore(namespaced.NewState(inmem.Build)) + + var err error + + logger := log.New(log.Writer(), "controller-runtime: ", log.Flags()) + + suite.runtime, err = runtime.NewRuntime(suite.state, logger) + suite.Require().NoError(err) +} + +func (suite *RuntimeSuite) startRuntime() { + suite.wg.Add(1) + + go func() { + defer suite.wg.Done() + + suite.Assert().NoError(suite.runtime.Run(suite.ctx)) + }() +} + +//nolint: dupl +func (suite *RuntimeSuite) assertStrObjects(ns resource.Namespace, typ resource.Type, ids, values []string) retry.RetryableFunc { + return func() error { + items, err := suite.state.List(suite.ctx, resource.NewMetadata(ns, typ, "", resource.VersionUndefined)) + if err != nil { + return retry.UnexpectedError(err) + } + + if len(items.Items) != len(ids) { + return retry.ExpectedError(fmt.Errorf("expected %d objects, got %d", len(ids), len(items.Items))) + } + + for i, id := range ids { + r, err := suite.state.Get(suite.ctx, resource.NewMetadata(ns, typ, id, resource.VersionUndefined)) + if err != nil { + if state.IsNotFoundError(err) { + return retry.ExpectedError(err) + } + + return retry.UnexpectedError(err) + } + + strValue := r.Spec().(string) //nolint: errcheck + + if strValue != values[i] { + return retry.ExpectedError(fmt.Errorf("expected value of %q to be %q, found %q", id, values[i], strValue)) + } + } + + return nil + } +} + +//nolint: dupl +func (suite *RuntimeSuite) assertIntObjects(ns resource.Namespace, typ resource.Type, ids []string, values []int) retry.RetryableFunc { + return func() error { + items, err := suite.state.List(suite.ctx, resource.NewMetadata(ns, typ, "", resource.VersionUndefined)) + if err != nil { + return retry.UnexpectedError(err) + } + + if len(items.Items) != len(ids) { + return retry.ExpectedError(fmt.Errorf("expected %d objects, got %d", len(ids), len(items.Items))) + } + + for i, id := range ids { + r, err := suite.state.Get(suite.ctx, resource.NewMetadata(ns, typ, id, resource.VersionUndefined)) + if err != nil { + if state.IsNotFoundError(err) { + return retry.ExpectedError(err) + } + + return retry.UnexpectedError(err) + } + + intValue := r.Spec().(int) //nolint: errcheck + + if intValue != values[i] { + return retry.ExpectedError(fmt.Errorf("expected value of %q to be %d, found %d", id, values[i], intValue)) + } + } + + return nil + } +} + +func (suite *RuntimeSuite) TearDownTest() { + suite.T().Log("tear down") + + suite.ctxCancel() + + suite.wg.Wait() + + // trigger updates in resources to stop watch loops + suite.Assert().NoError(suite.state.Create(context.Background(), NewIntResource("default", "xxx", 0))) + suite.Assert().NoError(suite.state.Create(context.Background(), NewIntResource("ints", "xxx", 0))) + suite.Assert().NoError(suite.state.Create(context.Background(), NewStrResource("strings", "xxx", ""))) + suite.Assert().NoError(suite.state.Create(context.Background(), NewIntResource("source", "xxx", 0))) +} + +func (suite *RuntimeSuite) TestNoControllers() { + // no controllers registered + suite.startRuntime() +} + +func (suite *RuntimeSuite) TestIntToStrControllers() { + suite.Require().NoError(suite.runtime.RegisterController(&IntToStrController{ + SourceNamespace: "default", + TargetNamespace: "default", + })) + + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("default", "one", 1))) + + suite.startRuntime() + + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("default", "two", 2))) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("default", StrResourceType, []string{"one", "two"}, []string{"1", "2"}))) + + three := NewIntResource("default", "three", 3) + suite.Assert().NoError(suite.state.Create(suite.ctx, three)) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("default", StrResourceType, []string{"one", "two", "three"}, []string{"1", "2", "3"}))) + + _, err := suite.state.UpdateWithConflicts(suite.ctx, three.Metadata(), func(r resource.Resource) error { + r.(*IntResource).value = 33 + + return nil + }) + suite.Assert().NoError(err) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("default", StrResourceType, []string{"one", "two", "three"}, []string{"1", "2", "33"}))) + + ready, err := suite.state.Teardown(suite.ctx, three.Metadata()) + suite.Assert().NoError(err) + suite.Assert().False(ready) + + _, err = suite.state.WatchFor(suite.ctx, three.Metadata(), state.WithFinalizerEmpty()) + suite.Assert().NoError(err) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("default", StrResourceType, []string{"one", "two"}, []string{"1", "2"}))) + + suite.Assert().NoError(suite.state.Destroy(suite.ctx, three.Metadata())) +} + +func (suite *RuntimeSuite) TestIntToStrToSentenceControllers() { + suite.Require().NoError(suite.runtime.RegisterController(&IntToStrController{ + SourceNamespace: "ints", + TargetNamespace: "strings", + })) + + suite.Require().NoError(suite.runtime.RegisterController(&StrToSentenceController{ + SourceNamespace: "strings", + TargetNamespace: "sentences", + })) + + one := NewIntResource("ints", "one", 1) + suite.Assert().NoError(suite.state.Create(suite.ctx, one)) + + suite.startRuntime() + + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("ints", "two", 2))) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("sentences", SententceResourceType, []string{"one", "two"}, []string{"1 sentence", "2 sentence"}))) + + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("ints", "three", 3))) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("sentences", SententceResourceType, []string{"one", "two", "three"}, []string{"1 sentence", "2 sentence", "3 sentence"}))) + + _, err := suite.state.UpdateWithConflicts(suite.ctx, one.Metadata(), func(r resource.Resource) error { + r.(*IntResource).value = 11 + + return nil + }) + suite.Assert().NoError(err) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("sentences", SententceResourceType, []string{"one", "two", "three"}, []string{"11 sentence", "2 sentence", "3 sentence"}))) + + ready, err := suite.state.Teardown(suite.ctx, one.Metadata()) + suite.Assert().NoError(err) + suite.Assert().False(ready) + + _, err = suite.state.WatchFor(suite.ctx, one.Metadata(), state.WithFinalizerEmpty()) + suite.Assert().NoError(err) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertStrObjects("sentences", SententceResourceType, []string{"two", "three"}, []string{"2 sentence", "3 sentence"}))) + + suite.Assert().NoError(suite.state.Destroy(suite.ctx, one.Metadata())) +} + +func (suite *RuntimeSuite) TestSumControllers() { + suite.Require().NoError(suite.runtime.RegisterController(&SumController{ + SourceNamespace: "source", + TargetNamespace: "target", + })) + + suite.startRuntime() + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertIntObjects("target", IntResourceType, []string{"sum"}, []int{0}))) + + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("source", "one", 1))) + suite.Assert().NoError(suite.state.Create(suite.ctx, NewIntResource("source", "two", 2))) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertIntObjects("target", IntResourceType, []string{"sum"}, []int{3}))) + + suite.Assert().NoError(suite.state.Destroy(suite.ctx, NewIntResource("source", "one", 1).Metadata())) + + suite.Assert().NoError(retry.Constant(10*time.Second, retry.WithUnits(10*time.Millisecond)). + Retry(suite.assertIntObjects("target", IntResourceType, []string{"sum"}, []int{2}))) +} + +func TestRuntime(t *testing.T) { + t.Parallel() + + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + + suite.Run(t, new(RuntimeSuite)) +} diff --git a/pkg/resource/any.go b/pkg/resource/any.go new file mode 100644 index 00000000..84d5f15e --- /dev/null +++ b/pkg/resource/any.go @@ -0,0 +1,64 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +import ( + "fmt" + + "gopkg.in/yaml.v3" +) + +// Any can hold data from any resource type. +type Any struct { + md Metadata + spec interface{} + yaml []byte +} + +// SpecProto is a protobuf interface of resource spec. +type SpecProto interface { + GetYaml() []byte +} + +// NewAnyFromProto unmarshals Any from protobuf interface. +func NewAnyFromProto(protoMd MetadataProto, protoSpec SpecProto) (*Any, error) { + md, err := NewMetadataFromProto(protoMd) + if err != nil { + return nil, err + } + + any := &Any{ + md: md, + yaml: protoSpec.GetYaml(), + } + + if err = yaml.Unmarshal(any.yaml, &any.spec); err != nil { + return nil, err + } + + return any, nil +} + +// Metadata implements resource.Resource. +func (a *Any) Metadata() *Metadata { + return &a.md +} + +// Spec implements resource.Resource. +func (a *Any) Spec() interface{} { + return a.spec +} + +func (a *Any) String() string { + return fmt.Sprintf("Any(%s)", a.md) +} + +// DeepCopy implements resource.Resource. +func (a *Any) DeepCopy() Resource { + return &Any{ + md: a.md, + spec: a.spec, + } +} diff --git a/pkg/resource/any_test.go b/pkg/resource/any_test.go new file mode 100644 index 00000000..dabade7d --- /dev/null +++ b/pkg/resource/any_test.go @@ -0,0 +1,35 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +func TestAnyInterfaces(t *testing.T) { + t.Parallel() + + assert.Implements(t, (*resource.Resource)(nil), new(resource.Any)) +} + +type protoSpec struct{} + +func (s *protoSpec) GetYaml() []byte { + return []byte(`value: xyz +something: [a, b, c] +`) +} + +func TestNewAnyFromProto(t *testing.T) { + r, err := resource.NewAnyFromProto(&protoMd{}, &protoSpec{}) + assert.NoError(t, err) + + assert.Equal(t, map[string]interface{}{"something": []interface{}{"a", "b", "c"}, "value": "xyz"}, r.Spec()) + assert.Equal(t, "aaa", r.Metadata().ID()) +} diff --git a/pkg/resource/core/core.go b/pkg/resource/core/core.go new file mode 100644 index 00000000..4ea4bcd4 --- /dev/null +++ b/pkg/resource/core/core.go @@ -0,0 +1,11 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package core provides definition of core system resources. +package core + +import "github.com/talos-systems/os-runtime/pkg/resource" + +// NamespaceName is the name of 'core' namespace. +const NamespaceName resource.Namespace = "core" diff --git a/pkg/resource/core/namespace.go b/pkg/resource/core/namespace.go new file mode 100644 index 00000000..2595a55b --- /dev/null +++ b/pkg/resource/core/namespace.go @@ -0,0 +1,70 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package core + +import ( + "fmt" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// NamespaceType is the type of Namespace. +const NamespaceType = resource.Type("core/namespace") + +// Namespace provides metadata about namespaces. +type Namespace struct { + md resource.Metadata + spec NamespaceSpec +} + +// NamespaceSpec provides Namespace definition. +type NamespaceSpec struct { + Description string `yaml:"description"` + System bool `yaml:"system"` + UserWritable bool `yaml:"userWritable"` +} + +// NewNamespace initializes a Namespace resource. +func NewNamespace(id resource.ID, spec NamespaceSpec) *Namespace { + r := &Namespace{ + md: resource.NewMetadata(NamespaceName, NamespaceType, id, resource.VersionUndefined), + spec: spec, + } + + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *Namespace) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *Namespace) Spec() interface{} { + return r.spec +} + +func (r *Namespace) String() string { + return fmt.Sprintf("Namespace(%q)", r.md.ID()) +} + +// DeepCopy implements resource.Resource. +func (r *Namespace) DeepCopy() resource.Resource { + return &Namespace{ + md: r.md, + spec: r.spec, + } +} + +// ResourceDefinition implements core.ResourceDefinitionProvider interface. +func (r *Namespace) ResourceDefinition() ResourceDefinitionSpec { + return ResourceDefinitionSpec{ + Type: NamespaceType, + Aliases: []resource.Type{"namespace", "namespaces", "ns"}, + DefaultNamespace: NamespaceName, + } +} diff --git a/pkg/resource/core/resource_definition.go b/pkg/resource/core/resource_definition.go new file mode 100644 index 00000000..9bea633d --- /dev/null +++ b/pkg/resource/core/resource_definition.go @@ -0,0 +1,76 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package core + +import ( + "fmt" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// ResourceDefinitionType is the type of ResourceDefinition. +const ResourceDefinitionType = resource.Type("core/resource-definition") + +// ResourceDefinition provides metadata about namespaces. +type ResourceDefinition struct { + md resource.Metadata + spec ResourceDefinitionSpec +} + +// ResourceDefinitionSpec provides ResourceDefinition definition. +type ResourceDefinitionSpec struct { + Type resource.Type `yaml:"type"` + Aliases []resource.Type `yaml:"aliases"` + + DefaultNamespace resource.Namespace `yaml:"defaultNamespace"` +} + +// NewResourceDefinition initializes a ResourceDefinition resource. +func NewResourceDefinition(id resource.ID, spec ResourceDefinitionSpec) *ResourceDefinition { + r := &ResourceDefinition{ + md: resource.NewMetadata(NamespaceName, ResourceDefinitionType, id, resource.VersionUndefined), + spec: spec, + } + + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (r *ResourceDefinition) Metadata() *resource.Metadata { + return &r.md +} + +// Spec implements resource.Resource. +func (r *ResourceDefinition) Spec() interface{} { + return r.spec +} + +func (r *ResourceDefinition) String() string { + return fmt.Sprintf("Namespace(%q)", r.md.ID()) +} + +// DeepCopy implements resource.Resource. +func (r *ResourceDefinition) DeepCopy() resource.Resource { + return &ResourceDefinition{ + md: r.md, + spec: r.spec, + } +} + +// ResourceDefinition implements core.ResourceDefinitionProvider interface. +func (r *ResourceDefinition) ResourceDefinition() ResourceDefinitionSpec { + return ResourceDefinitionSpec{ + Type: ResourceDefinitionType, + Aliases: []resource.Type{"resource", "resources", "resourcedefinition"}, + DefaultNamespace: NamespaceName, + } +} + +// ResourceDefinitionProvider is implemented by resources which can be registered automatically. +type ResourceDefinitionProvider interface { + ResourceDefinition() ResourceDefinitionSpec +} diff --git a/pkg/resource/finalizer.go b/pkg/resource/finalizer.go new file mode 100644 index 00000000..024d2fc9 --- /dev/null +++ b/pkg/resource/finalizer.go @@ -0,0 +1,50 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +// Finalizer is a free-form string which blocks resource destruction. +// +// Resource can't be destroyed until all the finalizers are cleared. +type Finalizer = string + +// Finalizers is a set of Finalizer's with methods to add/remove items. +type Finalizers []Finalizer + +// Add a (unique) Finalizer to the set. +func (fins *Finalizers) Add(fin Finalizer) (added bool) { + *fins = append(Finalizers(nil), *fins...) + + for _, f := range *fins { + if f == fin { + return false + } + } + + *fins = append(*fins, fin) + + return true +} + +// Remove a (unique) Finalizer from the set. +func (fins *Finalizers) Remove(fin Finalizer) (removed bool) { + *fins = append(Finalizers(nil), *fins...) + + for i, f := range *fins { + if f == fin { + removed = true + + *fins = append((*fins)[:i], (*fins)[i+1:]...) + + return + } + } + + return +} + +// Empty returns true if list of finalizers is empty. +func (fins Finalizers) Empty() bool { + return len(fins) == 0 +} diff --git a/pkg/resource/finalizer_test.go b/pkg/resource/finalizer_test.go new file mode 100644 index 00000000..b63093fb --- /dev/null +++ b/pkg/resource/finalizer_test.go @@ -0,0 +1,47 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +func TestFinalizers(t *testing.T) { + const ( + A resource.Finalizer = "A" + B resource.Finalizer = "B" + C resource.Finalizer = "C" + ) + + var fins resource.Finalizers + + assert.True(t, fins.Empty()) + + assert.True(t, fins.Add(A)) + + finsCopy := fins + + assert.False(t, fins.Empty()) + assert.False(t, finsCopy.Empty()) + + assert.True(t, fins.Add(B)) + assert.False(t, fins.Add(B)) + + assert.True(t, finsCopy.Add(B)) + + assert.False(t, fins.Remove(C)) + assert.True(t, fins.Remove(B)) + assert.False(t, fins.Remove(B)) + + finsCopy = fins + + assert.True(t, finsCopy.Add(C)) + assert.True(t, fins.Add(C)) + assert.True(t, fins.Remove(C)) +} diff --git a/pkg/resource/kind.go b/pkg/resource/kind.go new file mode 100644 index 00000000..237c9459 --- /dev/null +++ b/pkg/resource/kind.go @@ -0,0 +1,11 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +// Kind is a Pointer minus resource ID. +type Kind interface { + Namespace() Namespace + Type() Type +} diff --git a/pkg/resource/list.go b/pkg/resource/list.go new file mode 100644 index 00000000..986ea390 --- /dev/null +++ b/pkg/resource/list.go @@ -0,0 +1,10 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +// List is a list of resources. +type List struct { + Items []Resource +} diff --git a/pkg/resource/metadata.go b/pkg/resource/metadata.go index 61a0c5ad..958236d3 100644 --- a/pkg/resource/metadata.go +++ b/pkg/resource/metadata.go @@ -4,23 +4,31 @@ package resource -import "fmt" +import ( + "fmt" + "sort" + + "gopkg.in/yaml.v3" +) // Metadata implements resource meta. type Metadata struct { - ns Namespace - typ Type - id ID - ver Version + ns Namespace + typ Type + id ID + ver Version + fins Finalizers + phase Phase } // NewMetadata builds new metadata. func NewMetadata(ns Namespace, typ Type, id ID, ver Version) Metadata { return Metadata{ - ns: ns, - typ: typ, - id: id, - ver: ver, + ns: ns, + typ: typ, + id: id, + ver: ver, + phase: PhaseRunning, } } @@ -31,7 +39,7 @@ func (md Metadata) ID() ID { // Type returns resource types. func (md Metadata) Type() Type { - return md.id + return md.typ } // Namespace returns resource namespace. @@ -39,28 +47,185 @@ func (md Metadata) Namespace() Namespace { return md.ns } +// Copy returns metadata copy. +func (md Metadata) Copy() Metadata { + return md +} + // Version returns resource version. func (md Metadata) Version() Version { return md.ver } // SetVersion updates resource version. -func (md Metadata) SetVersion(newVersion Version) { +func (md *Metadata) SetVersion(newVersion Version) { md.ver = newVersion } // BumpVersion increments resource version. -func (md Metadata) BumpVersion() { - if md.ver.uint64 == nil { - v := uint64(1) +func (md *Metadata) BumpVersion() { + var v uint64 - md.ver.uint64 = &v + if md.ver.uint64 == nil { + v = uint64(1) } else { - *md.ver.uint64++ + v = *md.ver.uint64 + 1 } + + md.ver.uint64 = &v +} + +// Finalizers returns a reference to the finalizers. +func (md *Metadata) Finalizers() *Finalizers { + return &md.fins +} + +// Phase returns current resource phase. +func (md Metadata) Phase() Phase { + return md.phase +} + +// SetPhase updates resource state. +func (md *Metadata) SetPhase(newPhase Phase) { + md.phase = newPhase } // String implements fmt.Stringer. func (md Metadata) String() string { return fmt.Sprintf("%s(%s/%s@%s)", md.typ, md.ns, md.id, md.ver) } + +// Equal tests two metadata objects for equality. +func (md Metadata) Equal(other Metadata) bool { + equal := md.ns == other.ns && md.typ == other.typ && md.id == other.id && md.phase == other.phase && md.ver.Equal(other.ver) + if !equal { + return false + } + + if len(md.fins) != len(other.fins) { + return false + } + + if md.fins == nil && other.fins == nil { + return true + } + + fins := append(Finalizers(nil), md.fins...) + otherFins := append(Finalizers(nil), other.fins...) + + sort.Strings(fins) + sort.Strings(otherFins) + + for i := range fins { + if fins[i] != otherFins[i] { + return false + } + } + + return true +} + +// MarshalYAML implements yaml.Marshaller interface. +func (md *Metadata) MarshalYAML() (interface{}, error) { + var finalizers []*yaml.Node + + if !md.fins.Empty() { + finalizers = []*yaml.Node{ + { + Kind: yaml.ScalarNode, + Value: "finalizers", + }, + { + Kind: yaml.SequenceNode, + Content: make([]*yaml.Node, 0, len(md.fins)), + }, + } + + for _, fin := range md.fins { + finalizers[1].Content = append(finalizers[1].Content, &yaml.Node{ + Kind: yaml.ScalarNode, + Value: fin, + }) + } + } + + return &yaml.Node{ + Kind: yaml.MappingNode, + Content: append( + []*yaml.Node{ + { + Kind: yaml.ScalarNode, + Value: "namespace", + }, + { + Kind: yaml.ScalarNode, + Value: md.ns, + }, + { + Kind: yaml.ScalarNode, + Value: "type", + }, + { + Kind: yaml.ScalarNode, + Value: md.typ, + }, + { + Kind: yaml.ScalarNode, + Value: "id", + }, + { + Kind: yaml.ScalarNode, + Value: md.id, + }, + { + Kind: yaml.ScalarNode, + Value: "version", + }, + { + Kind: yaml.ScalarNode, + Value: md.ver.String(), + }, + { + Kind: yaml.ScalarNode, + Value: "phase", + }, + { + Kind: yaml.ScalarNode, + Value: md.phase.String(), + }, + }, + finalizers...), + }, nil +} + +// MetadataProto is an interface for protobuf serialization of Metadata. +type MetadataProto interface { + GetNamespace() string + GetType() string + GetId() string + GetVersion() string + GetPhase() string + GetFinalizers() []string +} + +// NewMetadataFromProto builds Metadata object from ProtoMetadata interface data. +func NewMetadataFromProto(proto MetadataProto) (Metadata, error) { + ver, err := ParseVersion(proto.GetVersion()) + if err != nil { + return Metadata{}, err + } + + phase, err := ParsePhase(proto.GetPhase()) + if err != nil { + return Metadata{}, err + } + + md := NewMetadata(proto.GetNamespace(), proto.GetType(), proto.GetId(), ver) + md.SetPhase(phase) + + for _, fin := range proto.GetFinalizers() { + md.Finalizers().Add(fin) + } + + return md, nil +} diff --git a/pkg/resource/metadata_test.go b/pkg/resource/metadata_test.go new file mode 100644 index 00000000..4e2bf277 --- /dev/null +++ b/pkg/resource/metadata_test.go @@ -0,0 +1,145 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +func TestMetadata(t *testing.T) { + t.Parallel() + + md := resource.NewMetadata("default", "type", "aaa", resource.VersionUndefined) + assert.Equal(t, "default", md.Namespace()) + assert.Equal(t, "type", md.Type()) + assert.Equal(t, "aaa", md.ID()) + assert.Equal(t, resource.VersionUndefined, md.Version()) + assert.Equal(t, "undefined", md.Version().String()) + + md.BumpVersion() + assert.Equal(t, "1", md.Version().String()) + + md.BumpVersion() + assert.Equal(t, "2", md.Version().String()) + + assert.True(t, md.Equal(md)) + + other := resource.NewMetadata("default", "type", "bbb", resource.VersionUndefined) + other.BumpVersion() + + md.SetVersion(other.Version()) + assert.Equal(t, "1", md.Version().String()) + + assert.Equal(t, resource.PhaseRunning, md.Phase()) + + md.SetPhase(resource.PhaseTearingDown) + assert.Equal(t, resource.PhaseTearingDown, md.Phase()) + + assert.True(t, md.Finalizers().Empty()) + assert.True(t, md.Finalizers().Add("A")) + assert.False(t, md.Finalizers().Add("A")) + + assert.False(t, md.Equal(other)) + + md = resource.NewMetadata("default", "type", "aaa", resource.VersionUndefined) + mdCopy := md.Copy() + + assert.True(t, md.Equal(mdCopy)) + + assert.True(t, md.Finalizers().Add("A")) + assert.False(t, md.Equal(mdCopy)) + + assert.True(t, mdCopy.Finalizers().Add("B")) + assert.False(t, md.Equal(mdCopy)) + + assert.True(t, mdCopy.Finalizers().Add("A")) + assert.True(t, md.Finalizers().Add("B")) + assert.True(t, md.Equal(mdCopy)) + + md.BumpVersion() + assert.False(t, md.Equal(mdCopy)) + + mdCopy.BumpVersion() + assert.True(t, md.Equal(mdCopy)) + + md.SetPhase(resource.PhaseTearingDown) + assert.False(t, md.Equal(mdCopy)) +} + +func TestMetadataMarshalYAML(t *testing.T) { + t.Parallel() + + md := resource.NewMetadata("default", "type", "aaa", resource.VersionUndefined) + md.BumpVersion() + + out, err := yaml.Marshal(&md) + assert.NoError(t, err) + assert.Equal(t, `namespace: default +type: type +id: aaa +version: 1 +phase: running +`, string(out)) + + md.Finalizers().Add("\"resource1") + md.Finalizers().Add("resource2") + + out, err = yaml.Marshal(&md) + assert.NoError(t, err) + assert.Equal(t, `namespace: default +type: type +id: aaa +version: 1 +phase: running +finalizers: + - '"resource1' + - resource2 +`, string(out)) +} + +type protoMd struct{} + +func (p *protoMd) GetNamespace() string { + return "default" +} + +func (p *protoMd) GetType() string { + return "type" +} + +//nolint: golint, stylecheck +func (p *protoMd) GetId() string { + return "aaa" +} + +func (p *protoMd) GetVersion() string { + return "1" +} + +func (p *protoMd) GetPhase() string { + return resource.PhaseRunning.String() +} + +func (p *protoMd) GetFinalizers() []string { + return []string{"resource1", "resource2"} +} + +func TestNewMedataFromProto(t *testing.T) { + md, err := resource.NewMetadataFromProto(&protoMd{}) + assert.NoError(t, err) + + other := resource.NewMetadata("default", "type", "aaa", resource.VersionUndefined) + other.BumpVersion() + + other.Finalizers().Add("resource1") + other.Finalizers().Add("resource2") + + assert.True(t, md.Equal(other)) +} diff --git a/pkg/resource/phase.go b/pkg/resource/phase.go new file mode 100644 index 00000000..d7045f5a --- /dev/null +++ b/pkg/resource/phase.go @@ -0,0 +1,39 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +import "fmt" + +// Phase represents state of the resource. +// +// Resource might be either Running or TearingDown (waiting for the finalizers to be removed). +type Phase int + +// Phase constants. +const ( + PhaseRunning Phase = iota + PhaseTearingDown +) + +const ( + strPhaseRunning = "running" + strPhaseTearingDown = "tearingDown" +) + +func (ph Phase) String() string { + return [...]string{strPhaseRunning, strPhaseTearingDown}[ph] +} + +// ParsePhase from string representation. +func ParsePhase(ph string) (Phase, error) { + switch ph { + case strPhaseRunning: + return PhaseRunning, nil + case strPhaseTearingDown: + return PhaseTearingDown, nil + default: + return 0, fmt.Errorf("uknown phase: %v", ph) + } +} diff --git a/pkg/resource/pointer.go b/pkg/resource/pointer.go index 20e90f82..34e72bd3 100644 --- a/pkg/resource/pointer.go +++ b/pkg/resource/pointer.go @@ -6,7 +6,7 @@ package resource // Pointer is a Reference minus resource version. type Pointer interface { - Namespace() Namespace - Type() Type + Kind + ID() ID } diff --git a/pkg/resource/resource.go b/pkg/resource/resource.go index 737a61d2..9f585baf 100644 --- a/pkg/resource/resource.go +++ b/pkg/resource/resource.go @@ -7,7 +7,7 @@ package resource import ( "fmt" - "strconv" + "reflect" ) type ( @@ -17,27 +17,10 @@ type ( // // Type could be e.g. runtime/os/mount. Type = string - // Version of a resource. - Version struct { - *uint64 - } // Namespace of a resource. Namespace = string ) -// Special version constants. -var ( - VersionUndefined = Version{} -) - -func (v Version) String() string { - if v.uint64 == nil { - return "undefined" - } - - return strconv.FormatUint(*v.uint64, 10) -} - // Resource is an abstract resource managed by the state. // // Resource is uniquely identified by the tuple (Namespace, Type, ID). @@ -50,11 +33,31 @@ type Resource interface { // Metadata for the resource. // // Metadata.Version should change each time Spec changes. - Metadata() Metadata + Metadata() *Metadata // Opaque data resource contains. Spec() interface{} // Deep copy of the resource. - Copy() Resource + DeepCopy() Resource +} + +// Equal tests two resources for equality. +func Equal(r1, r2 Resource) bool { + if !r1.Metadata().Equal(*r2.Metadata()) { + return false + } + + return reflect.DeepEqual(r1.Spec(), r2.Spec()) +} + +// MarshalYAML marshals resource to YAML definition. +func MarshalYAML(r Resource) (interface{}, error) { + return &struct { + Metadata *Metadata `yaml:"metadata"` + Spec interface{} `yaml:"spec"` + }{ + Metadata: r.Metadata(), + Spec: r.Spec(), + }, nil } diff --git a/pkg/resource/resource_test.go b/pkg/resource/resource_test.go index ac1f26fe..6076b141 100644 --- a/pkg/resource/resource_test.go +++ b/pkg/resource/resource_test.go @@ -13,11 +13,15 @@ import ( ) func TestInterfaces(t *testing.T) { + t.Parallel() + assert.Implements(t, (*resource.Reference)(nil), resource.Metadata{}) assert.Implements(t, (*resource.Resource)(nil), new(resource.Tombstone)) } func TestIsTombstone(t *testing.T) { + t.Parallel() + assert.True(t, resource.IsTombstone(new(resource.Tombstone))) assert.False(t, resource.IsTombstone((resource.Resource)(nil))) } diff --git a/pkg/resource/tombstone.go b/pkg/resource/tombstone.go index 7324cf32..ecedb730 100644 --- a/pkg/resource/tombstone.go +++ b/pkg/resource/tombstone.go @@ -28,8 +28,8 @@ func (t *Tombstone) String() string { // Metadata for the resource. // // Metadata.Version should change each time Spec changes. -func (t *Tombstone) Metadata() Metadata { - return t.ref +func (t *Tombstone) Metadata() *Metadata { + return &t.ref } // Spec is not implemented for tobmstones. @@ -37,8 +37,8 @@ func (t *Tombstone) Spec() interface{} { panic("tombstone doesn't contain spec") } -// Copy returns self, as tombstone is immutable. -func (t *Tombstone) Copy() Resource { +// DeepCopy returns self, as tombstone is immutable. +func (t *Tombstone) DeepCopy() Resource { return t } diff --git a/pkg/resource/version.go b/pkg/resource/version.go new file mode 100644 index 00000000..e2d24cef --- /dev/null +++ b/pkg/resource/version.go @@ -0,0 +1,60 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package resource + +import ( + "fmt" + "strconv" + + "github.com/AlekSi/pointer" +) + +// Version of a resource. +type Version struct { + // make versions uncomparable with equality operator + _ [0]func() + + *uint64 +} + +// Special version constants. +var ( + VersionUndefined = Version{} +) + +const undefinedVersion = "undefined" + +func (v Version) String() string { + if v.uint64 == nil { + return undefinedVersion + } + + return strconv.FormatUint(*v.uint64, 10) +} + +// Equal compares versions. +func (v Version) Equal(other Version) bool { + if v.uint64 == nil || other.uint64 == nil { + return v.uint64 == nil && other.uint64 == nil + } + + return *v.uint64 == *other.uint64 +} + +// ParseVersion from string representation. +func ParseVersion(ver string) (Version, error) { + if ver == undefinedVersion { + return VersionUndefined, nil + } + + intVersion, err := strconv.ParseInt(ver, 10, 64) + if err != nil { + return VersionUndefined, fmt.Errorf("error parsing version: %w", err) + } + + return Version{ + uint64: pointer.ToUint64(uint64(intVersion)), + }, nil +} diff --git a/pkg/state/condition.go b/pkg/state/condition.go index 34c6c5d6..5a972508 100644 --- a/pkg/state/condition.go +++ b/pkg/state/condition.go @@ -4,7 +4,9 @@ package state -import "github.com/talos-systems/os-runtime/pkg/resource" +import ( + "github.com/talos-systems/os-runtime/pkg/resource" +) // ResourceConditionFunc checks some condition on the resource. type ResourceConditionFunc func(resource.Resource) (bool, error) @@ -15,6 +17,10 @@ type WatchForCondition struct { EventTypes []EventType // If set, match only if func returns true. Condition ResourceConditionFunc + // If true, wait for the finalizers to empty + FinalizersEmpty bool + // If set, wait for resource phase to be one of the specified. + Phases []resource.Phase } // Matches checks whether event matches a condition. @@ -46,6 +52,32 @@ func (condition *WatchForCondition) Matches(event Event) (bool, error) { } } + if condition.FinalizersEmpty { + if event.Type == Destroyed { + return false, nil + } + + if !event.Resource.Metadata().Finalizers().Empty() { + return false, nil + } + } + + if condition.Phases != nil { + matched := false + + for _, phase := range condition.Phases { + if event.Resource.Metadata().Phase() == phase { + matched = true + + break + } + } + + if !matched { + return false, nil + } + } + // no conditions denied the event, consider it matching return true, nil } @@ -70,3 +102,21 @@ func WithCondition(conditionFunc ResourceConditionFunc) WatchForConditionFunc { return nil } } + +// WithFinalizerEmpty waits for the resource finalizers to be empty. +func WithFinalizerEmpty() WatchForConditionFunc { + return func(condition *WatchForCondition) error { + condition.FinalizersEmpty = true + + return nil + } +} + +// WithPhases watches for specified resource phases. +func WithPhases(phases ...resource.Phase) WatchForConditionFunc { + return func(condition *WatchForCondition) error { + condition.Phases = append(condition.Phases, phases...) + + return nil + } +} diff --git a/pkg/state/conformance/conformance.go b/pkg/state/conformance/conformance.go new file mode 100644 index 00000000..ebf877b3 --- /dev/null +++ b/pkg/state/conformance/conformance.go @@ -0,0 +1,6 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package conformance implements tests which verify conformance of the implementation with the spec. +package conformance diff --git a/pkg/state/conformance/resources.go b/pkg/state/conformance/resources.go new file mode 100644 index 00000000..566d9a36 --- /dev/null +++ b/pkg/state/conformance/resources.go @@ -0,0 +1,52 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package conformance + +import ( + "fmt" + + "github.com/talos-systems/os-runtime/pkg/resource" +) + +// PathResourceType is the type of PathResource. +const PathResourceType = resource.Type("os/path") + +// PathResource represents a path in the filesystem. +// +// Resource ID is the path. +type PathResource struct { + md resource.Metadata +} + +// NewPathResource creates new PathResource. +func NewPathResource(ns resource.Namespace, path string) *PathResource { + r := &PathResource{ + md: resource.NewMetadata(ns, PathResourceType, path, resource.VersionUndefined), + } + r.md.BumpVersion() + + return r +} + +// Metadata implements resource.Resource. +func (path *PathResource) Metadata() *resource.Metadata { + return &path.md +} + +// Spec implements resource.Resource. +func (path *PathResource) Spec() interface{} { + return nil +} + +func (path *PathResource) String() string { + return fmt.Sprintf("PathResource(%q)", path.md.ID()) +} + +// DeepCopy implements resource.Resource. +func (path *PathResource) DeepCopy() resource.Resource { + return &PathResource{ + md: path.md, + } +} diff --git a/pkg/state/conformance/state.go b/pkg/state/conformance/state.go new file mode 100644 index 00000000..1517d810 --- /dev/null +++ b/pkg/state/conformance/state.go @@ -0,0 +1,376 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package conformance + +import ( + "context" + "math/rand" + "sort" + "sync" + "time" + + "github.com/stretchr/testify/suite" + "golang.org/x/sync/errgroup" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// StateSuite implements conformance test for state.State. +type StateSuite struct { + suite.Suite + + State state.State + + Namespaces []resource.Namespace +} + +func (suite *StateSuite) getNamespace() resource.Namespace { + if len(suite.Namespaces) == 0 { + return "default" + } + + return suite.Namespaces[rand.Intn(len(suite.Namespaces))] +} + +// TestCRD verifies create, read, delete. +func (suite *StateSuite) TestCRD() { + path1 := NewPathResource(suite.getNamespace(), "var/run") + path2 := NewPathResource(suite.getNamespace(), "var/lib") + + suite.Require().NotEqual(path1.String(), path2.String()) + + ctx := context.Background() + + _, err := suite.State.Get(ctx, path1.Metadata()) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + _, err = suite.State.Get(ctx, path2.Metadata()) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + list, err := suite.State.List(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().Empty(list.Items) + + suite.Require().NoError(suite.State.Create(ctx, path1)) + suite.Require().NoError(suite.State.Create(ctx, path2)) + + r, err := suite.State.Get(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().Equal(path1.String(), r.String()) + + r, err = suite.State.Get(ctx, path2.Metadata()) + suite.Require().NoError(err) + suite.Assert().Equal(path2.String(), r.String()) + + for _, res := range []resource.Resource{path1, path2} { + list, err = suite.State.List(ctx, res.Metadata()) + suite.Require().NoError(err) + + if path1.Metadata().Namespace() == path2.Metadata().Namespace() { + suite.Assert().Len(list.Items, 2) + + ids := make([]string, len(list.Items)) + + for i := range ids { + ids[i] = list.Items[i].String() + } + + sort.Strings(ids) + + suite.Assert().Equal([]string{path2.String(), path1.String()}, ids) + } else { + suite.Assert().Len(list.Items, 1) + + suite.Assert().Equal(res.String(), list.Items[0].String()) + } + } + + err = suite.State.Create(ctx, path1) + suite.Assert().Error(err) + suite.Assert().True(state.IsConflictError(err)) + + destroyReady, err := suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().True(destroyReady) + + suite.Require().NoError(suite.State.Destroy(ctx, path1.Metadata())) + + _, err = suite.State.Teardown(ctx, path1.Metadata()) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + err = suite.State.Destroy(ctx, path1.Metadata()) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + _, err = suite.State.Get(ctx, path1.Metadata()) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + list, err = suite.State.List(ctx, path2.Metadata()) + suite.Require().NoError(err) + suite.Assert().Len(list.Items, 1) + suite.Assert().Equal(path2.String(), list.Items[0].String()) + + destroyReady, err = suite.State.Teardown(ctx, path2.Metadata()) + suite.Require().NoError(err) + suite.Assert().True(destroyReady) + + suite.Require().NoError(suite.State.Create(ctx, path1)) +} + +// TestWatchKind verifies WatchKind API. +func (suite *StateSuite) TestWatchKind() { + ns := suite.getNamespace() + path1 := NewPathResource(ns, "var/db") + path2 := NewPathResource(ns, "var/tmp") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + suite.Require().NoError(suite.State.Create(ctx, path1)) + + ch := make(chan state.Event) + + suite.Require().NoError(suite.State.WatchKind(ctx, path1.Metadata(), ch)) + + suite.Require().NoError(suite.State.Create(ctx, path2)) + + select { + case event := <-ch: + suite.Assert().Equal(state.Created, event.Type) + suite.Assert().Equal(path2.String(), event.Resource.String()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } + + _, err := suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Require().NoError(suite.State.Destroy(ctx, path1.Metadata())) + + select { + case event := <-ch: + suite.Assert().Equal(state.Updated, event.Type) + suite.Assert().Equal(path1.String(), event.Resource.String()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } + + select { + case event := <-ch: + suite.Assert().Equal(state.Destroyed, event.Type) + suite.Assert().Equal(path1.String(), event.Resource.String()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } + + oldVersion := path2.Metadata().Version() + path2.Metadata().BumpVersion() + + suite.Require().NoError(suite.State.Update(ctx, oldVersion, path2)) + + select { + case event := <-ch: + suite.Assert().Equal(state.Updated, event.Type) + suite.Assert().Equal(path2.String(), event.Resource.String()) + suite.Assert().Equal(path2.Metadata().Version(), event.Resource.Metadata().Version()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } +} + +// TestConcurrentFinalizers perform concurrent finalizer updates. +func (suite *StateSuite) TestConcurrentFinalizers() { + ns := suite.getNamespace() + path := NewPathResource(ns, "var/final") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + suite.Require().NoError(suite.State.Create(ctx, path)) + + var eg errgroup.Group + + for _, fin := range []resource.Finalizer{"A", "B", "C", "D", "E", "F", "G", "H"} { + fin := fin + + eg.Go(func() error { + return suite.State.AddFinalizer(ctx, path.Metadata(), fin) + }) + } + + for _, fin := range []resource.Finalizer{"A", "B", "C"} { + fin := fin + + eg.Go(func() error { + return suite.State.RemoveFinalizer(ctx, path.Metadata(), fin) + }) + } + + suite.Assert().NoError(eg.Wait()) + + eg = errgroup.Group{} + + for _, fin := range []resource.Finalizer{"A", "B", "C"} { + fin := fin + + eg.Go(func() error { + return suite.State.RemoveFinalizer(ctx, path.Metadata(), fin) + }) + } + + suite.Assert().NoError(eg.Wait()) + + pathRes, err := suite.State.Get(ctx, path.Metadata()) + suite.Require().NoError(err) + + path = pathRes.(*PathResource) //nolint: errcheck + + finalizers := path.Metadata().Finalizers() + sort.Strings(*finalizers) + + suite.Assert().Equal(resource.Finalizers{"D", "E", "F", "G", "H"}, *finalizers) +} + +// TestWatchFor verifies WatchFor. +func (suite *StateSuite) TestWatchFor() { + ns := suite.getNamespace() + path1 := NewPathResource(ns, "tmp/one") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var ( + wg sync.WaitGroup + r resource.Resource + err error + ) + + wg.Add(1) + + go func() { + defer wg.Done() + + r, err = suite.State.WatchFor(ctx, path1.Metadata(), state.WithEventTypes(state.Created)) + }() + + suite.Require().NoError(suite.State.Create(ctx, path1)) + + wg.Wait() + + suite.Assert().NoError(err) + suite.Assert().Equal(r.Metadata().String(), path1.Metadata().String()) + + r, err = suite.State.WatchFor(ctx, path1.Metadata(), state.WithFinalizerEmpty()) + suite.Assert().NoError(err) + suite.Assert().Equal(r.Metadata().String(), path1.Metadata().String()) + + wg.Add(1) + + go func() { + defer wg.Done() + + r, err = suite.State.WatchFor(ctx, path1.Metadata(), state.WithPhases(resource.PhaseTearingDown)) + }() + + ready, e := suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(e) + suite.Assert().True(ready) + + wg.Wait() + suite.Assert().NoError(err) + suite.Assert().Equal(r.Metadata().ID(), path1.Metadata().ID()) + suite.Assert().Equal(resource.PhaseTearingDown, r.Metadata().Phase()) + + wg.Add(1) + + go func() { + defer wg.Done() + + r, err = suite.State.WatchFor(ctx, path1.Metadata(), state.WithEventTypes(state.Destroyed)) + }() + + suite.Assert().NoError(suite.State.AddFinalizer(ctx, path1.Metadata(), "A")) + suite.Assert().NoError(suite.State.RemoveFinalizer(ctx, path1.Metadata(), "A")) + + suite.Assert().NoError(suite.State.Destroy(ctx, path1.Metadata())) + + wg.Wait() + suite.Assert().NoError(err) + suite.Assert().Equal(r.Metadata().ID(), path1.Metadata().ID()) +} + +// TestTeardownDestroy verifies finalizers, teardown and destroy. +func (suite *StateSuite) TestTeardownDestroy() { + ns := suite.getNamespace() + path1 := NewPathResource(ns, "tmp/1") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + suite.Require().NoError(suite.State.Create(ctx, path1)) + + suite.Assert().NoError(suite.State.AddFinalizer(ctx, path1.Metadata(), "A")) + + err := suite.State.Destroy(ctx, path1.Metadata()) + suite.Require().Error(err) + suite.Assert().True(state.IsConflictError(err)) + + ready, err := suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().False(ready) + + ready, err = suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().False(ready) + + err = suite.State.Destroy(ctx, path1.Metadata()) + suite.Require().Error(err) + suite.Assert().True(state.IsConflictError(err)) + + suite.Assert().NoError(suite.State.RemoveFinalizer(ctx, path1.Metadata(), "A")) + + ready, err = suite.State.Teardown(ctx, path1.Metadata()) + suite.Require().NoError(err) + suite.Assert().True(ready) + + suite.Assert().NoError(suite.State.Destroy(ctx, path1.Metadata())) +} + +// TestUpdate verifies update flow. +func (suite *StateSuite) TestUpdate() { + ns := suite.getNamespace() + path1 := NewPathResource(ns, "tmp/path1") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := suite.State.Update(ctx, path1.Metadata().Version(), path1) + suite.Assert().Error(err) + suite.Assert().True(state.IsNotFoundError(err)) + + suite.Require().NoError(suite.State.Create(ctx, path1)) + + err = suite.State.Update(ctx, path1.Metadata().Version(), path1) + suite.Assert().Error(err) + suite.Assert().True(state.IsConflictError(err)) + + md := path1.Metadata().Copy() + md.BumpVersion() + + suite.Assert().False(md.Version().Equal(path1.Metadata().Version())) + + err = suite.State.Update(ctx, md.Version(), path1) + suite.Assert().Error(err) + suite.Assert().True(state.IsConflictError(err)) + + curVersion := path1.Metadata().Version() + path1.Metadata().BumpVersion() + + suite.Assert().NoError(suite.State.Update(ctx, curVersion, path1)) +} diff --git a/pkg/state/impl/local/local_test.go b/pkg/state/impl/inmem/build.go similarity index 53% rename from pkg/state/impl/local/local_test.go rename to pkg/state/impl/inmem/build.go index 42819f58..e27e5971 100644 --- a/pkg/state/impl/local/local_test.go +++ b/pkg/state/impl/inmem/build.go @@ -2,17 +2,14 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package local_test +package inmem import ( - "testing" - - "github.com/stretchr/testify/assert" - + "github.com/talos-systems/os-runtime/pkg/resource" "github.com/talos-systems/os-runtime/pkg/state" - "github.com/talos-systems/os-runtime/pkg/state/impl/local" ) -func TestInterfaces(t *testing.T) { - assert.Implements(t, (*state.CoreState)(nil), new(local.State)) +// Build a local state for namespace. +func Build(ns resource.Namespace) state.CoreState { + return NewState(ns) } diff --git a/pkg/state/impl/local/collection.go b/pkg/state/impl/inmem/collection.go similarity index 70% rename from pkg/state/impl/local/collection.go rename to pkg/state/impl/inmem/collection.go index ee95f376..f61060ee 100644 --- a/pkg/state/impl/local/collection.go +++ b/pkg/state/impl/inmem/collection.go @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package local +package inmem import ( "context" @@ -18,7 +18,6 @@ type ResourceCollection struct { c *sync.Cond storage map[resource.ID]resource.Resource - rip map[resource.ID]struct{} stream []state.Event @@ -44,7 +43,6 @@ func NewResourceCollection(ns resource.Namespace, typ resource.Type) *ResourceCo cap: cap, gap: gap, storage: make(map[resource.ID]resource.Resource), - rip: make(map[resource.ID]struct{}), stream: make([]state.Event, cap), } @@ -71,12 +69,28 @@ func (collection *ResourceCollection) Get(resourceID resource.ID) (resource.Reso return nil, ErrNotFound(resource.NewMetadata(collection.ns, collection.typ, resourceID, resource.VersionUndefined)) } - return res.Copy(), nil + return res.DeepCopy(), nil +} + +// List resources. +func (collection *ResourceCollection) List() (resource.List, error) { + collection.mu.Lock() + defer collection.mu.Unlock() + + result := resource.List{ + Items: make([]resource.Resource, 0, len(collection.storage)), + } + + for _, res := range collection.storage { + result.Items = append(result.Items, res.DeepCopy()) + } + + return result, nil } // Create a resource. func (collection *ResourceCollection) Create(resource resource.Resource) error { - resource = resource.Copy() + resource = resource.DeepCopy() id := resource.Metadata().ID() collection.mu.Lock() @@ -97,7 +111,7 @@ func (collection *ResourceCollection) Create(resource resource.Resource) error { // Update a resource. func (collection *ResourceCollection) Update(curVersion resource.Version, newResource resource.Resource) error { - newResource = newResource.Copy() + newResource = newResource.DeepCopy() id := newResource.Metadata().ID() collection.mu.Lock() @@ -108,7 +122,11 @@ func (collection *ResourceCollection) Update(curVersion resource.Version, newRes return ErrNotFound(newResource.Metadata()) } - if curResource.Metadata().Version() != curVersion { + if newResource.Metadata().Version().Equal(curVersion) { + return ErrUpdateSameVersion(curResource.Metadata(), curVersion) + } + + if !curResource.Metadata().Version().Equal(curVersion) { return ErrVersionConflict(curResource.Metadata(), curVersion, curResource.Metadata().Version()) } @@ -122,51 +140,23 @@ func (collection *ResourceCollection) Update(curVersion resource.Version, newRes return nil } -// Teardown a resource. -func (collection *ResourceCollection) Teardown(ref resource.Reference) error { - id := ref.ID() +// Destroy a resource. +func (collection *ResourceCollection) Destroy(ptr resource.Pointer) error { + id := ptr.ID() collection.mu.Lock() defer collection.mu.Unlock() resource, exists := collection.storage[id] if !exists { - return ErrNotFound(ref) + return ErrNotFound(ptr) } - if resource.Metadata().Version() != ref.Version() { - return ErrVersionConflict(ref, ref.Version(), resource.Metadata().Version()) - } - - _, torndown := collection.rip[id] - if torndown { - return ErrAlreadyTorndown(resource.Metadata()) - } - - collection.rip[id] = struct{}{} - - collection.publish(state.Event{ - Type: state.Torndown, - Resource: resource.Copy(), - }) - - return nil -} - -// Destroy a resource. -func (collection *ResourceCollection) Destroy(ref resource.Reference) error { - id := ref.ID() - - collection.mu.Lock() - defer collection.mu.Unlock() - - resource, exists := collection.storage[id] - if !exists { - return ErrNotFound(ref) + if !resource.Metadata().Finalizers().Empty() { + return ErrPendingFinalizers(*resource.Metadata()) } delete(collection.storage, id) - delete(collection.rip, id) collection.publish(state.Event{ Type: state.Destroyed, @@ -176,7 +166,7 @@ func (collection *ResourceCollection) Destroy(ref resource.Reference) error { return nil } -// Watch for resource changes. +// Watch for specific resource changes. // //nolint: gocognit func (collection *ResourceCollection) Watch(ctx context.Context, id resource.ID, ch chan<- state.Event) error { @@ -185,19 +175,14 @@ func (collection *ResourceCollection) Watch(ctx context.Context, id resource.ID, pos := collection.writePos curResource := collection.storage[id] - _, inTeardown := collection.rip[id] go func() { var event state.Event if curResource != nil { - event.Resource = curResource.Copy() + event.Resource = curResource.DeepCopy() - if inTeardown { - event.Type = state.Torndown - } else { - event.Type = state.Created - } + event.Type = state.Created } else { event.Resource = resource.NewTombstone(resource.NewMetadata(collection.ns, collection.typ, id, resource.VersionUndefined)) event.Type = state.Destroyed @@ -261,3 +246,52 @@ func (collection *ResourceCollection) Watch(ctx context.Context, id resource.ID, return nil } + +// WatchAll for any resource change stored in this collection. +func (collection *ResourceCollection) WatchAll(ctx context.Context, ch chan<- state.Event) error { + collection.mu.Lock() + defer collection.mu.Unlock() + + pos := collection.writePos + + go func() { + for { + collection.mu.Lock() + // while there's no data to consume (pos == e.writePos), wait for Condition variable signal, + // then recheck the condition to be true. + for pos == collection.writePos { + collection.c.Wait() + + select { + case <-ctx.Done(): + collection.mu.Unlock() + + return + default: + } + } + + if collection.writePos-pos >= int64(collection.cap) { + // buffer overrun, there's no way to signal error in this case, + // so for now just return + collection.mu.Unlock() + + return + } + + event := collection.stream[pos%int64(collection.cap)] + pos++ + + collection.mu.Unlock() + + // deliver event + select { + case ch <- event: + case <-ctx.Done(): + return + } + } + }() + + return nil +} diff --git a/pkg/state/impl/local/errors.go b/pkg/state/impl/inmem/errors.go similarity index 66% rename from pkg/state/impl/local/errors.go rename to pkg/state/impl/inmem/errors.go index a4e6c7ed..1a083ca3 100644 --- a/pkg/state/impl/local/errors.go +++ b/pkg/state/impl/inmem/errors.go @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package local +package inmem import ( "fmt" @@ -17,7 +17,7 @@ type eNotFound struct { func (eNotFound) NotFoundError() {} // ErrNotFound generates error compatible with state.ErrNotFound. -func ErrNotFound(r resource.Reference) error { +func ErrNotFound(r resource.Pointer) error { return eNotFound{ fmt.Errorf("resource %s doesn't exist", r), } @@ -43,9 +43,16 @@ func ErrVersionConflict(r resource.Reference, expected, found resource.Version) } } -// ErrAlreadyTorndown generates error compatible with state.ErrConflict. -func ErrAlreadyTorndown(r resource.Reference) error { +// ErrUpdateSameVersion generates error compatible with state.ErrConflict. +func ErrUpdateSameVersion(r resource.Reference, version resource.Version) error { return eConflict{ - fmt.Errorf("resource %s has already been torn down", r), + fmt.Errorf("resource %s update conflict: same %q version for new and existing objects", r, version), + } +} + +// ErrPendingFinalizers generates error compatible with state.ErrConflict. +func ErrPendingFinalizers(r resource.Metadata) error { + return eConflict{ + fmt.Errorf("resource %s has pending finalizers %s", r, r.Finalizers()), } } diff --git a/pkg/state/impl/local/errors_test.go b/pkg/state/impl/inmem/errors_test.go similarity index 60% rename from pkg/state/impl/local/errors_test.go rename to pkg/state/impl/inmem/errors_test.go index 113039bc..4e0b3716 100644 --- a/pkg/state/impl/local/errors_test.go +++ b/pkg/state/impl/inmem/errors_test.go @@ -1,7 +1,7 @@ // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -package local_test +package inmem_test import ( "testing" @@ -10,12 +10,14 @@ import ( "github.com/talos-systems/os-runtime/pkg/resource" "github.com/talos-systems/os-runtime/pkg/state" - "github.com/talos-systems/os-runtime/pkg/state/impl/local" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" ) func TestErrors(t *testing.T) { - assert.True(t, state.IsNotFoundError(local.ErrNotFound(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) - assert.True(t, state.IsConflictError(local.ErrAlreadyExists(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) - assert.True(t, state.IsConflictError(local.ErrVersionConflict(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined), resource.VersionUndefined, resource.VersionUndefined))) - assert.True(t, state.IsConflictError(local.ErrAlreadyTorndown(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) + t.Parallel() + + assert.True(t, state.IsNotFoundError(inmem.ErrNotFound(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) + assert.True(t, state.IsConflictError(inmem.ErrAlreadyExists(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) + assert.True(t, state.IsConflictError(inmem.ErrVersionConflict(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined), resource.VersionUndefined, resource.VersionUndefined))) + assert.True(t, state.IsConflictError(inmem.ErrPendingFinalizers(resource.NewMetadata("ns", "a", "b", resource.VersionUndefined)))) } diff --git a/pkg/state/impl/local/local.go b/pkg/state/impl/inmem/inmem.go similarity index 72% rename from pkg/state/impl/local/local.go rename to pkg/state/impl/inmem/inmem.go index 59b372e4..f5bbd114 100644 --- a/pkg/state/impl/local/local.go +++ b/pkg/state/impl/inmem/inmem.go @@ -2,8 +2,8 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package local provides an implementation of state.State in memory. -package local +// Package inmem provides an implementation of state.State in memory. +package inmem import ( "context" @@ -43,6 +43,11 @@ func (state *State) Get(ctx context.Context, resourcePointer resource.Pointer, o return state.getCollection(resourcePointer.Type()).Get(resourcePointer.ID()) } +// List resources. +func (state *State) List(ctx context.Context, resourceKind resource.Kind, opts ...state.ListOption) (resource.List, error) { + return state.getCollection(resourceKind.Type()).List() +} + // Create a resource. func (state *State) Create(ctx context.Context, resource resource.Resource, opts ...state.CreateOption) error { return state.getCollection(resource.Metadata().Type()).Create(resource) @@ -53,17 +58,17 @@ func (state *State) Update(ctx context.Context, curVersion resource.Version, new return state.getCollection(newResource.Metadata().Type()).Update(curVersion, newResource) } -// Teardown a resource. -func (state *State) Teardown(ctx context.Context, resourceReference resource.Reference, opts ...state.TeardownOption) error { - return state.getCollection(resourceReference.Type()).Teardown(resourceReference) -} - // Destroy a resource. -func (state *State) Destroy(ctx context.Context, resourceReference resource.Reference, opts ...state.DestroyOption) error { - return state.getCollection(resourceReference.Type()).Destroy(resourceReference) +func (state *State) Destroy(ctx context.Context, resourcePointer resource.Pointer, opts ...state.DestroyOption) error { + return state.getCollection(resourcePointer.Type()).Destroy(resourcePointer) } // Watch a resource. func (state *State) Watch(ctx context.Context, resourcePointer resource.Pointer, ch chan<- state.Event, opts ...state.WatchOption) error { return state.getCollection(resourcePointer.Type()).Watch(ctx, resourcePointer.ID(), ch) } + +// WatchKind all resources by type. +func (state *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event) error { + return state.getCollection(resourceKind.Type()).WatchAll(ctx, ch) +} diff --git a/pkg/state/impl/inmem/local_test.go b/pkg/state/impl/inmem/local_test.go new file mode 100644 index 00000000..88105a4e --- /dev/null +++ b/pkg/state/impl/inmem/local_test.go @@ -0,0 +1,32 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package inmem_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/conformance" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" +) + +func TestInterfaces(t *testing.T) { + t.Parallel() + + assert.Implements(t, (*state.CoreState)(nil), new(inmem.State)) +} + +func TestLocalConformance(t *testing.T) { + t.Parallel() + + suite.Run(t, &conformance.StateSuite{ + State: state.WrapCore(inmem.NewState("default")), + Namespaces: []resource.Namespace{"default"}, + }) +} diff --git a/pkg/state/impl/namespaced/namespaced.go b/pkg/state/impl/namespaced/namespaced.go new file mode 100644 index 00000000..95a11275 --- /dev/null +++ b/pkg/state/impl/namespaced/namespaced.go @@ -0,0 +1,91 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package namespaced provides an implementation of state split by namespaces. +package namespaced + +import ( + "context" + "sync" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// StateBuilder builds state by namespace. +type StateBuilder func(resource.Namespace) state.CoreState + +// State implements delegating State for each namespace. +type State struct { + namespaces sync.Map + + builder StateBuilder +} + +// NewState initializes new namespaced State. +func NewState(builder StateBuilder) *State { + return &State{ + builder: builder, + } +} + +func (st *State) getNamespace(ns resource.Namespace) state.CoreState { + if s, ok := st.namespaces.Load(ns); ok { + return s.(state.CoreState) + } + + s, _ := st.namespaces.LoadOrStore(ns, st.builder(ns)) + + return s.(state.CoreState) +} + +// Get a resource by type and ID. +// +// If a resource is not found, error is returned. +func (st *State) Get(ctx context.Context, ptr resource.Pointer, opts ...state.GetOption) (resource.Resource, error) { + return st.getNamespace(ptr.Namespace()).Get(ctx, ptr, opts...) +} + +// List resources by kind. +func (st *State) List(ctx context.Context, kind resource.Kind, opts ...state.ListOption) (resource.List, error) { + return st.getNamespace(kind.Namespace()).List(ctx, kind, opts...) +} + +// Create a resource. +// +// If a resource already exists, Create returns an error. +func (st *State) Create(ctx context.Context, res resource.Resource, opts ...state.CreateOption) error { + return st.getNamespace(res.Metadata().Namespace()).Create(ctx, res, opts...) +} + +// Update a resource. +// +// If a resource doesn't exist, error is returned. +// On update current version of resource `new` in the state should match +// curVersion, otherwise conflict error is returned. +func (st *State) Update(ctx context.Context, curVersion resource.Version, new resource.Resource, opts ...state.UpdateOption) error { + return st.getNamespace(new.Metadata().Namespace()).Update(ctx, curVersion, new, opts...) +} + +// Destroy a resource. +// +// If a resource doesn't exist, error is returned. +func (st *State) Destroy(ctx context.Context, ptr resource.Pointer, opts ...state.DestroyOption) error { + return st.getNamespace(ptr.Namespace()).Destroy(ctx, ptr, opts...) +} + +// Watch state of a resource by type. +// +// It's fine to watch for a resource which doesn't exist yet. +// Watch is canceled when context gets canceled. +// Watch sends initial resource state as the very first event on the channel, +// and then sends any updates to the resource as events. +func (st *State) Watch(ctx context.Context, ptr resource.Pointer, ch chan<- state.Event, opts ...state.WatchOption) error { + return st.getNamespace(ptr.Namespace()).Watch(ctx, ptr, ch, opts...) +} + +// WatchKind watches resources of specific kind (namespace and type). +func (st *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- state.Event) error { + return st.getNamespace(kind.Namespace()).WatchKind(ctx, kind, ch) +} diff --git a/pkg/state/impl/namespaced/namespaced_test.go b/pkg/state/impl/namespaced/namespaced_test.go new file mode 100644 index 00000000..ec3c0731 --- /dev/null +++ b/pkg/state/impl/namespaced/namespaced_test.go @@ -0,0 +1,33 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package namespaced_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/conformance" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" + "github.com/talos-systems/os-runtime/pkg/state/impl/namespaced" +) + +func TestInterfaces(t *testing.T) { + t.Parallel() + + assert.Implements(t, (*state.CoreState)(nil), new(namespaced.State)) +} + +func TestNamespacedConformance(t *testing.T) { + t.Parallel() + + suite.Run(t, &conformance.StateSuite{ + State: state.WrapCore(namespaced.NewState(inmem.Build)), + Namespaces: []resource.Namespace{"default", "controller", "system", "runtime"}, + }) +} diff --git a/pkg/state/options.go b/pkg/state/options.go index a99540ba..872c8f9d 100644 --- a/pkg/state/options.go +++ b/pkg/state/options.go @@ -10,6 +10,12 @@ type GetOptions struct{} // GetOption builds GetOptions. type GetOption func(*GetOptions) +// ListOptions for the CoreState.List function. +type ListOptions struct{} + +// ListOption builds ListOptions. +type ListOption func(*ListOptions) + // CreateOptions for the CoreState.Create function. type CreateOptions struct{} diff --git a/pkg/state/registry/namespace.go b/pkg/state/registry/namespace.go new file mode 100644 index 00000000..3c5f6695 --- /dev/null +++ b/pkg/state/registry/namespace.go @@ -0,0 +1,39 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "context" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/resource/core" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// NamespaceRegistry facilitates tracking namespaces. +type NamespaceRegistry struct { + state state.State +} + +// NewNamespaceRegistry creates new NamespaceRegistry. +func NewNamespaceRegistry(state state.State) *NamespaceRegistry { + return &NamespaceRegistry{ + state: state, + } +} + +// RegisterDefault registers default namespaces. +func (registry *NamespaceRegistry) RegisterDefault(ctx context.Context) error { + return registry.Register(ctx, core.NamespaceName, "System namespace containing resource and namespace definitions.", true) +} + +// Register a namespace. +func (registry *NamespaceRegistry) Register(ctx context.Context, ns resource.Namespace, description string, system bool) error { + return registry.state.Create(ctx, core.NewNamespace(ns, core.NamespaceSpec{ + Description: description, + System: system, + UserWritable: !system, + })) +} diff --git a/pkg/state/registry/namespace_test.go b/pkg/state/registry/namespace_test.go new file mode 100644 index 00000000..a3320684 --- /dev/null +++ b/pkg/state/registry/namespace_test.go @@ -0,0 +1,25 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" + "github.com/talos-systems/os-runtime/pkg/state/impl/namespaced" + "github.com/talos-systems/os-runtime/pkg/state/registry" +) + +func TestNamespaceRegistry(t *testing.T) { + t.Parallel() + + r := registry.NewNamespaceRegistry(state.WrapCore(namespaced.NewState(inmem.Build))) + + assert.NoError(t, r.RegisterDefault(context.Background())) +} diff --git a/pkg/state/registry/registry.go b/pkg/state/registry/registry.go new file mode 100644 index 00000000..abcc20a6 --- /dev/null +++ b/pkg/state/registry/registry.go @@ -0,0 +1,6 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +// Package registry provides registries for namespaces and resource definitions. +package registry diff --git a/pkg/state/registry/resource.go b/pkg/state/registry/resource.go new file mode 100644 index 00000000..6ec8795b --- /dev/null +++ b/pkg/state/registry/resource.go @@ -0,0 +1,49 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry + +import ( + "context" + "fmt" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/resource/core" + "github.com/talos-systems/os-runtime/pkg/state" +) + +// ResourceRegistry facilitates tracking namespaces. +type ResourceRegistry struct { + state state.State +} + +// NewResourceRegistry creates new ResourceRegistry. +func NewResourceRegistry(state state.State) *ResourceRegistry { + return &ResourceRegistry{ + state: state, + } +} + +// RegisterDefault registers default resource definitions. +func (registry *ResourceRegistry) RegisterDefault(ctx context.Context) error { + for _, r := range []resource.Resource{&core.ResourceDefinition{}, &core.Namespace{}} { + if err := registry.Register(ctx, r); err != nil { + return err + } + } + + return nil +} + +// Register a namespace. +func (registry *ResourceRegistry) Register(ctx context.Context, r resource.Resource) error { + definitionProvider, ok := r.(core.ResourceDefinitionProvider) + if !ok { + return fmt.Errorf("value %v doesn't implement core.ResourceDefinitionProvider", r) + } + + definition := definitionProvider.ResourceDefinition() + + return registry.state.Create(ctx, core.NewResourceDefinition(definition.Type, definition)) +} diff --git a/pkg/state/registry/resource_test.go b/pkg/state/registry/resource_test.go new file mode 100644 index 00000000..4b5946f0 --- /dev/null +++ b/pkg/state/registry/resource_test.go @@ -0,0 +1,25 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package registry_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" + "github.com/talos-systems/os-runtime/pkg/state/impl/namespaced" + "github.com/talos-systems/os-runtime/pkg/state/registry" +) + +func TestResourceRegistry(t *testing.T) { + t.Parallel() + + r := registry.NewResourceRegistry(state.WrapCore(namespaced.NewState(inmem.Build))) + + assert.NoError(t, r.RegisterDefault(context.Background())) +} diff --git a/pkg/state/state.go b/pkg/state/state.go index 5c15b2af..5d799696 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -20,8 +20,6 @@ const ( Created EventType = iota // Resource got changed. Updated - // Resource is about to be destroyed. - Torndown // Resource was destroyed. Destroyed ) @@ -32,12 +30,6 @@ type Event struct { Resource resource.Resource } -// NamespacedState allows to create different namespaces which might be backed by different -// State implementations. -type NamespacedState interface { - Namespace(resource.Namespace) State -} - // CoreState is the central broker in the system handling state and changes. // // CoreState provides the core API that should be implemented. @@ -48,6 +40,9 @@ type CoreState interface { // If a resource is not found, error is returned. Get(context.Context, resource.Pointer, ...GetOption) (resource.Resource, error) + // List resources by type. + List(context.Context, resource.Kind, ...ListOption) (resource.List, error) + // Create a resource. // // If a resource already exists, Create returns an error. @@ -60,17 +55,11 @@ type CoreState interface { // curVersion, otherwise conflict error is returned. Update(ctx context.Context, curVersion resource.Version, new resource.Resource, opts ...UpdateOption) error - // Teardown a resource (mark as being destroyed). - // - // If a resource doesn't exist, error is returned. - // If resource version doesn't match, conflict error is returned. - // It's an error to tear down a resource which is already being torn down. - Teardown(context.Context, resource.Reference, ...TeardownOption) error - // Destroy a resource. // // If a resource doesn't exist, error is returned. - Destroy(context.Context, resource.Reference, ...DestroyOption) error + // If a resource has pending finalizers, error is returned. + Destroy(context.Context, resource.Pointer, ...DestroyOption) error // Watch state of a resource by type. // @@ -79,6 +68,9 @@ type CoreState interface { // Watch sends initial resource state as the very first event on the channel, // and then sends any updates to the resource as events. Watch(context.Context, resource.Pointer, chan<- Event, ...WatchOption) error + + // WatchKind watches resources of specific kind (namespace and type). + WatchKind(context.Context, resource.Kind, chan<- Event) error } // UpdaterFunc is called on resource to update it to the desired state. @@ -91,7 +83,21 @@ type State interface { CoreState // UpdateWithConflicts automatically handles conflicts on update. - UpdateWithConflicts(context.Context, resource.Resource, UpdaterFunc) (resource.Resource, error) + UpdateWithConflicts(context.Context, resource.Pointer, UpdaterFunc) (resource.Resource, error) + // WatchFor watches for resource to reach all of the specified conditions. WatchFor(context.Context, resource.Pointer, ...WatchForConditionFunc) (resource.Resource, error) + + // Teardown a resource (mark as being destroyed). + // + // If a resource doesn't exist, error is returned. + // It's not an error to tear down a resource which is already being torn down. + // Teardown returns a flag telling whether it's fine to destroy a resource. + Teardown(context.Context, resource.Pointer, ...TeardownOption) (bool, error) + + // AddFinalizer adds finalizer to resource metadata handling conflicts. + AddFinalizer(context.Context, resource.Pointer, ...resource.Finalizer) error + + // RemoveFinalizer removes finalizer from resource metadata handling conflicts. + RemoveFinalizer(context.Context, resource.Pointer, ...resource.Finalizer) error } diff --git a/pkg/state/wrap.go b/pkg/state/wrap.go index c001e618..b2cb5fcb 100644 --- a/pkg/state/wrap.go +++ b/pkg/state/wrap.go @@ -22,20 +22,28 @@ type coreWrapper struct { } // UpdateWithConflicts automatically handles conflicts on update. -func (state coreWrapper) UpdateWithConflicts(ctx context.Context, r resource.Resource, f UpdaterFunc) (resource.Resource, error) { +func (state coreWrapper) UpdateWithConflicts(ctx context.Context, resourcePointer resource.Pointer, f UpdaterFunc) (resource.Resource, error) { for { - current, err := state.Get(ctx, resource.Pointer(r.Metadata())) + current, err := state.Get(ctx, resourcePointer) if err != nil { return nil, err } curVersion := current.Metadata().Version() - if err = f(current); err != nil { + new := current.DeepCopy() + + if err = f(new); err != nil { return nil, err } - err = state.Update(ctx, curVersion, current) + if resource.Equal(current, new) { + return current, nil + } + + new.Metadata().BumpVersion() + + err = state.Update(ctx, curVersion, new) if err == nil { return current, nil } @@ -83,3 +91,54 @@ func (state coreWrapper) WatchFor(ctx context.Context, pointer resource.Pointer, } } } + +// Teardown a resource (mark as being destroyed). +// +// If a resource doesn't exist, error is returned. +// It's not an error to tear down a resource which is already being torn down. +// Teardown returns a flag telling whether it's fine to destroy a resource. +func (state coreWrapper) Teardown(ctx context.Context, resourcePointer resource.Pointer, opts ...TeardownOption) (bool, error) { + res, err := state.Get(ctx, resourcePointer) + if err != nil { + return false, err + } + + if res.Metadata().Phase() != resource.PhaseTearingDown { + res, err = state.UpdateWithConflicts(ctx, res.Metadata(), func(r resource.Resource) error { + r.Metadata().SetPhase(resource.PhaseTearingDown) + + return nil + }) + if err != nil { + return false, err + } + } + + return res.Metadata().Finalizers().Empty(), nil +} + +// AddFinalizer adds finalizer to resource metadata handling conflicts. +func (state coreWrapper) AddFinalizer(ctx context.Context, resourcePointer resource.Pointer, fins ...resource.Finalizer) error { + _, err := state.UpdateWithConflicts(ctx, resourcePointer, func(r resource.Resource) error { + for _, fin := range fins { + r.Metadata().Finalizers().Add(fin) + } + + return nil + }) + + return err +} + +// RemoveFinalizer removes finalizer from resource metadata handling conflicts. +func (state coreWrapper) RemoveFinalizer(ctx context.Context, resourcePointer resource.Pointer, fins ...resource.Finalizer) error { + _, err := state.UpdateWithConflicts(ctx, resourcePointer, func(r resource.Resource) error { + for _, fin := range fins { + r.Metadata().Finalizers().Remove(fin) + } + + return nil + }) + + return err +} diff --git a/pkg/state/wrap_test.go b/pkg/state/wrap_test.go new file mode 100644 index 00000000..9b76cf78 --- /dev/null +++ b/pkg/state/wrap_test.go @@ -0,0 +1,26 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package state_test + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/talos-systems/os-runtime/pkg/resource" + "github.com/talos-systems/os-runtime/pkg/state" + "github.com/talos-systems/os-runtime/pkg/state/conformance" + "github.com/talos-systems/os-runtime/pkg/state/impl/inmem" + "github.com/talos-systems/os-runtime/pkg/state/impl/namespaced" +) + +func TestWrapConformance(t *testing.T) { + t.Parallel() + + suite.Run(t, &conformance.StateSuite{ + State: state.WrapCore(namespaced.NewState(inmem.Build)), + Namespaces: []resource.Namespace{"default", "controller", "system", "runtime"}, + }) +}