From 589f433399c6dbdfcfba07113fa08d253a11f864 Mon Sep 17 00:00:00 2001 From: nakabonne Date: Thu, 25 Nov 2021 18:55:35 +0900 Subject: [PATCH] Wip --- cmd/pipecd/BUILD.bazel | 1 - cmd/pipecd/server.go | 6 ++-- pkg/app/api/grpcapi/grpcapi.go | 5 ++++ pkg/app/api/grpcapi/piped_api.go | 16 +++++++---- pkg/app/api/grpcapi/web_api.go | 49 +++++++++++++++++++++++++------- 5 files changed, 55 insertions(+), 22 deletions(-) diff --git a/cmd/pipecd/BUILD.bazel b/cmd/pipecd/BUILD.bazel index 741f689590..e2063697ac 100644 --- a/cmd/pipecd/BUILD.bazel +++ b/cmd/pipecd/BUILD.bazel @@ -32,7 +32,6 @@ go_library( "//pkg/app/ops/planpreviewoutputcleaner:go_default_library", "//pkg/app/ops/staledpipedstatcleaner:go_default_library", "//pkg/cache/cachemetrics:go_default_library", - "//pkg/cache/memorycache:go_default_library", "//pkg/cache/rediscache:go_default_library", "//pkg/cli:go_default_library", "//pkg/config:go_default_library", diff --git a/cmd/pipecd/server.go b/cmd/pipecd/server.go index f27d1f9dab..2f30bbf400 100644 --- a/cmd/pipecd/server.go +++ b/cmd/pipecd/server.go @@ -41,7 +41,6 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/service/webservice" "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache/cachemetrics" - "github.com/pipe-cd/pipe/pkg/cache/memorycache" "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/cli" "github.com/pipe-cd/pipe/pkg/config" @@ -189,7 +188,6 @@ func (s *server) run(ctx context.Context, input cli.Input) error { is := insightstore.NewStore(fs) cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger) statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey) - unregisteredAppsCache := memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour) // Start a gRPC server for handling PipedAPI requests. { @@ -201,7 +199,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { datastore.NewPipedStore(ds), input.Logger, ) - service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, unregisteredAppsCache, cmdOutputStore, cfg.Address, input.Logger) + service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, rd, cmdOutputStore, cfg.Address, input.Logger) opts = []rpc.Option{ rpc.WithPort(s.pipedAPIPort), rpc.WithGracePeriod(s.gracePeriod), @@ -273,7 +271,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { return err } - service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, unregisteredAppsCache, cfg.ProjectMap(), encryptDecrypter, input.Logger) + service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger) opts := []rpc.Option{ rpc.WithPort(s.webAPIPort), rpc.WithGracePeriod(s.gracePeriod), diff --git a/pkg/app/api/grpcapi/grpcapi.go b/pkg/app/api/grpcapi/grpcapi.go index 9e06fc64e6..0b1eed7482 100644 --- a/pkg/app/api/grpcapi/grpcapi.go +++ b/pkg/app/api/grpcapi/grpcapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -189,3 +190,7 @@ func getEncriptionKey(se *model.Piped_SecretEncryption) ([]byte, error) { return nil, status.Error(codes.FailedPrecondition, "The piped does not contain a valid encryption type") } } + +func makeUnregisteredAppsCacheKey(projectID string) string { + return fmt.Sprintf("HASHKEY:UNREGISTERED_APPS:%s", projectID) +} diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index a1f4248768..e8c0bf983f 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -33,9 +33,11 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache" "github.com/pipe-cd/pipe/pkg/cache/memorycache" + "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/datastore" "github.com/pipe-cd/pipe/pkg/filestore" "github.com/pipe-cd/pipe/pkg/model" + "github.com/pipe-cd/pipe/pkg/redis" "github.com/pipe-cd/pipe/pkg/rpc/rpcauth" ) @@ -58,15 +60,14 @@ type PipedAPI struct { deploymentPipedCache cache.Cache envProjectCache cache.Cache pipedStatCache cache.Cache - // A map from projectID to map["repo-id"][]*model.ApplicationInfo - unregisteredAppsCache cache.Cache + redis redis.Redis webBaseURL string logger *zap.Logger } // NewPipedAPI creates a new PipedAPI instance. -func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc, uac cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { +func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, rd redis.Redis, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { a := &PipedAPI{ applicationStore: datastore.NewApplicationStore(ds), deploymentStore: datastore.NewDeploymentStore(ds), @@ -84,7 +85,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), pipedStatCache: hc, - unregisteredAppsCache: uac, + redis: rd, webBaseURL: webBaseURL, logger: logger.Named("piped-api"), } @@ -997,7 +998,7 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip } func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) { - projectID, _, _, err := rpcauth.ExtractPipedToken(ctx) + projectID, pipedID, _, err := rpcauth.ExtractPipedToken(ctx) if err != nil { return nil, err } @@ -1012,7 +1013,10 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte repoToApps[repoID] = append(repoToApps[repoID], appInfo) } - if err := a.unregisteredAppsCache.Put(projectID, repoToApps); err != nil { + key := makeUnregisteredAppsCacheKey(projectID) + c := rediscache.NewHashCache(a.redis, key) + // Put an entity like map["piped-id"]map["repo-id"][]*model.ApplicationInfo + if err := c.Put(pipedID, repoToApps); err != nil { return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") } diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 579a9a8d90..6ebf8a1ee0 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -67,8 +67,7 @@ type WebAPI struct { pipedProjectCache cache.Cache envProjectCache cache.Cache insightCache cache.Cache - // A map from projectID to map["repo-id"][]*model.ApplicationInfo - unregisteredAppsCache cache.Cache + redis redis.Redis projectsInConfig map[string]config.ControlPlaneProject logger *zap.Logger @@ -84,7 +83,6 @@ func NewWebAPI( cmds commandstore.Store, is insightstore.Store, rd redis.Redis, - uac cache.Cache, projs map[string]config.ControlPlaneProject, encrypter encrypter, logger *zap.Logger) *WebAPI { @@ -106,7 +104,7 @@ func NewWebAPI( pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), insightCache: rediscache.NewTTLCache(rd, 3*time.Hour), - unregisteredAppsCache: uac, + redis: rd, logger: logger.Named("web-api"), } return a @@ -610,7 +608,12 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice a.logger.Error("failed to authenticate the current user", zap.Error(err)) return nil, err } - entity, err := a.unregisteredAppsCache.Get(claims.Role.ProjectId) + + // Collect all apps that belong to the project. + key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId) + c := rediscache.NewHashCache(a.redis, key) + // It assumes each entry is map["piped-id"]map["repo-id"][]*model.ApplicationInfo + pipedToRepos, err := c.GetAll() if errors.Is(err, cache.ErrNotFound) { return &webservice.ListUnregisteredApplicationsResponse{}, nil } @@ -618,21 +621,45 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice a.logger.Error("failed to get unregistered apps", zap.Error(err)) return nil, status.Error(codes.Internal, "Failed to get unregistered apps") } - repoToApps, ok := entity.(map[string][]*model.ApplicationInfo) - if !ok { - return nil, status.Error(codes.Internal, "Unexpected data cached") + + // FIXME: Refactor and fall into independent functions and write tests + // Integrate the apps cached for each Piped. + repoToApps := make(map[string][]*model.ApplicationInfo) + for _, r := range pipedToRepos { + repoToAppsPerPiped, ok := r.(map[string][]*model.ApplicationInfo) + if !ok { + return nil, status.Error(codes.Internal, "Unexpected data cached") + } + for repoID, apps := range repoToAppsPerPiped { + if _, ok := repoToApps[repoID]; !ok { + repoToApps[repoID] = []*model.ApplicationInfo{} + } + repoToApps[repoID] = append(repoToApps[repoID], apps...) + } } + // Tidy apps. repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps)) for repoID, apps := range repoToApps { - sort.Slice(apps, func(i, j int) bool { - return apps[i].GetPath() < apps[j].GetPath() + // Eliminate duplicated apps + tidiedApps := make([]*model.ApplicationInfo, 0, len(apps)) + gitPaths := make(map[string]struct{}) + for _, app := range apps { + if _, ok := gitPaths[app.GetPath()]; ok { + continue + } + tidiedApps = append(tidiedApps, app) + } + + sort.Slice(tidiedApps, func(i, j int) bool { + return tidiedApps[i].GetPath() < tidiedApps[j].GetPath() }) repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{ Id: repoID, - Apps: apps, + Apps: tidiedApps, }) } + return &webservice.ListUnregisteredApplicationsResponse{ Repos: repos, }, nil