From 5a2640967edcef6db6ce9479397096bffcf5e549 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Mon, 26 Apr 2021 16:21:56 +0200 Subject: [PATCH 1/4] Add cache warmup strategy for OCS resource infos --- .../unreleased/ocs-share-cache-warmup.md | 8 + pkg/share/cache/cache.go | 28 ++++ pkg/share/cache/cbox/cbox.go | 149 ++++++++++++++++++ pkg/share/cache/registry/registry.go | 34 ++++ pkg/storage/fs/registry/registry.go | 2 +- 5 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 changelog/unreleased/ocs-share-cache-warmup.md create mode 100644 pkg/share/cache/cache.go create mode 100644 pkg/share/cache/cbox/cbox.go create mode 100644 pkg/share/cache/registry/registry.go diff --git a/changelog/unreleased/ocs-share-cache-warmup.md b/changelog/unreleased/ocs-share-cache-warmup.md new file mode 100644 index 0000000000..7aa6dda160 --- /dev/null +++ b/changelog/unreleased/ocs-share-cache-warmup.md @@ -0,0 +1,8 @@ +Enhancement: Add cache warmup strategy for OCS resource infos + +Recently, a TTL cache was added to OCS to store statted resource infos. This PR +adds an interface to define warmup strategies and also adds a cbox specific +strategy which starts a goroutine to initialize the cache with all the valid +shares present in the system. + +https://github.com/cs3org/reva/pull/1664 diff --git a/pkg/share/cache/cache.go b/pkg/share/cache/cache.go new file mode 100644 index 0000000000..9b3f63f41d --- /dev/null +++ b/pkg/share/cache/cache.go @@ -0,0 +1,28 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cache + +import ( + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" +) + +// FS is the interface to implement cache warmup strategies. +type Warmup interface { + GetResourceInfos() ([]*provider.ResourceInfo, error) +} diff --git a/pkg/share/cache/cbox/cbox.go b/pkg/share/cache/cbox/cbox.go new file mode 100644 index 0000000000..eae1922bfd --- /dev/null +++ b/pkg/share/cache/cbox/cbox.go @@ -0,0 +1,149 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package eos + +import ( + "context" + "database/sql" + "fmt" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/share/cache" + "github.com/cs3org/reva/pkg/share/cache/registry" + "github.com/cs3org/reva/pkg/storage/fs/eos" + "github.com/cs3org/reva/pkg/user" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + + // Provides mysql drivers + _ "github.com/go-sql-driver/mysql" +) + +func init() { + registry.Register("cbox", New) +} + +type config struct { + DbUsername string `mapstructure:"db_username"` + DbPassword string `mapstructure:"db_password"` + DbHost string `mapstructure:"db_host"` + DbPort int `mapstructure:"db_port"` + DbName string `mapstructure:"db_name"` + EOSNamespace string `mapstructure:"namespace"` + GatewaySvc string `mapstructure:"gatewaysvc"` +} + +type manager struct { + conf *config + db *sql.DB +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a new implementation of the storage.FS interface that connects to EOS. +func New(m map[string]interface{}) (cache.Warmup, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", c.DbUsername, c.DbPassword, c.DbHost, c.DbPort, c.DbName)) + if err != nil { + return nil, err + } + + return &manager{ + conf: c, + db: db, + }, nil +} + +func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) { + query := "select coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source WHERE (orphan = 0 or orphan IS NULL)" + rows, err := m.db.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + infos := []*provider.ResourceInfo{} + for rows.Next() { + var storageID, opaqueID string + if err := rows.Scan(&storageID, &opaqueID); err != nil { + continue + } + + eosOpts := map[string]interface{}{ + "namespace": m.conf.EOSNamespace, + "master_url": fmt.Sprintf("root://%s.cern.ch", storageID), + "version_invariant": true, + "gatewaysvc": m.conf.GatewaySvc, + } + eos, err := eos.New(eosOpts) + if err != nil { + return nil, err + } + + ctx := user.ContextSetUser(context.Background(), &userpb.User{ + Id: &userpb.UserId{ + OpaqueId: "root", + }, + Opaque: &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "uid": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte("0"), + }, + "gid": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte("0"), + }, + }, + }, + }) + + inf, err := eos.GetMD(ctx, &provider.Reference{ + Spec: &provider.Reference_Id{ + Id: &provider.ResourceId{ + StorageId: storageID, + OpaqueId: opaqueID, + }, + }, + }, []string{}) + if err != nil { + return nil, err + } + infos = append(infos, inf) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return infos, nil + +} diff --git a/pkg/share/cache/registry/registry.go b/pkg/share/cache/registry/registry.go new file mode 100644 index 0000000000..ce9dff08b5 --- /dev/null +++ b/pkg/share/cache/registry/registry.go @@ -0,0 +1,34 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import "github.com/cs3org/reva/pkg/share/cache" + +// NewFunc is the function that cache warmup implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (cache.Warmup, error) + +// NewFuncs is a map containing all the registered cache warmup implementations. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new cache warmup function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/storage/fs/registry/registry.go b/pkg/storage/fs/registry/registry.go index b6eab9d52c..e9aa41100b 100644 --- a/pkg/storage/fs/registry/registry.go +++ b/pkg/storage/fs/registry/registry.go @@ -27,7 +27,7 @@ type NewFunc func(map[string]interface{}) (storage.FS, error) // NewFuncs is a map containing all the registered storage backends. var NewFuncs = map[string]NewFunc{} -// Register registers a new storage backend new function. +// Register registers a new storage backend function. // Not safe for concurrent use. Safe for use from package init. func Register(name string, f NewFunc) { NewFuncs[name] = f From 95ee61be9f70a3a5bb54d790cf02e302ee0e5e63 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Mon, 26 Apr 2021 16:33:42 +0200 Subject: [PATCH 2/4] Add cache warmup driver to OCS --- .../services/owncloud/ocs/config/config.go | 24 ++++++++++-------- .../handlers/apps/sharing/shares/shares.go | 25 +++++++++++++++++++ pkg/share/cache/cache.go | 2 +- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/internal/http/services/owncloud/ocs/config/config.go b/internal/http/services/owncloud/ocs/config/config.go index 3d8f73b0b3..20d785c011 100644 --- a/internal/http/services/owncloud/ocs/config/config.go +++ b/internal/http/services/owncloud/ocs/config/config.go @@ -25,17 +25,19 @@ import ( // Config holds the config options that need to be passed down to all ocs handlers type Config struct { - Prefix string `mapstructure:"prefix"` - Config data.ConfigData `mapstructure:"config"` - Capabilities data.CapabilitiesData `mapstructure:"capabilities"` - GatewaySvc string `mapstructure:"gatewaysvc"` - DefaultUploadProtocol string `mapstructure:"default_upload_protocol"` - UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"` - SharePrefix string `mapstructure:"share_prefix"` - HomeNamespace string `mapstructure:"home_namespace"` - AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"` - ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"` - ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"` + Prefix string `mapstructure:"prefix"` + Config data.ConfigData `mapstructure:"config"` + Capabilities data.CapabilitiesData `mapstructure:"capabilities"` + GatewaySvc string `mapstructure:"gatewaysvc"` + DefaultUploadProtocol string `mapstructure:"default_upload_protocol"` + UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"` + SharePrefix string `mapstructure:"share_prefix"` + HomeNamespace string `mapstructure:"home_namespace"` + AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"` + CacheWarmupDriver string `mapstructure:"cache_warmup_driver"` + CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"` + ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"` + ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"` } // Init sets sane defaults diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index 1e25a9e0a5..ebcc9a7895 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -50,6 +50,8 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/pkg/rhttp/router" + "github.com/cs3org/reva/pkg/share/cache" + "github.com/cs3org/reva/pkg/share/cache/registry" "github.com/pkg/errors" ) @@ -72,6 +74,13 @@ type userIdentifiers struct { Mail string } +func getCacheWarmupManager(c *config.Config) (cache.Warmup, error) { + if f, ok := registry.NewFuncs[c.CacheWarmupDriver]; ok { + return f(c.CacheWarmupDrivers[c.CacheWarmupDriver]) + } + return nil, fmt.Errorf("driver not found: %s", c.CacheWarmupDriver) +} + // Init initializes this and any contained handlers func (h *Handler) Init(c *config.Config) error { h.gatewayAddr = c.GatewaySvc @@ -87,9 +96,25 @@ func (h *Handler) Init(c *config.Config) error { h.resourceInfoCache = gcache.New(c.ResourceInfoCacheSize).LFU().Build() + if h.resourceInfoCacheTTL > 0 { + cwm, _ := getCacheWarmupManager(c) + go h.startCacheWarmup(cwm) + } + return nil } +func (h *Handler) startCacheWarmup(c cache.Warmup) { + infos, err := c.GetResourceInfos() + if err != nil { + return + } + for _, r := range infos { + key := wrapResourceID(r.Id) + _ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL) + } +} + func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log := appctx.GetLogger(r.Context()) diff --git a/pkg/share/cache/cache.go b/pkg/share/cache/cache.go index 9b3f63f41d..8e2023873b 100644 --- a/pkg/share/cache/cache.go +++ b/pkg/share/cache/cache.go @@ -22,7 +22,7 @@ import ( provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ) -// FS is the interface to implement cache warmup strategies. +// Warmup is the interface to implement cache warmup strategies. type Warmup interface { GetResourceInfos() ([]*provider.ResourceInfo, error) } From 140f15931003ef72724535b2f61b15ef0c346388 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Thu, 6 May 2021 17:15:09 +0200 Subject: [PATCH 3/4] Apply suggestions from code review Co-authored-by: Alex Unger <6905948+refs@users.noreply.github.com> --- .../owncloud/ocs/handlers/apps/sharing/shares/shares.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index ebcc9a7895..d44afed8d6 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -97,8 +97,10 @@ func (h *Handler) Init(c *config.Config) error { h.resourceInfoCache = gcache.New(c.ResourceInfoCacheSize).LFU().Build() if h.resourceInfoCacheTTL > 0 { - cwm, _ := getCacheWarmupManager(c) - go h.startCacheWarmup(cwm) + cwm, err := getCacheWarmupManager(c) + if err == nil { + go h.startCacheWarmup(cwm) + } } return nil From be320937f352fa3e1845ce7f449789a243887f29 Mon Sep 17 00:00:00 2001 From: Ishank Arora Date: Fri, 7 May 2021 09:55:20 +0200 Subject: [PATCH 4/4] Fix sql query --- pkg/share/cache/cbox/cbox.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/share/cache/cbox/cbox.go b/pkg/share/cache/cbox/cbox.go index eae1922bfd..925a553c2d 100644 --- a/pkg/share/cache/cbox/cbox.go +++ b/pkg/share/cache/cbox/cbox.go @@ -83,7 +83,7 @@ func New(m map[string]interface{}) (cache.Warmup, error) { } func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) { - query := "select coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source WHERE (orphan = 0 or orphan IS NULL)" + query := "select coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source FROM oc_share WHERE (orphan = 0 or orphan IS NULL)" rows, err := m.db.Query(query) if err != nil { return nil, err