From f647ea87f6c9724777f172a300c0534c51aac375 Mon Sep 17 00:00:00 2001 From: gammazero Date: Fri, 9 Dec 2022 14:16:26 -0800 Subject: [PATCH] Reset assignments on indexer When persisted assignments of a previous version are present on an indexer, these are migrated to the new version. If the configuration value `Discovery.RemoveOldAssignments` is true, then the old version assignments are deleted instead. --- config/discovery.go | 4 +++ .../storetheindex/instances/dido/config.json | 1 + .../storetheindex/instances/kepa/config.json | 1 + .../storetheindex/instances/oden/config.json | 1 + internal/registry/registry.go | 28 ++++++++++++++----- 5 files changed, 28 insertions(+), 7 deletions(-) diff --git a/config/discovery.go b/config/discovery.go index e91bfe2df..460f5aeb3 100644 --- a/config/discovery.go +++ b/config/discovery.go @@ -49,6 +49,10 @@ type Discovery struct { // Timeout is the maximum amount of time that the indexer will spend trying // to discover and verify a new provider. Timeout Duration + // RemoveOldAssignments, if true, removes persisted assignments of previous + // versions. When false, previous versions of persisted assignments are + // migrated. Only applies if UseAssigner is true. + RemoveOldAssignments bool // UseAssigner configures the indexer to work with an assigner service. // This also requires that Policy.Allow is false, making Policy.Except into // a list of allowed peers. Peers listed in Policy.Except in the diff --git a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/dido/config.json b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/dido/config.json index 0d8109d64..12cd52b65 100644 --- a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/dido/config.json +++ b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/dido/config.json @@ -52,6 +52,7 @@ "PollOverrides": null, "RediscoverWait": "5m0s", "Timeout": "2m0s", + "RemoveOldAssignments": true, "UseAssigner": true }, "Indexer": { diff --git a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/kepa/config.json b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/kepa/config.json index 9df518a08..b75dd1a7b 100644 --- a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/kepa/config.json +++ b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/kepa/config.json @@ -52,6 +52,7 @@ "PollOverrides": null, "RediscoverWait": "5m0s", "Timeout": "2m0s", + "RemoveOldAssignments": true, "UseAssigner": true }, "Indexer": { diff --git a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/oden/config.json b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/oden/config.json index 5bcefe3a3..b1bcee747 100644 --- a/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/oden/config.json +++ b/deploy/manifests/prod/us-east-2/tenant/storetheindex/instances/oden/config.json @@ -58,6 +58,7 @@ ], "RediscoverWait": "5m0s", "Timeout": "2m0s", + "RemoveOldAssignments": true, "UseAssigner": true }, "Indexer": { diff --git a/internal/registry/registry.go b/internal/registry/registry.go index c44de6de6..bf2c81e56 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -30,8 +30,8 @@ import ( const ( // providerKeyPath is where provider info is stored in to indexer repo. providerKeyPath = "/registry/pinfo" - assignmentsKeyPath = "/assignments-v1" - oldAssignmentsKeyPath = "/assignments" + assignmentsKeyPath = "/assignments-v2" + oldAssignmentsKeyPath = "/assignments-v1" ) var log = logging.Logger("indexer/registry") @@ -271,7 +271,7 @@ func NewRegistry(ctx context.Context, cfg config.Discovery, dstore datastore.Dat if cfg.UseAssigner { r.assigned = make(map[peer.ID]struct{}) - if err = r.loadPersistedAssignments(ctx); err != nil { + if err = r.loadPersistedAssignments(ctx, cfg.RemoveOldAssignments); err != nil { return nil, err } r.loadPreferredAssignments() @@ -1049,7 +1049,7 @@ func (r *Registry) deleteAssignedPeer(peerID peer.ID) error { return r.dstore.Delete(context.Background(), dsKey) } -func (r *Registry) deleteOldAssignments(ctx context.Context, prefix string) error { +func (r *Registry) migrateOldAssignments(ctx context.Context, prefix string, deleteOld bool) error { q := query.Query{ Prefix: prefix, KeysOnly: true, @@ -1065,7 +1065,21 @@ func (r *Registry) deleteOldAssignments(ctx context.Context, prefix string) erro } for i := range ents { - err = r.dstore.Delete(ctx, datastore.NewKey(ents[i].Key)) + key := ents[i].Key + if !deleteOld { + peerID, err := peer.Decode(path.Base(key)) + if err != nil { + log.Errorw("cannot decode assigned peer ID, removing") + } else { + dsKey := peerIDToDsKey(assignmentsKeyPath, peerID) + log.Debugw("Renamed assignment", "from", key, "to", dsKey) + err := r.dstore.Put(ctx, dsKey, []byte{}) + if err != nil { + return err + } + } + } + err = r.dstore.Delete(ctx, datastore.NewKey(key)) if err != nil { return err } @@ -1135,12 +1149,12 @@ func (r *Registry) loadPersistedProviders(ctx context.Context) error { return nil } -func (r *Registry) loadPersistedAssignments(ctx context.Context) error { +func (r *Registry) loadPersistedAssignments(ctx context.Context, deleteOld bool) error { if r.dstore == nil { return nil } - err := r.deleteOldAssignments(ctx, oldAssignmentsKeyPath) + err := r.migrateOldAssignments(ctx, oldAssignmentsKeyPath, deleteOld) if err != nil { return err }