diff --git a/changelog/unreleased/ocs-share-cache-warmup.md b/changelog/unreleased/ocs-share-cache-warmup.md new file mode 100644 index 0000000000..7aa6dda160 --- /dev/null +++ b/changelog/unreleased/ocs-share-cache-warmup.md @@ -0,0 +1,8 @@ +Enhancement: Add cache warmup strategy for OCS resource infos + +Recently, a TTL cache was added to OCS to store statted resource infos. This PR +adds an interface to define warmup strategies and also adds a cbox specific +strategy which starts a goroutine to initialize the cache with all the valid +shares present in the system. + +https://github.com/cs3org/reva/pull/1664 diff --git a/internal/http/services/owncloud/ocs/config/config.go b/internal/http/services/owncloud/ocs/config/config.go index 3d8f73b0b3..20d785c011 100644 --- a/internal/http/services/owncloud/ocs/config/config.go +++ b/internal/http/services/owncloud/ocs/config/config.go @@ -25,17 +25,19 @@ import ( // Config holds the config options that need to be passed down to all ocs handlers type Config struct { - Prefix string `mapstructure:"prefix"` - Config data.ConfigData `mapstructure:"config"` - Capabilities data.CapabilitiesData `mapstructure:"capabilities"` - GatewaySvc string `mapstructure:"gatewaysvc"` - DefaultUploadProtocol string `mapstructure:"default_upload_protocol"` - UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"` - SharePrefix string `mapstructure:"share_prefix"` - HomeNamespace string `mapstructure:"home_namespace"` - AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"` - ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"` - ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"` + Prefix string `mapstructure:"prefix"` + Config data.ConfigData `mapstructure:"config"` + Capabilities data.CapabilitiesData `mapstructure:"capabilities"` + GatewaySvc string `mapstructure:"gatewaysvc"` + DefaultUploadProtocol string `mapstructure:"default_upload_protocol"` + UserAgentChunkingMap map[string]string `mapstructure:"user_agent_chunking_map"` + SharePrefix string `mapstructure:"share_prefix"` + HomeNamespace string `mapstructure:"home_namespace"` + AdditionalInfoAttribute string `mapstructure:"additional_info_attribute"` + CacheWarmupDriver string `mapstructure:"cache_warmup_driver"` + CacheWarmupDrivers map[string]map[string]interface{} `mapstructure:"cache_warmup_drivers"` + ResourceInfoCacheSize int `mapstructure:"resource_info_cache_size"` + ResourceInfoCacheTTL int `mapstructure:"resource_info_cache_ttl"` } // Init sets sane defaults diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go index 1e25a9e0a5..d44afed8d6 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/shares.go @@ -50,6 +50,8 @@ import ( "github.com/cs3org/reva/pkg/appctx" "github.com/cs3org/reva/pkg/rgrpc/todo/pool" "github.com/cs3org/reva/pkg/rhttp/router" + "github.com/cs3org/reva/pkg/share/cache" + "github.com/cs3org/reva/pkg/share/cache/registry" "github.com/pkg/errors" ) @@ -72,6 +74,13 @@ type userIdentifiers struct { Mail string } +func getCacheWarmupManager(c *config.Config) (cache.Warmup, error) { + if f, ok := registry.NewFuncs[c.CacheWarmupDriver]; ok { + return f(c.CacheWarmupDrivers[c.CacheWarmupDriver]) + } + return nil, fmt.Errorf("driver not found: %s", c.CacheWarmupDriver) +} + // Init initializes this and any contained handlers func (h *Handler) Init(c *config.Config) error { h.gatewayAddr = c.GatewaySvc @@ -87,9 +96,27 @@ func (h *Handler) Init(c *config.Config) error { h.resourceInfoCache = gcache.New(c.ResourceInfoCacheSize).LFU().Build() + if h.resourceInfoCacheTTL > 0 { + cwm, err := getCacheWarmupManager(c) + if err == nil { + go h.startCacheWarmup(cwm) + } + } + return nil } +func (h *Handler) startCacheWarmup(c cache.Warmup) { + infos, err := c.GetResourceInfos() + if err != nil { + return + } + for _, r := range infos { + key := wrapResourceID(r.Id) + _ = h.resourceInfoCache.SetWithExpire(key, r, time.Second*h.resourceInfoCacheTTL) + } +} + func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log := appctx.GetLogger(r.Context()) diff --git a/pkg/share/cache/cache.go b/pkg/share/cache/cache.go new file mode 100644 index 0000000000..8e2023873b --- /dev/null +++ b/pkg/share/cache/cache.go @@ -0,0 +1,28 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package cache + +import ( + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" +) + +// Warmup is the interface to implement cache warmup strategies. +type Warmup interface { + GetResourceInfos() ([]*provider.ResourceInfo, error) +} diff --git a/pkg/share/cache/cbox/cbox.go b/pkg/share/cache/cbox/cbox.go new file mode 100644 index 0000000000..925a553c2d --- /dev/null +++ b/pkg/share/cache/cbox/cbox.go @@ -0,0 +1,149 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package eos + +import ( + "context" + "database/sql" + "fmt" + + userpb "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/cs3org/reva/pkg/share/cache" + "github.com/cs3org/reva/pkg/share/cache/registry" + "github.com/cs3org/reva/pkg/storage/fs/eos" + "github.com/cs3org/reva/pkg/user" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + + // Provides mysql drivers + _ "github.com/go-sql-driver/mysql" +) + +func init() { + registry.Register("cbox", New) +} + +type config struct { + DbUsername string `mapstructure:"db_username"` + DbPassword string `mapstructure:"db_password"` + DbHost string `mapstructure:"db_host"` + DbPort int `mapstructure:"db_port"` + DbName string `mapstructure:"db_name"` + EOSNamespace string `mapstructure:"namespace"` + GatewaySvc string `mapstructure:"gatewaysvc"` +} + +type manager struct { + conf *config + db *sql.DB +} + +func parseConfig(m map[string]interface{}) (*config, error) { + c := &config{} + if err := mapstructure.Decode(m, c); err != nil { + err = errors.Wrap(err, "error decoding conf") + return nil, err + } + return c, nil +} + +// New returns a new implementation of the storage.FS interface that connects to EOS. +func New(m map[string]interface{}) (cache.Warmup, error) { + c, err := parseConfig(m) + if err != nil { + return nil, err + } + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", c.DbUsername, c.DbPassword, c.DbHost, c.DbPort, c.DbName)) + if err != nil { + return nil, err + } + + return &manager{ + conf: c, + db: db, + }, nil +} + +func (m *manager) GetResourceInfos() ([]*provider.ResourceInfo, error) { + query := "select coalesce(fileid_prefix, '') as fileid_prefix, coalesce(item_source, '') as item_source FROM oc_share WHERE (orphan = 0 or orphan IS NULL)" + rows, err := m.db.Query(query) + if err != nil { + return nil, err + } + defer rows.Close() + + infos := []*provider.ResourceInfo{} + for rows.Next() { + var storageID, opaqueID string + if err := rows.Scan(&storageID, &opaqueID); err != nil { + continue + } + + eosOpts := map[string]interface{}{ + "namespace": m.conf.EOSNamespace, + "master_url": fmt.Sprintf("root://%s.cern.ch", storageID), + "version_invariant": true, + "gatewaysvc": m.conf.GatewaySvc, + } + eos, err := eos.New(eosOpts) + if err != nil { + return nil, err + } + + ctx := user.ContextSetUser(context.Background(), &userpb.User{ + Id: &userpb.UserId{ + OpaqueId: "root", + }, + Opaque: &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "uid": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte("0"), + }, + "gid": &types.OpaqueEntry{ + Decoder: "plain", + Value: []byte("0"), + }, + }, + }, + }) + + inf, err := eos.GetMD(ctx, &provider.Reference{ + Spec: &provider.Reference_Id{ + Id: &provider.ResourceId{ + StorageId: storageID, + OpaqueId: opaqueID, + }, + }, + }, []string{}) + if err != nil { + return nil, err + } + infos = append(infos, inf) + } + + if err = rows.Err(); err != nil { + return nil, err + } + + return infos, nil + +} diff --git a/pkg/share/cache/registry/registry.go b/pkg/share/cache/registry/registry.go new file mode 100644 index 0000000000..ce9dff08b5 --- /dev/null +++ b/pkg/share/cache/registry/registry.go @@ -0,0 +1,34 @@ +// Copyright 2018-2021 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package registry + +import "github.com/cs3org/reva/pkg/share/cache" + +// NewFunc is the function that cache warmup implementations +// should register at init time. +type NewFunc func(map[string]interface{}) (cache.Warmup, error) + +// NewFuncs is a map containing all the registered cache warmup implementations. +var NewFuncs = map[string]NewFunc{} + +// Register registers a new cache warmup function. +// Not safe for concurrent use. Safe for use from package init. +func Register(name string, f NewFunc) { + NewFuncs[name] = f +} diff --git a/pkg/storage/fs/registry/registry.go b/pkg/storage/fs/registry/registry.go index b6eab9d52c..e9aa41100b 100644 --- a/pkg/storage/fs/registry/registry.go +++ b/pkg/storage/fs/registry/registry.go @@ -27,7 +27,7 @@ type NewFunc func(map[string]interface{}) (storage.FS, error) // NewFuncs is a map containing all the registered storage backends. var NewFuncs = map[string]NewFunc{} -// Register registers a new storage backend new function. +// Register registers a new storage backend function. // Not safe for concurrent use. Safe for use from package init. func Register(name string, f NewFunc) { NewFuncs[name] = f