Skip to content

Commit

Permalink
Add RPCs to list unregistered apps
Browse files Browse the repository at this point in the history
  • Loading branch information
nakabonne committed Nov 25, 2021
1 parent 7af924d commit ad8a5fb
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/pipecd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/app/ops/planpreviewoutputcleaner:go_default_library",
"//pkg/app/ops/staledpipedstatcleaner:go_default_library",
"//pkg/cache/cachemetrics:go_default_library",
"//pkg/cache/memorycache:go_default_library",
"//pkg/cache/rediscache:go_default_library",
"//pkg/cli:go_default_library",
"//pkg/config:go_default_library",
Expand Down
6 changes: 4 additions & 2 deletions cmd/pipecd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pipe-cd/pipe/pkg/app/api/service/webservice"
"github.com/pipe-cd/pipe/pkg/app/api/stagelogstore"
"github.com/pipe-cd/pipe/pkg/cache/cachemetrics"
"github.com/pipe-cd/pipe/pkg/cache/memorycache"
"github.com/pipe-cd/pipe/pkg/cache/rediscache"
"github.com/pipe-cd/pipe/pkg/cli"
"github.com/pipe-cd/pipe/pkg/config"
Expand Down Expand Up @@ -188,6 +189,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
is := insightstore.NewStore(fs)
cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger)
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)
unregisteredAppsCache := memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour)

