Skip to content

Commit

Permalink
Wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nakabonne committed Nov 25, 2021
1 parent ad8a5fb commit 589f433
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 22 deletions.
1 change: 0 additions & 1 deletion cmd/pipecd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ go_library(
"//pkg/app/ops/planpreviewoutputcleaner:go_default_library",
"//pkg/app/ops/staledpipedstatcleaner:go_default_library",
"//pkg/cache/cachemetrics:go_default_library",
"//pkg/cache/memorycache:go_default_library",
"//pkg/cache/rediscache:go_default_library",
"//pkg/cli:go_default_library",
"//pkg/config:go_default_library",
Expand Down
6 changes: 2 additions & 4 deletions cmd/pipecd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/pipe-cd/pipe/pkg/app/api/service/webservice"
"github.com/pipe-cd/pipe/pkg/app/api/stagelogstore"
"github.com/pipe-cd/pipe/pkg/cache/cachemetrics"
"github.com/pipe-cd/pipe/pkg/cache/memorycache"
"github.com/pipe-cd/pipe/pkg/cache/rediscache"
"github.com/pipe-cd/pipe/pkg/cli"
"github.com/pipe-cd/pipe/pkg/config"
Expand Down Expand Up @@ -189,7 +188,6 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
is := insightstore.NewStore(fs)
cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger)
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)
unregisteredAppsCache := memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour)

// Start a gRPC server for handling PipedAPI requests.
{
Expand All @@ -201,7 +199,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
datastore.NewPipedStore(ds),
input.Logger,
)
service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, unregisteredAppsCache, cmdOutputStore, cfg.Address, input.Logger)
service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, rd, cmdOutputStore, cfg.Address, input.Logger)
opts = []rpc.Option{
rpc.WithPort(s.pipedAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
Expand Down Expand Up @@ -273,7 +271,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
return err
}

service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, unregisteredAppsCache, cfg.ProjectMap(), encryptDecrypter, input.Logger)
service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger)
opts := []rpc.Option{
rpc.WithPort(s.webAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
Expand Down
5 changes: 5 additions & 0 deletions pkg/app/api/grpcapi/grpcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/base64"
"errors"
"fmt"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -189,3 +190,7 @@ func getEncriptionKey(se *model.Piped_SecretEncryption) ([]byte, error) {
return nil, status.Error(codes.FailedPrecondition, "The piped does not contain a valid encryption type")
}
}

func makeUnregisteredAppsCacheKey(projectID string) string {
return fmt.Sprintf("HASHKEY:UNREGISTERED_APPS:%s", projectID)
}
16 changes: 10 additions & 6 deletions pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
"github.com/pipe-cd/pipe/pkg/app/api/stagelogstore"
"github.com/pipe-cd/pipe/pkg/cache"
"github.com/pipe-cd/pipe/pkg/cache/memorycache"
"github.com/pipe-cd/pipe/pkg/cache/rediscache"
"github.com/pipe-cd/pipe/pkg/datastore"
"github.com/pipe-cd/pipe/pkg/filestore"
"github.com/pipe-cd/pipe/pkg/model"
"github.com/pipe-cd/pipe/pkg/redis"
"github.com/pipe-cd/pipe/pkg/rpc/rpcauth"
)

Expand All @@ -58,15 +60,14 @@ type PipedAPI struct {
deploymentPipedCache cache.Cache
envProjectCache cache.Cache
pipedStatCache cache.Cache
// A map from projectID to map["repo-id"][]*model.ApplicationInfo
unregisteredAppsCache cache.Cache
redis redis.Redis

webBaseURL string
logger *zap.Logger
}

// NewPipedAPI creates a new PipedAPI instance.
func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc, uac cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI {
func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, rd redis.Redis, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI {
a := &PipedAPI{
applicationStore: datastore.NewApplicationStore(ds),
deploymentStore: datastore.NewDeploymentStore(ds),
Expand All @@ -84,7 +85,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.
deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
pipedStatCache: hc,
unregisteredAppsCache: uac,
redis: rd,
webBaseURL: webBaseURL,
logger: logger.Named("piped-api"),
}
Expand Down Expand Up @@ -997,7 +998,7 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip
}

func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) {
projectID, _, _, err := rpcauth.ExtractPipedToken(ctx)
projectID, pipedID, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
return nil, err
}
Expand All @@ -1012,7 +1013,10 @@ func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Conte
repoToApps[repoID] = append(repoToApps[repoID], appInfo)
}

