diff --git a/cmd/pipecd/server.go b/cmd/pipecd/server.go index d8bfb09014..2f30bbf400 100644 --- a/cmd/pipecd/server.go +++ b/cmd/pipecd/server.go @@ -199,7 +199,7 @@ func (s *server) run(ctx context.Context, input cli.Input) error { datastore.NewPipedStore(ds), input.Logger, ) - service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, cfg.Address, input.Logger) + service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, rd, cmdOutputStore, cfg.Address, input.Logger) opts = []rpc.Option{ rpc.WithPort(s.pipedAPIPort), rpc.WithGracePeriod(s.gracePeriod), diff --git a/pkg/app/api/grpcapi/grpcapi.go b/pkg/app/api/grpcapi/grpcapi.go index 9e06fc64e6..0b1eed7482 100644 --- a/pkg/app/api/grpcapi/grpcapi.go +++ b/pkg/app/api/grpcapi/grpcapi.go @@ -18,6 +18,7 @@ import ( "context" "encoding/base64" "errors" + "fmt" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -189,3 +190,7 @@ func getEncriptionKey(se *model.Piped_SecretEncryption) ([]byte, error) { return nil, status.Error(codes.FailedPrecondition, "The piped does not contain a valid encryption type") } } + +func makeUnregisteredAppsCacheKey(projectID string) string { + return fmt.Sprintf("HASHKEY:UNREGISTERED_APPS:%s", projectID) +} diff --git a/pkg/app/api/grpcapi/piped_api.go b/pkg/app/api/grpcapi/piped_api.go index 04b4400883..022ace071b 100644 --- a/pkg/app/api/grpcapi/piped_api.go +++ b/pkg/app/api/grpcapi/piped_api.go @@ -15,7 +15,9 @@ package grpcapi import ( + "bytes" "context" + "encoding/gob" "encoding/json" "errors" "time" @@ -33,9 +35,11 @@ import ( "github.com/pipe-cd/pipe/pkg/app/api/stagelogstore" "github.com/pipe-cd/pipe/pkg/cache" "github.com/pipe-cd/pipe/pkg/cache/memorycache" + "github.com/pipe-cd/pipe/pkg/cache/rediscache" "github.com/pipe-cd/pipe/pkg/datastore" "github.com/pipe-cd/pipe/pkg/filestore" "github.com/pipe-cd/pipe/pkg/model" + "github.com/pipe-cd/pipe/pkg/redis" "github.com/pipe-cd/pipe/pkg/rpc/rpcauth" ) @@ -58,13 +62,14 @@ type PipedAPI struct { deploymentPipedCache cache.Cache envProjectCache cache.Cache pipedStatCache cache.Cache + redis redis.Redis webBaseURL string logger *zap.Logger } // NewPipedAPI creates a new PipedAPI instance. -func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { +func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore.Store, alss applicationlivestatestore.Store, las analysisresultstore.Store, cs commandstore.Store, hc cache.Cache, rd redis.Redis, cop commandOutputPutter, webBaseURL string, logger *zap.Logger) *PipedAPI { a := &PipedAPI{ applicationStore: datastore.NewApplicationStore(ds), deploymentStore: datastore.NewDeploymentStore(ds), @@ -82,6 +87,7 @@ func NewPipedAPI(ctx context.Context, ds datastore.DataStore, sls stagelogstore. deploymentPipedCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), pipedStatCache: hc, + redis: rd, webBaseURL: webBaseURL, logger: logger.Named("piped-api"), } @@ -992,8 +998,25 @@ func (a *PipedAPI) UpdateApplicationConfigurations(ctx context.Context, req *pip } func (a *PipedAPI) ReportUnregisteredApplicationConfigurations(ctx context.Context, req *pipedservice.ReportUnregisteredApplicationConfigurationsRequest) (*pipedservice.ReportUnregisteredApplicationConfigurationsResponse, error) { - // TODO: Make the unused application configurations cache up-to-date - return nil, status.Errorf(codes.Unimplemented, "ReportUnregisteredApplicationConfigurations is not implemented yet") + projectID, pipedID, _, err := rpcauth.ExtractPipedToken(ctx) + if err != nil { + return nil, err + } + + // Cache an encoded slice of *model.ApplicationInfo. + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(req.Applications); err != nil { + a.logger.Error("failed to encode the unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to encode the unregistered apps") + } + key := makeUnregisteredAppsCacheKey(projectID) + c := rediscache.NewHashCache(a.redis, key) + if err := c.Put(pipedID, buf.Bytes()); err != nil { + return nil, status.Error(codes.Internal, "failed to put the unregistered apps to the cache") + } + + return &pipedservice.ReportUnregisteredApplicationConfigurationsResponse{}, nil } // CreateDeploymentChain creates a new deployment chain object and all required commands to diff --git a/pkg/app/api/grpcapi/web_api.go b/pkg/app/api/grpcapi/web_api.go index 7a31a3b35b..a58ef1ac22 100644 --- a/pkg/app/api/grpcapi/web_api.go +++ b/pkg/app/api/grpcapi/web_api.go @@ -15,9 +15,12 @@ package grpcapi import ( + "bytes" "context" + "encoding/gob" "errors" "fmt" + "sort" "strings" "time" @@ -66,6 +69,7 @@ type WebAPI struct { pipedProjectCache cache.Cache envProjectCache cache.Cache insightCache cache.Cache + redis redis.Redis projectsInConfig map[string]config.ControlPlaneProject logger *zap.Logger @@ -102,6 +106,7 @@ func NewWebAPI( pipedProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), envProjectCache: memorycache.NewTTLCache(ctx, 24*time.Hour, 3*time.Hour), insightCache: rediscache.NewTTLCache(rd, 3*time.Hour), + redis: rd, logger: logger.Named("web-api"), } return a @@ -599,6 +604,53 @@ func (a *WebAPI) validatePipedBelongsToProject(ctx context.Context, pipedID, pro return nil } +func (a *WebAPI) ListUnregisteredApplications(ctx context.Context, _ *webservice.ListUnregisteredApplicationsRequest) (*webservice.ListUnregisteredApplicationsResponse, error) { + claims, err := rpcauth.ExtractClaims(ctx) + if err != nil { + a.logger.Error("failed to authenticate the current user", zap.Error(err)) + return nil, err + } + + // Collect all apps that belong to the project. + key := makeUnregisteredAppsCacheKey(claims.Role.ProjectId) + c := rediscache.NewHashCache(a.redis, key) + // pipedToApps assumes to be a map["piped-id"][]byte(slice of *model.ApplicationInfo encoded by encoding/gob) + pipedToApps, err := c.GetAll() + if errors.Is(err, cache.ErrNotFound) { + return &webservice.ListUnregisteredApplicationsResponse{}, nil + } + if err != nil { + a.logger.Error("failed to get unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "Failed to get unregistered apps") + } + + // Integrate all apps cached for each Piped. + allApps := make([]*model.ApplicationInfo, 0) + for _, as := range pipedToApps { + b, ok := as.([]byte) + if !ok { + return nil, status.Error(codes.Internal, "Unexpected data cached") + } + dec := gob.NewDecoder(bytes.NewReader(b)) + var apps []*model.ApplicationInfo + if err := dec.Decode(&apps); err != nil { + a.logger.Error("failed to decode the unregistered apps", zap.Error(err)) + return nil, status.Error(codes.Internal, "failed to decode the unregistered apps") + } + allApps = append(allApps, apps...) + } + if len(allApps) == 0 { + return &webservice.ListUnregisteredApplicationsResponse{}, nil + } + + sort.Slice(allApps, func(i, j int) bool { + return allApps[i].Path < allApps[j].Path + }) + return &webservice.ListUnregisteredApplicationsResponse{ + Applications: allApps, + }, nil +} + // TODO: Validate the specified piped to ensure that it belongs to the specified environment. func (a *WebAPI) AddApplication(ctx context.Context, req *webservice.AddApplicationRequest) (*webservice.AddApplicationResponse, error) { claims, err := rpcauth.ExtractClaims(ctx) diff --git a/pkg/app/api/service/webservice/service.proto b/pkg/app/api/service/webservice/service.proto index b6ad3a96dd..81a215df5d 100644 --- a/pkg/app/api/service/webservice/service.proto +++ b/pkg/app/api/service/webservice/service.proto @@ -65,6 +65,7 @@ service WebService { rpc SyncApplication(SyncApplicationRequest) returns (SyncApplicationResponse) {} rpc GetApplication(GetApplicationRequest) returns (GetApplicationResponse) {} rpc GenerateApplicationSealedSecret(GenerateApplicationSealedSecretRequest) returns (GenerateApplicationSealedSecretResponse) {} + rpc ListUnregisteredApplications(ListUnregisteredApplicationsRequest) returns (ListUnregisteredApplicationsResponse) {} // Deployment rpc ListDeployments(ListDeploymentsRequest) returns (ListDeploymentsResponse) {} @@ -219,6 +220,7 @@ message AddEnvironmentRequest { string desc = 2; } + message AddApplicationRequest { string name = 1 [(validate.rules).string.min_len = 1]; string env_id = 2; @@ -318,6 +320,13 @@ message GenerateApplicationSealedSecretResponse { string data = 1 [(validate.rules).string.min_len = 1]; } +message ListUnregisteredApplicationsRequest { +} + +message ListUnregisteredApplicationsResponse { + repeated model.ApplicationInfo applications = 1; +} + message ListDeploymentsRequest { message Options { repeated model.DeploymentStatus statuses = 1; diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter.go b/pkg/app/piped/appconfigreporter/appconfigreporter.go index e3f9c57d2e..321f87b06a 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter.go @@ -370,5 +370,6 @@ func (r *Reporter) readApplicationInfo(repoDir, repoID, appDirRelPath, cfgFilena RepoId: repoID, Path: appDirRelPath, ConfigFilename: cfgFilename, + PipedId: r.config.PipedID, }, nil } diff --git a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go index 29d7ad76eb..87e32afe5d 100644 --- a/pkg/app/piped/appconfigreporter/appconfigreporter_test.go +++ b/pkg/app/piped/appconfigreporter/appconfigreporter_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" + "github.com/pipe-cd/pipe/pkg/config" "github.com/pipe-cd/pipe/pkg/model" ) @@ -93,6 +94,7 @@ func TestReporter_findRegisteredApps(t *testing.T) { { name: "app not changed", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{apps: []*model.Application{ {Id: "id-1", Name: "app-1", Labels: map[string]string{"key-1": "value-1"}, GitPath: &model.ApplicationGitPath{Repo: &model.ApplicationGitRepository{Id: "repo-1"}, Path: "app-1", ConfigFilename: ".pipe.yaml"}}, }}, @@ -117,6 +119,7 @@ spec: { name: "app changed", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{apps: []*model.Application{ {Id: "id-1", Name: "app-1", Labels: map[string]string{"key-1": "value-1"}, GitPath: &model.ApplicationGitPath{Repo: &model.ApplicationGitRepository{Id: "repo-1"}, Path: "app-1", ConfigFilename: ".pipe.yaml"}}, }}, @@ -143,6 +146,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", + PipedId: "piped-1", }, }, wantErr: false, @@ -225,6 +229,7 @@ func TestReporter_findUnregisteredApps(t *testing.T) { { name: "valid app config that is unregistered", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/.pipe.yaml": &fstest.MapFile{Data: []byte(` @@ -249,6 +254,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: ".pipe.yaml", + PipedId: "piped-1", }, }, wantErr: false, @@ -256,6 +262,7 @@ spec: { name: "valid app config that name isn't default", reporter: &Reporter{ + config: &config.PipedSpec{PipedID: "piped-1"}, applicationLister: &fakeApplicationLister{}, fileSystem: fstest.MapFS{ "path/to/repo-1/app-1/dev.pipecd.yaml": &fstest.MapFile{Data: []byte(` @@ -280,6 +287,7 @@ spec: RepoId: "repo-1", Path: "app-1", ConfigFilename: "dev.pipecd.yaml", + PipedId: "piped-1", }, }, wantErr: false, diff --git a/pkg/cache/rediscache/cache.go b/pkg/cache/rediscache/cache.go index 01c4b63226..bc7d45b583 100644 --- a/pkg/cache/rediscache/cache.go +++ b/pkg/cache/rediscache/cache.go @@ -77,6 +77,7 @@ func (c *RedisCache) Get(k string) (interface{}, error) { return reply, nil } +// It is caller's responsibility to encode Go struct. func (c *RedisCache) Put(k string, v interface{}) error { conn := c.redis.Get() defer conn.Close() diff --git a/pkg/cache/rediscache/hashcache.go b/pkg/cache/rediscache/hashcache.go index 246912583b..9655ddf65d 100644 --- a/pkg/cache/rediscache/hashcache.go +++ b/pkg/cache/rediscache/hashcache.go @@ -60,6 +60,8 @@ func (r *RedisHashCache) Get(k string) (interface{}, error) { // check if the TTL for hashkey (not the key k), in case the TTL for // hashkey is not yet existed or unset, EXPIRE will be called and set // TTL time for the whole hashkey. +// +// It is caller's responsibility to encode Go struct. func (r *RedisHashCache) Put(k string, v interface{}) error { conn := r.redis.Get() defer conn.Close() diff --git a/pkg/model/common.proto b/pkg/model/common.proto index 1d4b2d65c2..54d109d690 100644 --- a/pkg/model/common.proto +++ b/pkg/model/common.proto @@ -68,4 +68,5 @@ message ApplicationInfo { string path = 6 [(validate.rules).string.pattern = "^[^/].+$"]; // This field is not allowed to be changed. string config_filename = 7; + string piped_id = 8 [(validate.rules).string.min_len = 1]; }