From ad8a5fbbd3812aafd0f87d1d3214cf948f5b9882 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Thu, 25 Nov 2021 15:42:29 +0900 Subject: [PATCH 1/8] Add RPCs to list unregistered apps --- cmd/pipecd/BUILD.bazel | 1 + cmd/pipecd/server.go | 6 ++- pkg/app/api/grpcapi/piped_api.go | 27 +++++++++++-- pkg/app/api/grpcapi/web_api.go | 39 +++++++++++++++++++ pkg/app/api/service/webservice/service.proto | 13 +++++++ .../appconfigreporter/appconfigreporter.go | 2 + 6 files changed, 83 insertions(+), 5 deletions(-) diff --git a/cmd/pipecd/BUILD.bazel b/cmd/pipecd/BUILD.bazel index e2063697ac..741f689590 100644 --- a/cmd/pipecd/BUILD.bazel +++ b/cmd/pipecd/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/app/ops/planpreviewoutputcleaner:go_default_library", "//pkg/app/ops/staledpipedstatcleaner:go_default_library", "//pkg/cache/cachemetrics:go_default_library", + "//pkg/cache/memorycache:go_default_library", "//pkg/cache/rediscache:go_default_library", "//pkg/cli:go_default_library", "//pkg/config:go_default_library", diff --git a/cmd/pipecd/server.go b/cmd/pipecd/server.go index d8bfb09014..f27d1f9dab 100644 --- a/cmd/pipecd/server.go +++ b/cmd/pipecd/server.go @@ -41,6 +41,7 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/service/webservice" "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache/cachemetrics" + "github.com/pipe-cd/pipe/pkg/cache/memorycache" "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/cli" "github.com/pipe-cd/pipe/pkg/config" @@ -188,6 +189,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { is := insightstore.NewStore(fs) cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger) statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey) + unregisteredAppsCache := memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour) // Start a gRPC server for handling PipedAPI requests. { @@ -199,7 +201,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { datastore.NewPipedStore(ds), input.Logger, ) - service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, cfg.Address, input.Logger) + service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, unregisteredAppsCache, cmdOutputStore, cfg.Address, input.Logger) opts = []rpc.Option{ rpc.WithPort(s.pipedAPIPort), rpc.WithGracePeriod(s.gracePeriod), @@ -271,7 +273,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { return err } - service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger) + service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, unregisteredAppsCache, cfg.ProjectMap(), encryptDecrypter, input.Logger) opts := []rpc.Option{ rpc.WithPort(s.webAPIPort), rpc.WithGracePeriod(s.gracePeriod), diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index adb10ced5f..a1f4248768 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -58,13 +58,15 @@ type PipedAPI struct { deploymentPipedCache cache.Cache envProjectCache cache.Cache pipedStatCache cache.Cache + // A map from projectID to map["repo-id"][]*model.ApplicationInfo + unregisteredAppsCache cache.Cache webBaseURL string logger *zap.Logger } // NewPipedAPI creates a new PipedAPI instance. -func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { +func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc, uac cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { a := &PipedAPI{ applicationStore: datastore.NewApplicationStore(ds), deploymentStore: datastore.NewDeploymentStore(ds), @@ -82,6 +84,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), pipedStatCache: hc, + unregisteredAppsCache: uac, webBaseURL: webBaseURL, logger: logger.Named("piped-api"), } @@ -994,8 +997,26 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip } func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) { - // TODO: Make the unused application configurations cache up-to-date - return nil, status.Errorf(codes.Unimplemented, "ReportUnregisteredApplicationConfigurations is not implemented yet") + projectID, _, _, err := rpcauth.ExtractPipedToken(ctx) + if err != nil { + return nil, err + } + + // Make a map from repo-id to a collection of ApplicationInfo. + repoToApps := make(map[string][]*model.ApplicationInfo) + for _, appInfo := range req.Applications { + repoID := appInfo.RepoId + if _, ok := repoToApps[repoID]; !ok { + repoToApps[repoID] = []*model.ApplicationInfo{} + } + repoToApps[repoID] = append(repoToApps[repoID], appInfo) + } + + if err := a.unregisteredAppsCache.Put(projectID, repoToApps); err != nil { + return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") + } + + return &pipedservice.ReportUnregisteredApplicationConfigurationsResponse{}, nil } // CreateDeploymentChain creates a new deployment chain object and all required commands to diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 7a31a3b35b..579a9a8d90 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "sort" "strings" "time" @@ -66,6 +67,8 @@ type WebAPI struct { pipedProjectCache cache.Cache envProjectCache cache.Cache insightCache cache.Cache + // A map from projectID to map["repo-id"][]*model.ApplicationInfo + unregisteredAppsCache cache.Cache projectsInConfig map[string]config.ControlPlaneProject logger *zap.Logger @@ -81,6 +84,7 @@ func NewWebAPI( cmds commandstore.Store, is insightstore.Store, rd redis.Redis, + uac cache.Cache, projs map[string]config.ControlPlaneProject, encrypter encrypter, logger *zap.Logger) *WebAPI { @@ -102,6 +106,7 @@ func NewWebAPI( pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), insightCache: rediscache.NewTTLCache(rd, 3*time.Hour), + unregisteredAppsCache: uac, logger: logger.Named("web-api"), } return a @@ -599,6 +604,40 @@ func (a *WebAPI) validatePipedBelongsToProject(ctx context.Context, pipedID, pro return nil } +func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice.ListUnregisteredApplicationsRequest) (*webservice.ListUnregisteredApplicationsResponse, error) { + claims, err := rpcauth.ExtractClaims(ctx) + if err != nil { + a.logger.Error("failed to authenticate the current user", zap.Error(err)) + return nil, err + } + entity, err := a.unregisteredAppsCache.Get(claims.Role.ProjectId) + if errors.Is(err, cache.ErrNotFound) { + return &webservice.ListUnregisteredApplicationsResponse{}, nil + } + if err != nil { + a.logger.Error("failed to get unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "Failed to get unregistered apps") + } + repoToApps, ok := entity.(map[string][]*model.ApplicationInfo) + if !ok { + return nil, status.Error(codes.Internal, "Unexpected data cached") + } + + repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps)) + for repoID, apps := range repoToApps { + sort.Slice(apps, func(i, j int) bool { + return apps[i].GetPath() < apps[j].GetPath() + }) + repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{ + Id: repoID, + Apps: apps, + }) + } + return &webservice.ListUnregisteredApplicationsResponse{ + Repos: repos, + }, nil +} + // TODO: Validate the specified piped to ensure that it belongs to the specified environment. func (a *WebAPI) AddApplication(ctx context.Context, req *webservice.AddApplicationRequest) (*webservice.AddApplicationResponse, error) { claims, err := rpcauth.ExtractClaims(ctx) diff --git a/pkg/app/api/service/webservice/service.proto b/pkg/app/api/service/webservice/service.proto index b6ad3a96dd..7e23fd6704 100644 --- a/pkg/app/api/service/webservice/service.proto +++ b/pkg/app/api/service/webservice/service.proto @@ -65,6 +65,7 @@ service WebService { rpc SyncApplication(SyncApplicationRequest) returns (SyncApplicationResponse) {} rpc GetApplication(GetApplicationRequest) returns (GetApplicationResponse) {} rpc GenerateApplicationSealedSecret(GenerateApplicationSealedSecretRequest) returns (GenerateApplicationSealedSecretResponse) {} + rpc ListUnregisteredApplications(ListUnregisteredApplicationsRequest) returns (ListUnregisteredApplicationsResponse) {} // Deployment rpc ListDeployments(ListDeploymentsRequest) returns (ListDeploymentsResponse) {} @@ -219,6 +220,7 @@ message AddEnvironmentRequest { string desc = 2; } + message AddApplicationRequest { string name = 1 [(validate.rules).string.min_len = 1]; string env_id = 2; @@ -318,6 +320,17 @@ message GenerateApplicationSealedSecretResponse { string data = 1 [(validate.rules).string.min_len = 1]; } +message ListUnregisteredApplicationsRequest { +} + +message ListUnregisteredApplicationsResponse { + message Repo { + string id = 1; + repeated model.ApplicationInfo apps = 2; + } + repeated Repo repos = 1; +} + message ListDeploymentsRequest { message Options { repeated model.DeploymentStatus statuses = 1; diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter.go b/pkg/app/piped/appconfigreporter/appconfigreporter.go index f07c5d759c..f11c84bf71 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter.go @@ -288,6 +288,8 @@ func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPath r.logger.Info(fmt.Sprintf("found out %d unregistered applications in repository %s", len(as), repoID)) apps = append(apps, as...) } + // Prevent Piped from continuing to send meaningless requests + // even though Control-plane already knows that there are zero unregistered apps. if len(apps) == 0 { if r.sweptUnregisteredApps { return nil From 589f433399c6dbdfcfba07113fa08d253a11f864 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Thu, 25 Nov 2021 18:55:35 +0900 Subject: [PATCH 2/8] Wip --- cmd/pipecd/BUILD.bazel | 1 - cmd/pipecd/server.go | 6 ++-- pkg/app/api/grpcapi/grpcapi.go | 5 ++++ pkg/app/api/grpcapi/piped_api.go | 16 +++++++---- pkg/app/api/grpcapi/web_api.go | 49 +++++++++++++++++++++++++------- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/cmd/pipecd/BUILD.bazel b/cmd/pipecd/BUILD.bazel index 741f689590..e2063697ac 100644 --- a/cmd/pipecd/BUILD.bazel +++ b/cmd/pipecd/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//pkg/app/ops/planpreviewoutputcleaner:go_default_library", "//pkg/app/ops/staledpipedstatcleaner:go_default_library", "//pkg/cache/cachemetrics:go_default_library", - "//pkg/cache/memorycache:go_default_library", "//pkg/cache/rediscache:go_default_library", "//pkg/cli:go_default_library", "//pkg/config:go_default_library", diff --git a/cmd/pipecd/server.go b/cmd/pipecd/server.go index f27d1f9dab..2f30bbf400 100644 --- a/cmd/pipecd/server.go +++ b/cmd/pipecd/server.go @@ -41,7 +41,6 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/service/webservice" "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache/cachemetrics" - "github.com/pipe-cd/pipe/pkg/cache/memorycache" "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/cli" "github.com/pipe-cd/pipe/pkg/config" @@ -189,7 +188,6 @@ func (s *server) run(ctx context.Context, input cli.Input) error { is := insightstore.NewStore(fs) cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger) statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey) - unregisteredAppsCache := memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour) // Start a gRPC server for handling PipedAPI requests. { @@ -201,7 +199,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { datastore.NewPipedStore(ds), input.Logger, ) - service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, unregisteredAppsCache, cmdOutputStore, cfg.Address, input.Logger) + service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, rd, cmdOutputStore, cfg.Address, input.Logger) opts = []rpc.Option{ rpc.WithPort(s.pipedAPIPort), rpc.WithGracePeriod(s.gracePeriod), @@ -273,7 +271,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { return err } - service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, unregisteredAppsCache, cfg.ProjectMap(), encryptDecrypter, input.Logger) + service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger) opts := []rpc.Option{ rpc.WithPort(s.webAPIPort), rpc.WithGracePeriod(s.gracePeriod), diff --git a/pkg/app/api/grpcapi/grpcapi.go b/pkg/app/api/grpcapi/grpcapi.go index 9e06fc64e6..0b1eed7482 100644 --- a/pkg/app/api/grpcapi/grpcapi.go +++ b/pkg/app/api/grpcapi/grpcapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -189,3 +190,7 @@ func getEncriptionKey(se *model.Piped_SecretEncryption) ([]byte, error) { return nil, status.Error(codes.FailedPrecondition, "The piped does not contain a valid encryption type") } } + +func makeUnregisteredAppsCacheKey(projectID string) string { + return fmt.Sprintf("HASHKEY:UNREGISTERED_APPS:%s", projectID) +} diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index a1f4248768..e8c0bf983f 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -33,9 +33,11 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache" "github.com/pipe-cd/pipe/pkg/cache/memorycache" + "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/datastore" "github.com/pipe-cd/pipe/pkg/filestore" "github.com/pipe-cd/pipe/pkg/model" + "github.com/pipe-cd/pipe/pkg/redis" "github.com/pipe-cd/pipe/pkg/rpc/rpcauth" ) @@ -58,15 +60,14 @@ type PipedAPI struct { deploymentPipedCache cache.Cache envProjectCache cache.Cache pipedStatCache cache.Cache - // A map from projectID to map["repo-id"][]*model.ApplicationInfo - unregisteredAppsCache cache.Cache + redis redis.Redis webBaseURL string logger *zap.Logger } // NewPipedAPI creates a new PipedAPI instance. -func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc, uac cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { +func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, rd redis.Redis, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { a := &PipedAPI{ applicationStore: datastore.NewApplicationStore(ds), deploymentStore: datastore.NewDeploymentStore(ds), @@ -84,7 +85,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), pipedStatCache: hc, - unregisteredAppsCache: uac, + redis: rd, webBaseURL: webBaseURL, logger: logger.Named("piped-api"), } @@ -997,7 +998,7 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip } func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) { - projectID, _, _, err := rpcauth.ExtractPipedToken(ctx) + projectID, pipedID, _, err := rpcauth.ExtractPipedToken(ctx) if err != nil { return nil, err } @@ -1012,7 +1013,10 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte repoToApps[repoID] = append(repoToApps[repoID], appInfo) } - if err := a.unregisteredAppsCache.Put(projectID, repoToApps); err != nil { + key := makeUnregisteredAppsCacheKey(projectID) + c := rediscache.NewHashCache(a.redis, key) + // Put an entity like map["piped-id"]map["repo-id"][]*model.ApplicationInfo + if err := c.Put(pipedID, repoToApps); err != nil { return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") } diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 579a9a8d90..6ebf8a1ee0 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -67,8 +67,7 @@ type WebAPI struct { pipedProjectCache cache.Cache envProjectCache cache.Cache insightCache cache.Cache - // A map from projectID to map["repo-id"][]*model.ApplicationInfo - unregisteredAppsCache cache.Cache + redis redis.Redis projectsInConfig map[string]config.ControlPlaneProject logger *zap.Logger @@ -84,7 +83,6 @@ func NewWebAPI( cmds commandstore.Store, is insightstore.Store, rd redis.Redis, - uac cache.Cache, projs map[string]config.ControlPlaneProject, encrypter encrypter, logger *zap.Logger) *WebAPI { @@ -106,7 +104,7 @@ func NewWebAPI( pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), insightCache: rediscache.NewTTLCache(rd, 3*time.Hour), - unregisteredAppsCache: uac, + redis: rd, logger: logger.Named("web-api"), } return a @@ -610,7 +608,12 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice a.logger.Error("failed to authenticate the current user", zap.Error(err)) return nil, err } - entity, err := a.unregisteredAppsCache.Get(claims.Role.ProjectId) + + // Collect all apps that belong to the project. + key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId) + c := rediscache.NewHashCache(a.redis, key) + // It assumes each entry is map["piped-id"]map["repo-id"][]*model.ApplicationInfo + pipedToRepos, err := c.GetAll() if errors.Is(err, cache.ErrNotFound) { return &webservice.ListUnregisteredApplicationsResponse{}, nil } @@ -618,21 +621,45 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice a.logger.Error("failed to get unregistered apps", zap.Error(err)) return nil, status.Error(codes.Internal, "Failed to get unregistered apps") } - repoToApps, ok := entity.(map[string][]*model.ApplicationInfo) - if !ok { - return nil, status.Error(codes.Internal, "Unexpected data cached") + + // FIXME: Refactor and fall into independent functions and write tests + // Integrate the apps cached for each Piped. + repoToApps := make(map[string][]*model.ApplicationInfo) + for _, r := range pipedToRepos { + repoToAppsPerPiped, ok := r.(map[string][]*model.ApplicationInfo) + if !ok { + return nil, status.Error(codes.Internal, "Unexpected data cached") + } + for repoID, apps := range repoToAppsPerPiped { + if _, ok := repoToApps[repoID]; !ok { + repoToApps[repoID] = []*model.ApplicationInfo{} + } + repoToApps[repoID] = append(repoToApps[repoID], apps...) + } } + // Tidy apps. repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps)) for repoID, apps := range repoToApps { - sort.Slice(apps, func(i, j int) bool { - return apps[i].GetPath() < apps[j].GetPath() + // Eliminate duplicated apps + tidiedApps := make([]*model.ApplicationInfo, 0, len(apps)) + gitPaths := make(map[string]struct{}) + for _, app := range apps { + if _, ok := gitPaths[app.GetPath()]; ok { + continue + } + tidiedApps = append(tidiedApps, app) + } + + sort.Slice(tidiedApps, func(i, j int) bool { + return tidiedApps[i].GetPath() < tidiedApps[j].GetPath() }) repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{ Id: repoID, - Apps: apps, + Apps: tidiedApps, }) } + return &webservice.ListUnregisteredApplicationsResponse{ Repos: repos, }, nil From d069c6e1ff476f6f95fba23ff268cf7d295692b9 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Fri, 26 Nov 2021 11:50:21 +0900 Subject: [PATCH 3/8] Change cache structure --- pkg/app/api/grpcapi/piped_api.go | 14 +-- pkg/app/api/grpcapi/web_api.go | 56 ++++++--- pkg/app/api/grpcapi/web_api_test.go | 186 ++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+), 32 deletions(-) diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index e8c0bf983f..ec18a89c73 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -1003,20 +1003,10 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte return nil, err } - // Make a map from repo-id to a collection of ApplicationInfo. - repoToApps := make(map[string][]*model.ApplicationInfo) - for _, appInfo := range req.Applications { - repoID := appInfo.RepoId - if _, ok := repoToApps[repoID]; !ok { - repoToApps[repoID] = []*model.ApplicationInfo{} - } - repoToApps[repoID] = append(repoToApps[repoID], appInfo) - } - key := makeUnregisteredAppsCacheKey(projectID) c := rediscache.NewHashCache(a.redis, key) - // Put an entity like map["piped-id"]map["repo-id"][]*model.ApplicationInfo - if err := c.Put(pipedID, repoToApps); err != nil { + // Cache a slice of *model.ApplicationInfo. + if err := c.Put(pipedID, req.Applications); err != nil { return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") } diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 6ebf8a1ee0..3db91f5b16 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -612,8 +612,8 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice // Collect all apps that belong to the project. key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId) c := rediscache.NewHashCache(a.redis, key) - // It assumes each entry is map["piped-id"]map["repo-id"][]*model.ApplicationInfo - pipedToRepos, err := c.GetAll() + // pipedToApps assumes to be a map["piped-id"][]*model.ApplicationInfo + pipedToApps, err := c.GetAll() if errors.Is(err, cache.ErrNotFound) { return &webservice.ListUnregisteredApplicationsResponse{}, nil } @@ -622,32 +622,51 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice return nil, status.Error(codes.Internal, "Failed to get unregistered apps") } - // FIXME: Refactor and fall into independent functions and write tests - // Integrate the apps cached for each Piped. - repoToApps := make(map[string][]*model.ApplicationInfo) - for _, r := range pipedToRepos { - repoToAppsPerPiped, ok := r.(map[string][]*model.ApplicationInfo) + // Integrate all apps cached for each Piped. + allApps := make([]*model.ApplicationInfo, 0) + for _, as := range pipedToApps { + apps, ok := as.([]*model.ApplicationInfo) if !ok { return nil, status.Error(codes.Internal, "Unexpected data cached") } - for repoID, apps := range repoToAppsPerPiped { - if _, ok := repoToApps[repoID]; !ok { - repoToApps[repoID] = []*model.ApplicationInfo{} - } - repoToApps[repoID] = append(repoToApps[repoID], apps...) + allApps = append(allApps, apps...) + } + + return &webservice.ListUnregisteredApplicationsResponse{ + Repos: groupAppsByRepo(allApps), + }, nil +} + +func groupAppsByRepo(apps []*model.ApplicationInfo) []*webservice.ListUnregisteredApplicationsResponse_Repo { + if len(apps) == 0 { + return []*webservice.ListUnregisteredApplicationsResponse_Repo{} + } + if len(apps) == 1 { + return []*webservice.ListUnregisteredApplicationsResponse_Repo{ + {Id: apps[0].RepoId, Apps: apps}, } } + // Make a map from repo-id to apps. + repoToApps := make(map[string][]*model.ApplicationInfo) + for _, app := range apps { + if _, ok := repoToApps[app.RepoId]; !ok { + repoToApps[app.RepoId] = []*model.ApplicationInfo{} + } + repoToApps[app.RepoId] = append(repoToApps[app.RepoId], app) + } + // Tidy apps. - repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps)) - for repoID, apps := range repoToApps { + repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, 0, len(repoToApps)) + for repoID, as := range repoToApps { // Eliminate duplicated apps - tidiedApps := make([]*model.ApplicationInfo, 0, len(apps)) + tidiedApps := make([]*model.ApplicationInfo, 0, len(as)) gitPaths := make(map[string]struct{}) - for _, app := range apps { + for _, app := range as { if _, ok := gitPaths[app.GetPath()]; ok { continue } + gitPaths[app.GetPath()] = struct{}{} tidiedApps = append(tidiedApps, app) } @@ -659,10 +678,7 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice Apps: tidiedApps, }) } - - return &webservice.ListUnregisteredApplicationsResponse{ - Repos: repos, - }, nil + return repos } // TODO: Validate the specified piped to ensure that it belongs to the specified environment. diff --git a/pkg/app/api/grpcapi/web_api_test.go b/pkg/app/api/grpcapi/web_api_test.go index 604fd39b5a..3fefdc3473 100644 --- a/pkg/app/api/grpcapi/web_api_test.go +++ b/pkg/app/api/grpcapi/web_api_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/pipe-cd/pipe/pkg/app/api/service/webservice" "github.com/pipe-cd/pipe/pkg/cache" "github.com/pipe-cd/pipe/pkg/cache/cachetest" "github.com/pipe-cd/pipe/pkg/datastore" @@ -374,3 +375,188 @@ func TestValidateApprover(t *testing.T) { }) } } + +func Test_groupAppsByRepo(t *testing.T) { + testcases := []struct { + name string + apps []*model.ApplicationInfo + want []*webservice.ListUnregisteredApplicationsResponse_Repo + }{ + { + name: "zero app given", + apps: []*model.ApplicationInfo{}, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{}, + }, + { + name: "one app given", + apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + }, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ + { + Id: "repo1", + Apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + }, + }, + }, + }, + { + name: "apps within the same repo", + apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app2", + RepoId: "repo1", + Path: "path/to/app2", + }, + }, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ + { + Id: "repo1", + Apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app2", + RepoId: "repo1", + Path: "path/to/app2", + }, + }, + }, + }, + }, + { + name: "duplicated apps", + apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + }, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ + { + Id: "repo1", + Apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + }, + }, + }, + }, + { + name: "apps across different repos", + apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app2", + RepoId: "repo2", + Path: "path/to/app2", + }, + }, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ + { + Id: "repo1", + Apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + }, + }, + { + Id: "repo2", + Apps: []*model.ApplicationInfo{ + { + Name: "app2", + RepoId: "repo2", + Path: "path/to/app2", + }, + }, + }, + }, + }, + { + name: "out of order apps", + apps: []*model.ApplicationInfo{ + { + Name: "app3", + RepoId: "repo1", + Path: "path/to/app3", + }, + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app2", + RepoId: "repo1", + Path: "path/to/app2", + }, + }, + want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ + { + Id: "repo1", + Apps: []*model.ApplicationInfo{ + { + Name: "app1", + RepoId: "repo1", + Path: "path/to/app1", + }, + { + Name: "app2", + RepoId: "repo1", + Path: "path/to/app2", + }, + { + Name: "app3", + RepoId: "repo1", + Path: "path/to/app3", + }, + }, + }, + }, + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + got := groupAppsByRepo(tc.apps) + assert.Equal(t, tc.want, got) + }) + } +} From f91d3c005dd84ae2605c4fcce3b0f404d178f42f Mon Sep 17 00:00:00 2001 From: nakabonne Date: Fri, 26 Nov 2021 16:03:20 +0900 Subject: [PATCH 4/8] Remove unneeded change --- pkg/app/piped/appconfigreporter/appconfigreporter.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter.go b/pkg/app/piped/appconfigreporter/appconfigreporter.go index f11c84bf71..f07c5d759c 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter.go @@ -288,8 +288,6 @@ func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPath r.logger.Info(fmt.Sprintf("found out %d unregistered applications in repository %s", len(as), repoID)) apps = append(apps, as...) } - // Prevent Piped from continuing to send meaningless requests - // even though Control-plane already knows that there are zero unregistered apps. if len(apps) == 0 { if r.sweptUnregisteredApps { return nil From e78cf6a5adb37d89c54823547542d1d8ba6a4e3b Mon Sep 17 00:00:00 2001 From: nakabonne Date: Mon, 29 Nov 2021 11:29:03 +0900 Subject: [PATCH 5/8] Change to return a flat list of apps --- pkg/app/api/grpcapi/web_api.go | 52 +---- pkg/app/api/grpcapi/web_api_test.go | 186 ------------------ pkg/app/api/service/webservice/service.proto | 6 +- .../appconfigreporter/appconfigreporter.go | 1 + .../appconfigreporter_test.go | 9 + pkg/model/common.proto | 1 + 6 files changed, 19 insertions(+), 236 deletions(-) diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 3db91f5b16..a7c54ef6dd 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -631,56 +631,18 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice } allApps = append(allApps, apps...) } + if len(allApps) == 0 { + return &webservice.ListUnregisteredApplicationsResponse{}, nil + } + sort.Slice(allApps, func(i, j int) bool { + return allApps[i].GetPath() < allApps[j].GetPath() + }) return &webservice.ListUnregisteredApplicationsResponse{ - Repos: groupAppsByRepo(allApps), + Applications: allApps, }, nil } -func groupAppsByRepo(apps []*model.ApplicationInfo) []*webservice.ListUnregisteredApplicationsResponse_Repo { - if len(apps) == 0 { - return []*webservice.ListUnregisteredApplicationsResponse_Repo{} - } - if len(apps) == 1 { - return []*webservice.ListUnregisteredApplicationsResponse_Repo{ - {Id: apps[0].RepoId, Apps: apps}, - } - } - - // Make a map from repo-id to apps. - repoToApps := make(map[string][]*model.ApplicationInfo) - for _, app := range apps { - if _, ok := repoToApps[app.RepoId]; !ok { - repoToApps[app.RepoId] = []*model.ApplicationInfo{} - } - repoToApps[app.RepoId] = append(repoToApps[app.RepoId], app) - } - - // Tidy apps. - repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, 0, len(repoToApps)) - for repoID, as := range repoToApps { - // Eliminate duplicated apps - tidiedApps := make([]*model.ApplicationInfo, 0, len(as)) - gitPaths := make(map[string]struct{}) - for _, app := range as { - if _, ok := gitPaths[app.GetPath()]; ok { - continue - } - gitPaths[app.GetPath()] = struct{}{} - tidiedApps = append(tidiedApps, app) - } - - sort.Slice(tidiedApps, func(i, j int) bool { - return tidiedApps[i].GetPath() < tidiedApps[j].GetPath() - }) - repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{ - Id: repoID, - Apps: tidiedApps, - }) - } - return repos -} - // TODO: Validate the specified piped to ensure that it belongs to the specified environment. func (a *WebAPI) AddApplication(ctx context.Context, req *webservice.AddApplicationRequest) (*webservice.AddApplicationResponse, error) { claims, err := rpcauth.ExtractClaims(ctx) diff --git a/pkg/app/api/grpcapi/web_api_test.go b/pkg/app/api/grpcapi/web_api_test.go index 3fefdc3473..604fd39b5a 100644 --- a/pkg/app/api/grpcapi/web_api_test.go +++ b/pkg/app/api/grpcapi/web_api_test.go @@ -22,7 +22,6 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" - "github.com/pipe-cd/pipe/pkg/app/api/service/webservice" "github.com/pipe-cd/pipe/pkg/cache" "github.com/pipe-cd/pipe/pkg/cache/cachetest" "github.com/pipe-cd/pipe/pkg/datastore" @@ -375,188 +374,3 @@ func TestValidateApprover(t *testing.T) { }) } } - -func Test_groupAppsByRepo(t *testing.T) { - testcases := []struct { - name string - apps []*model.ApplicationInfo - want []*webservice.ListUnregisteredApplicationsResponse_Repo - }{ - { - name: "zero app given", - apps: []*model.ApplicationInfo{}, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{}, - }, - { - name: "one app given", - apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - }, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ - { - Id: "repo1", - Apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - }, - }, - }, - }, - { - name: "apps within the same repo", - apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app2", - RepoId: "repo1", - Path: "path/to/app2", - }, - }, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ - { - Id: "repo1", - Apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app2", - RepoId: "repo1", - Path: "path/to/app2", - }, - }, - }, - }, - }, - { - name: "duplicated apps", - apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - }, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ - { - Id: "repo1", - Apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - }, - }, - }, - }, - { - name: "apps across different repos", - apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app2", - RepoId: "repo2", - Path: "path/to/app2", - }, - }, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ - { - Id: "repo1", - Apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - }, - }, - { - Id: "repo2", - Apps: []*model.ApplicationInfo{ - { - Name: "app2", - RepoId: "repo2", - Path: "path/to/app2", - }, - }, - }, - }, - }, - { - name: "out of order apps", - apps: []*model.ApplicationInfo{ - { - Name: "app3", - RepoId: "repo1", - Path: "path/to/app3", - }, - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app2", - RepoId: "repo1", - Path: "path/to/app2", - }, - }, - want: []*webservice.ListUnregisteredApplicationsResponse_Repo{ - { - Id: "repo1", - Apps: []*model.ApplicationInfo{ - { - Name: "app1", - RepoId: "repo1", - Path: "path/to/app1", - }, - { - Name: "app2", - RepoId: "repo1", - Path: "path/to/app2", - }, - { - Name: "app3", - RepoId: "repo1", - Path: "path/to/app3", - }, - }, - }, - }, - }, - } - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - got := groupAppsByRepo(tc.apps) - assert.Equal(t, tc.want, got) - }) - } -} diff --git a/pkg/app/api/service/webservice/service.proto b/pkg/app/api/service/webservice/service.proto index 7e23fd6704..81a215df5d 100644 --- a/pkg/app/api/service/webservice/service.proto +++ b/pkg/app/api/service/webservice/service.proto @@ -324,11 +324,7 @@ message ListUnregisteredApplicationsRequest { } message ListUnregisteredApplicationsResponse { - message Repo { - string id = 1; - repeated model.ApplicationInfo apps = 2; - } - repeated Repo repos = 1; + repeated model.ApplicationInfo applications = 1; } message ListDeploymentsRequest { diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter.go b/pkg/app/piped/appconfigreporter/appconfigreporter.go index f07c5d759c..1a109b8f24 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter.go @@ -381,6 +381,7 @@ func (r *Reporter) readApplicationInfo(repoDir, repoID, appDirRelPath, cfgFilena RepoId: repoID, Path: appDirRelPath, ConfigFilename: cfgFilename, + PipedId: r.config.PipedID, }, nil } diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go index 7ba8dcb183..99b1eff007 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "github.com/pipe-cd/pipe/pkg/config" "github.com/pipe-cd/pipe/pkg/model" ) @@ -90,6 +91,7 @@ func TestReporter_findUnregisteredApps(t *testing.T) { { name: "valid app config that is unregistered", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/.pipe.yaml": &fstest.MapFile{Data: []byte(` apiVersion: pipecd.dev/v1beta1 @@ -113,6 +115,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", + PipedId: "piped-1", }, }, wantErr: false, @@ -120,6 +123,7 @@ spec: { name: "valid app config that name isn't default", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/dev.pipecd.yaml": &fstest.MapFile{Data: []byte(` apiVersion: pipecd.dev/v1beta1 @@ -143,6 +147,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: "dev.pipecd.yaml", + PipedId: "piped-1", }, }, wantErr: false, @@ -239,6 +244,7 @@ func TestReporter_findRegisteredApps(t *testing.T) { { name: "valid app config that is registered", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/.pipe.yaml": &fstest.MapFile{Data: []byte(` apiVersion: pipecd.dev/v1beta1 @@ -266,6 +272,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", + PipedId: "piped-1", }, }, wantErr: false, @@ -273,6 +280,7 @@ spec: { name: "last scanned commit is empty", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/.pipe.yaml": &fstest.MapFile{Data: []byte(` apiVersion: pipecd.dev/v1beta1 @@ -300,6 +308,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", + PipedId: "piped-1", }, }, wantErr: false, diff --git a/pkg/model/common.proto b/pkg/model/common.proto index 022727c96a..a5c44ddcde 100644 --- a/pkg/model/common.proto +++ b/pkg/model/common.proto @@ -64,4 +64,5 @@ message ApplicationInfo { string repo_id = 5 [(validate.rules).string.min_len = 1]; string path = 6 [(validate.rules).string.pattern = "^[^/].+$"]; string config_filename = 7; + string piped_id = 8 [(validate.rules).string.min_len = 1]; } From 4731ea8c3b5bd7ba85936f1c53eb3528cf652ad4 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Mon, 29 Nov 2021 15:14:43 +0900 Subject: [PATCH 6/8] Fix test --- .../appconfigreporter/appconfigreporter_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go index 23ab3af37d..87e32afe5d 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go @@ -94,6 +94,7 @@ func TestReporter_findRegisteredApps(t *testing.T) { { name: "app not changed", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{apps: []*model.Application{ {Id: "id-1", Name: "app-1", Labels: map[string]string{"key-1": "value-1"}, GitPath: &model.ApplicationGitPath{Repo: &model.ApplicationGitRepository{Id: "repo-1"}, Path: "app-1", ConfigFilename: ".pipe.yaml"}}, }}, @@ -118,7 +119,7 @@ spec: { name: "app changed", reporter: &Reporter{ - config: &config.PipedSpec{PipedID: "piped-1"}, + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{apps: []*model.Application{ {Id: "id-1", Name: "app-1", Labels: map[string]string{"key-1": "value-1"}, GitPath: &model.ApplicationGitPath{Repo: &model.ApplicationGitRepository{Id: "repo-1"}, Path: "app-1", ConfigFilename: ".pipe.yaml"}}, }}, @@ -145,7 +146,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", - PipedId: "piped-1", + PipedId: "piped-1", }, }, wantErr: false, @@ -228,7 +229,7 @@ func TestReporter_findUnregisteredApps(t *testing.T) { { name: "valid app config that is unregistered", reporter: &Reporter{ - config: &config.PipedSpec{PipedID: "piped-1"}, + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/.pipe.yaml": &fstest.MapFile{Data: []byte(` @@ -253,7 +254,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", - PipedId: "piped-1", + PipedId: "piped-1", }, }, wantErr: false, @@ -261,7 +262,7 @@ spec: { name: "valid app config that name isn't default", reporter: &Reporter{ - config: &config.PipedSpec{PipedID: "piped-1"}, + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/dev.pipecd.yaml": &fstest.MapFile{Data: []byte(` @@ -286,7 +287,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: "dev.pipecd.yaml", - PipedId: "piped-1", + PipedId: "piped-1", }, }, wantErr: false, From 8a3c5be1cb5a7372afb620b460da24575a5b1093 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Mon, 29 Nov 2021 16:13:16 +0900 Subject: [PATCH 7/8] Use gob to encode/decode app configs --- pkg/app/api/grpcapi/piped_api.go | 12 ++++++++++-- pkg/app/api/grpcapi/web_api.go | 12 ++++++++++-- pkg/cache/rediscache/cache.go | 1 + pkg/cache/rediscache/hashcache.go | 2 ++ 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index 96ba0cf019..022ace071b 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -15,7 +15,9 @@ package grpcapi import ( + "bytes" "context" + "encoding/gob" "encoding/json" "errors" "time" @@ -1001,10 +1003,16 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte return nil, err } + // Cache an encoded slice of *model.ApplicationInfo. + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(req.Applications); err != nil { + a.logger.Error("failed to encode the unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to encode the unregistered apps") + } key := makeUnregisteredAppsCacheKey(projectID) c := rediscache.NewHashCache(a.redis, key) - // Cache a slice of *model.ApplicationInfo. - if err := c.Put(pipedID, req.Applications); err != nil { + if err := c.Put(pipedID, buf.Bytes()); err != nil { return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") } diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index a7c54ef6dd..61d0c55a74 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -15,7 +15,9 @@ package grpcapi import ( + "bytes" "context" + "encoding/gob" "errors" "fmt" "sort" @@ -612,7 +614,7 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice // Collect all apps that belong to the project. key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId) c := rediscache.NewHashCache(a.redis, key) - // pipedToApps assumes to be a map["piped-id"][]*model.ApplicationInfo + // pipedToApps assumes to be a map["piped-id"][]byte(slice of *model.ApplicationInfo encoded by encoding/gob) pipedToApps, err := c.GetAll() if errors.Is(err, cache.ErrNotFound) { return &webservice.ListUnregisteredApplicationsResponse{}, nil @@ -625,10 +627,16 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice // Integrate all apps cached for each Piped. allApps := make([]*model.ApplicationInfo, 0) for _, as := range pipedToApps { - apps, ok := as.([]*model.ApplicationInfo) + b, ok := as.([]byte) if !ok { return nil, status.Error(codes.Internal, "Unexpected data cached") } + dec := gob.NewDecoder(bytes.NewReader(b)) + var apps []*model.ApplicationInfo + if err := dec.Decode(&apps); err != nil { + a.logger.Error("failed to decode the unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to decode the unregistered apps") + } allApps = append(allApps, apps...) } if len(allApps) == 0 { diff --git a/pkg/cache/rediscache/cache.go b/pkg/cache/rediscache/cache.go index 01c4b63226..bc7d45b583 100644 --- a/pkg/cache/rediscache/cache.go +++ b/pkg/cache/rediscache/cache.go @@ -77,6 +77,7 @@ func (c *RedisCache) Get(k string) (interface{}, error) { return reply, nil } +// It is caller's responsibility to encode Go struct. func (c *RedisCache) Put(k string, v interface{}) error { conn := c.redis.Get() defer conn.Close() diff --git a/pkg/cache/rediscache/hashcache.go b/pkg/cache/rediscache/hashcache.go index 246912583b..9655ddf65d 100644 --- a/pkg/cache/rediscache/hashcache.go +++ b/pkg/cache/rediscache/hashcache.go @@ -60,6 +60,8 @@ func (r *RedisHashCache) Get(k string) (interface{}, error) { // check if the TTL for hashkey (not the key k), in case the TTL for // hashkey is not yet existed or unset, EXPIRE will be called and set // TTL time for the whole hashkey. +// +// It is caller's responsibility to encode Go struct. func (r *RedisHashCache) Put(k string, v interface{}) error { conn := r.redis.Get() defer conn.Close() From 9270bbc7f810d739430a7fca3d240fb9b455e216 Mon Sep 17 00:00:00 2001 From: Ryo Nakao Date: Mon, 29 Nov 2021 17:42:53 +0900 Subject: [PATCH 8/8] Update pkg/app/api/grpcapi/web_api.go Co-authored-by: Le Van Nghia --- pkg/app/api/grpcapi/web_api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 61d0c55a74..a58ef1ac22 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -644,7 +644,7 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice } sort.Slice(allApps, func(i, j int) bool { - return allApps[i].GetPath() < allApps[j].GetPath() + return allApps[i].Path < allApps[j].Path }) return &webservice.ListUnregisteredApplicationsResponse{ Applications: allApps,