// Start a gRPC server for handling PipedAPI requests.
{
Expand All @@ -199,7 +201,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
datastore.NewPipedStore(ds),
input.Logger,
)
service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, cfg.Address, input.Logger)
service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, unregisteredAppsCache, cmdOutputStore, cfg.Address, input.Logger)
opts = []rpc.Option{
rpc.WithPort(s.pipedAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
Expand Down Expand Up @@ -271,7 +273,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error {
return err
}

service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger)
service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, unregisteredAppsCache, cfg.ProjectMap(), encryptDecrypter, input.Logger)
opts := []rpc.Option{
rpc.WithPort(s.webAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
Expand Down
27 changes: 24 additions & 3 deletions pkg/app/api/grpcapi/piped_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ type PipedAPI struct {
deploymentPipedCache cache.Cache
envProjectCache cache.Cache
pipedStatCache cache.Cache
// A map from projectID to map["repo-id"][]*model.ApplicationInfo
unregisteredAppsCache cache.Cache

webBaseURL string
logger *zap.Logger
}

// NewPipedAPI creates a new PipedAPI instance.
func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI {
func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc, uac cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI {
a := &PipedAPI{
applicationStore: datastore.NewApplicationStore(ds),
deploymentStore: datastore.NewDeploymentStore(ds),
Expand All @@ -82,6 +84,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.
deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
pipedStatCache: hc,
unregisteredAppsCache: uac,
webBaseURL: webBaseURL,
logger: logger.Named("piped-api"),
}
Expand Down Expand Up @@ -994,8 +997,26 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip
}

func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) {
// TODO: Make the unused application configurations cache up-to-date
return nil, status.Errorf(codes.Unimplemented, "ReportUnregisteredApplicationConfigurations is not implemented yet")
projectID, _, _, err := rpcauth.ExtractPipedToken(ctx)
if err != nil {
return nil, err
}

// Make a map from repo-id to a collection of ApplicationInfo.
repoToApps := make(map[string][]*model.ApplicationInfo)
for _, appInfo := range req.Applications {
repoID := appInfo.RepoId
if _, ok := repoToApps[repoID]; !ok {
repoToApps[repoID] = []*model.ApplicationInfo{}
}
repoToApps[repoID] = append(repoToApps[repoID], appInfo)
}

if err := a.unregisteredAppsCache.Put(projectID, repoToApps); err != nil {
return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache")
}

return &pipedservice.ReportUnregisteredApplicationConfigurationsResponse{}, nil
}

// CreateDeploymentChain creates a new deployment chain object and all required commands to
Expand Down
39 changes: 39 additions & 0 deletions pkg/app/api/grpcapi/web_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -66,6 +67,8 @@ type WebAPI struct {
pipedProjectCache cache.Cache
envProjectCache cache.Cache
insightCache cache.Cache
// A map from projectID to map["repo-id"][]*model.ApplicationInfo
unregisteredAppsCache cache.Cache

projectsInConfig map[string]config.ControlPlaneProject
logger *zap.Logger
Expand All @@ -81,6 +84,7 @@ func NewWebAPI(
cmds commandstore.Store,
is insightstore.Store,
rd redis.Redis,
uac cache.Cache,
projs map[string]config.ControlPlaneProject,
encrypter encrypter,
logger *zap.Logger) *WebAPI {
Expand All @@ -102,6 +106,7 @@ func NewWebAPI(
pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour),
insightCache: rediscache.NewTTLCache(rd, 3*time.Hour),
unregisteredAppsCache: uac,
logger: logger.Named("web-api"),
}
return a
Expand Down Expand Up @@ -599,6 +604,40 @@ func (a *WebAPI) validatePipedBelongsToProject(ctx context.Context, pipedID, pro
return nil
}

func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice.ListUnregisteredApplicationsRequest) (*webservice.ListUnregisteredApplicationsResponse, error) {
claims, err := rpcauth.ExtractClaims(ctx)
if err != nil {
a.logger.Error("failed to authenticate the current user", zap.Error(err))
return nil, err
}
entity, err := a.unregisteredAppsCache.Get(claims.Role.ProjectId)
if errors.Is(err, cache.ErrNotFound) {
return &webservice.ListUnregisteredApplicationsResponse{}, nil
}
if err != nil {
a.logger.Error("failed to get unregistered apps", zap.Error(err))
return nil, status.Error(codes.Internal, "Failed to get unregistered apps")
}
repoToApps, ok := entity.(map[string][]*model.ApplicationInfo)
if !ok {
return nil, status.Error(codes.Internal, "Unexpected data cached")
}

repos := make([]*webservice.ListUnregisteredApplicationsResponse_Repo, len(repoToApps))
for repoID, apps := range repoToApps {
sort.Slice(apps, func(i, j int) bool {
return apps[i].GetPath() < apps[j].GetPath()
})
repos = append(repos, &webservice.ListUnregisteredApplicationsResponse_Repo{
Id: repoID,
Apps: apps,
})
}
return &webservice.ListUnregisteredApplicationsResponse{
Repos: repos,
}, nil
}

// TODO: Validate the specified piped to ensure that it belongs to the specified environment.
func (a *WebAPI) AddApplication(ctx context.Context, req *webservice.AddApplicationRequest) (*webservice.AddApplicationResponse, error) {
claims, err := rpcauth.ExtractClaims(ctx)
Expand Down
13 changes: 13 additions & 0 deletions pkg/app/api/service/webservice/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ service WebService {
rpc SyncApplication(SyncApplicationRequest) returns (SyncApplicationResponse) {}
rpc GetApplication(GetApplicationRequest) returns (GetApplicationResponse) {}
rpc GenerateApplicationSealedSecret(GenerateApplicationSealedSecretRequest) returns (GenerateApplicationSealedSecretResponse) {}
rpc ListUnregisteredApplications(ListUnregisteredApplicationsRequest) returns (ListUnregisteredApplicationsResponse) {}

// Deployment
rpc ListDeployments(ListDeploymentsRequest) returns (ListDeploymentsResponse) {}
Expand Down Expand Up @@ -219,6 +220,7 @@ message AddEnvironmentRequest {
string desc = 2;
}


message AddApplicationRequest {
string name = 1 [(validate.rules).string.min_len = 1];
string env_id = 2;
Expand Down Expand Up @@ -318,6 +320,17 @@ message GenerateApplicationSealedSecretResponse {
string data = 1 [(validate.rules).string.min_len = 1];
}

message ListUnregisteredApplicationsRequest {
}

message ListUnregisteredApplicationsResponse {
message Repo {
string id = 1;
repeated model.ApplicationInfo apps = 2;
}
repeated Repo repos = 1;
}

message ListDeploymentsRequest {
message Options {
repeated model.DeploymentStatus statuses = 1;
Expand Down
2 changes: 2 additions & 0 deletions pkg/app/piped/appconfigreporter/appconfigreporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ func (r *Reporter) updateUnregisteredApps(ctx context.Context, registeredAppPath
r.logger.Info(fmt.Sprintf("found out %d unregistered applications in repository %s", len(as), repoID))
apps = append(apps, as...)
}
// Prevent Piped from continuing to send meaningless requests
// even though Control-plane already knows that there are zero unregistered apps.
if len(apps) == 0 {
if r.sweptUnregisteredApps {
return nil
Expand Down

0 comments on commit ad8a5fb

Please sign in to comment.