if err := a.unregisteredAppsCache.Put(projectID, repoToApps); err != nil {
key := makeUnregisteredAppsCacheKey(projectID)
c := rediscache.NewHashCache(a.redis, key)
// Put an entity like map["piped-id"]map["repo-id"][]*model.ApplicationInfo
if err := c.Put(pipedID, repoToApps); err != nil {
return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache")
}

Expand Down
49 changes: 38 additions & 11 deletions pkg/app/api/grpcapi/web_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ type WebAPI struct {
pipedProjectCache cache.Cache
envProjectCache cache.Cache
insightCache cache.Cache
// A map from projectID to map["repo-id"][]*model.ApplicationInfo
unregisteredAppsCache cache.Cache
redis redis.Redis

projectsInConfig map[string]config.ControlPlaneProject
logger *zap.Logger
Expand All @@ -84,7 +83,6 @@ func NewWebAPI(
cmds commandstore.Store,
is insightstore.Store,
rd redis.Redis,
uac cache.Cache,
projs map[string]config.ControlPlaneProject,
encrypter encrypter,
logger *zap.Logger) *WebAPI {
Expand All @@ -106,7 +104,7 @@ func NewWebAPI(
pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
insightCache: rediscache.NewTTLCache(rd, 3*time.Hour),
unregisteredAppsCache: uac,
redis: rd,
logger: logger.Named("web-api"),
}
return a
Expand Down Expand Up @@ -610,29 +608,58 @@ func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice
a.logger.Error("failed to authenticate the current user", zap.Error(err))
return nil, err
}
entity, err := a.unregisteredAppsCache.Get(claims.Role.ProjectId)

// Collect all apps that belong to the project.
key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId)
c := rediscache.NewHashCache(a.redis, key)
// It assumes each entry is map["piped-id"]map["repo-id"][]*model.ApplicationInfo
pipedToRepos, err := c.GetAll()
if errors.Is(err, cache.ErrNotFound) {
return &webservice.ListUnregisteredApplicationsResponse{}, nil
}
if err != nil {
a.logger.Error("failed to get unregistered apps", zap.Error(err))
return nil, status.Error(codes.Internal, "Failed to get unregistered apps")
}
repoToApps, ok := entity.(map[string][]*model.ApplicationInfo)
if !ok {
return nil, status.Error(codes.Internal, "Unexpected data cached")

// FIXME: Refactor and fall into independent functions and write tests
// Integrate the apps cached for each Piped.
repoToApps := make(map[string][]*model.ApplicationInfo)
for _, r := range pipedToRepos {
repoToAppsPerPiped, ok := r.(map[string][]*model.ApplicationInfo)
if !ok {
return nil, status.Error(codes.Internal, "Unexpected data cached")
}
for repoID, apps := range repoToAppsPerPiped {
if _, ok := repoToApps[repoID]; !ok {
repoToApps[repoID] = []*model.ApplicationInfo{}
}
repoToApps[repoID] = append(repoToApps[repoID], apps...)
}
}

// Tidy apps.
repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps))
for repoID, apps := range repoToApps {
sort.Slice(apps, func(i, j int) bool {
return apps[i].GetPath() < apps[j].GetPath()
// Eliminate duplicated apps
tidiedApps := make([]*model.ApplicationInfo, 0, len(apps))
gitPaths := make(map[string]struct{})
for _, app := range apps {
if _, ok := gitPaths[app.GetPath()]; ok {
continue
}
tidiedApps = append(tidiedApps, app)
}

sort.Slice(tidiedApps, func(i, j int) bool {
return tidiedApps[i].GetPath() < tidiedApps[j].GetPath()
})
repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{
Id: repoID,
Apps: apps,
Apps: tidiedApps,
})
}

return &webservice.ListUnregisteredApplicationsResponse{
Repos: repos,
}, nil
Expand Down

0 comments on commit 589f433

Please sign in to comment.