From 411e19abc617a25b921aa55f6505d38207d8af1f Mon Sep 17 00:00:00 2001 From: Diogo Nogueira Date: Thu, 10 Oct 2024 17:14:49 +0200 Subject: [PATCH 1/3] fix(frontend-service): New experimental logs added (#2032) Added new logs to be checked on dev env with respect to context cancelation around the `processBatch` call. Ref: SRX-BJVFRL --- .../frontend-service/pkg/service/batch.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/services/frontend-service/pkg/service/batch.go b/services/frontend-service/pkg/service/batch.go index 00e01c64d..e4de64fef 100644 --- a/services/frontend-service/pkg/service/batch.go +++ b/services/frontend-service/pkg/service/batch.go @@ -18,9 +18,12 @@ package service import ( "context" + "errors" "time" api "github.com/freiheit-com/kuberpult/pkg/api/v1" + "github.com/freiheit-com/kuberpult/pkg/logger" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -32,9 +35,21 @@ type BatchServiceWithDefaultTimeout struct { func (b *BatchServiceWithDefaultTimeout) ProcessBatch(ctx context.Context, req *api.BatchRequest, options ...grpc.CallOption) (*api.BatchResponse, error) { var cancel context.CancelFunc _, hasDeadline := ctx.Deadline() + kuberpultTimeoutError := errors.New("kuberpult batch client timeout exceeded") if !hasDeadline { - ctx, cancel = context.WithTimeout(ctx, b.DefaultTimeout) + ctx, cancel = context.WithTimeoutCause(ctx, b.DefaultTimeout, kuberpultTimeoutError) defer cancel() } - return b.Inner.ProcessBatch(ctx, req, options...) + + response, err := b.Inner.ProcessBatch(ctx, req, options...) + + if ctx.Err() != nil { + if context.Cause(ctx) == kuberpultTimeoutError { + logger.FromContext(ctx).Warn("Context cancelled due to kuberpult timeout") + } else { + logger.FromContext(ctx).Warn("Context cancelled due %v", zap.Error(context.Cause(ctx))) + } + } + + return response, err } From 0540d9c7c9d0db48d772e4946a474a934a675bf4 Mon Sep 17 00:00:00 2001 From: Sven Urbanski Date: Thu, 10 Oct 2024 19:00:50 +0200 Subject: [PATCH 2/3] fix: skip event for releases with commit id (#2031) This applies for undeploy versions. Also fixes a potential nil pointer. Ref: SRX-LEIHOZ --- .../cd-service/pkg/repository/transformer.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/services/cd-service/pkg/repository/transformer.go b/services/cd-service/pkg/repository/transformer.go index e58f88e5a..e4d4a613d 100644 --- a/services/cd-service/pkg/repository/transformer.go +++ b/services/cd-service/pkg/repository/transformer.go @@ -3181,15 +3181,21 @@ func (c *DeployApplicationVersion) Transform( } else { ev := createReplacedByEvent(c.Application, c.Environment, newReleaseCommitId) if s.DBHandler.ShouldUseOtherTables() { - gen := getGenerator(ctx) - eventUuid := gen.Generate() - oldReleaseCommitId, err := getCommitID(ctx, transaction, state, fs, uint64(*oldVersion), oldReleaseDir, c.Application) - if err != nil { - return "", GetCreateReleaseGeneralFailure(err) - } - err = state.DBHandler.DBWriteReplacedByEvent(ctx, transaction, c.TransformerEslVersion, eventUuid, oldReleaseCommitId, ev) - if err != nil { - return "", err + if oldVersion == nil { + logger.FromContext(ctx).Sugar().Errorf("did not find old version of app %s - skipping replaced-by event", c.Application) + } else { + gen := getGenerator(ctx) + eventUuid := gen.Generate() + v := uint64(*oldVersion) + oldReleaseCommitId, err := getCommitID(ctx, transaction, state, fs, v, oldReleaseDir, c.Application) + if err != nil { + logger.FromContext(ctx).Sugar().Warnf("could not find commit for release %d of app %s - skipping replaced-by event", v, c.Application) + } else { + err = state.DBHandler.DBWriteReplacedByEvent(ctx, transaction, c.TransformerEslVersion, eventUuid, oldReleaseCommitId, ev) + if err != nil { + return "", err + } + } } } else { if err := addEventForRelease(ctx, fs, oldReleaseDir, ev); err != nil { From 701af3144026d1af2edbf06b7273b85403413e2b Mon Sep 17 00:00:00 2001 From: Sven Urbanski Date: Thu, 10 Oct 2024 19:19:51 +0200 Subject: [PATCH 3/3] fix: reduce number of queries to get releases (#2029) This reduces the number of queries to the releases table, roughly by a factor of 25 (average number of releases per app). This also reduces the number of cases where we actually load all the manifests from the releases table. And it fixes a few tests that had a nil context. Ref: SRX-LEIHOZ --- pkg/db/db.go | 100 +++++++++++++++--- .../cd-service/pkg/repository/repository.go | 47 ++++++-- services/cd-service/pkg/service/git_test.go | 5 +- .../cd-service/pkg/service/overview_test.go | 2 + 4 files changed, 133 insertions(+), 21 deletions(-) diff --git a/pkg/db/db.go b/pkg/db/db.go index 815ff4b26..aca038183 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -555,7 +555,7 @@ type DBAllReleasesWithMetaData struct { func (h *DBHandler) DBSelectAnyRelease(ctx context.Context, tx *sql.Tx, ignorePrepublishes bool) (*DBReleaseWithMetaData, error) { selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " + + "SELECT eslVersion, created, appName, metadata, releaseVersion, deleted, environments " + " FROM releases " + " LIMIT 1;")) span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAnyRelease") @@ -566,7 +566,7 @@ func (h *DBHandler) DBSelectAnyRelease(ctx context.Context, tx *sql.Tx, ignorePr ctx, selectQuery, ) - processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes) + processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes, false) if err != nil { return nil, err } @@ -608,13 +608,79 @@ func (h *DBHandler) DBSelectReleasesWithoutEnvironments(ctx context.Context, tx LIMIT 100; `) rows, err := tx.QueryContext(ctx, selectQuery) - processedRows, err := h.processReleaseRows(ctx, err, rows, true) + processedRows, err := h.processReleaseRows(ctx, err, rows, true, true) if err != nil { return nil, err } return processedRows, nil } +func (h *DBHandler) DBSelectReleasesByVersions(ctx context.Context, tx *sql.Tx, app string, releaseVersions []uint64, ignorePrePublishes bool) ([]*DBReleaseWithMetaData, error) { + span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectReleasesByVersions") + defer span.Finish() + + if len(releaseVersions) == 0 { + return []*DBReleaseWithMetaData{}, nil + } + repeatedQuestionMarks := strings.Repeat(",?", len(releaseVersions)-1) + selectQuery := h.AdaptQuery(` + + SELECT DISTINCT + releases.eslVersion, + releases.created, + releases.appName, + releases.metadata, + releases.releaseVersion, + releases.deleted, + releases.environments + FROM + ( SELECT + MAX(eslVersion) AS latestEslVersion, + appname, + releaseversion + FROM + "releases" + WHERE + appname=? + AND + releaseversion IN (?` + repeatedQuestionMarks + `) + GROUP BY + appname, releaseversion + ) AS latest + JOIN + releases AS releases + ON + latest.latestEslVersion=releases.eslVersion + AND latest.releaseVersion=releases.releaseVersion + AND latest.appname=releases.appname + LIMIT ? + `) + + span.SetTag("query", selectQuery) + + args := []any{} + args = append(args, app) + for _, version := range releaseVersions { + args = append(args, version) + } + args = append(args, uint64(len(releaseVersions))) + + rows, err := tx.QueryContext( + ctx, + selectQuery, + args..., + ) + + processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrePublishes, false) + if err != nil { + return nil, err + } + if len(processedRows) == 0 { + return nil, nil + } + return processedRows, nil +} + func (h *DBHandler) DBSelectReleaseByVersion(ctx context.Context, tx *sql.Tx, app string, releaseVersion uint64, ignorePrepublishes bool) (*DBReleaseWithMetaData, error) { selectQuery := h.AdaptQuery(fmt.Sprintf( "SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " + @@ -632,7 +698,7 @@ func (h *DBHandler) DBSelectReleaseByVersion(ctx context.Context, tx *sql.Tx, ap releaseVersion, ) - processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes) + processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes, true) if err != nil { return nil, err } @@ -737,14 +803,14 @@ func (h *DBHandler) DBSelectReleasesByApp(ctx context.Context, tx *sql.Tx, app s deleted, ) - return h.processReleaseRows(ctx, err, rows, ignorePrepublishes) + return h.processReleaseRows(ctx, err, rows, ignorePrepublishes, true) } func (h *DBHandler) DBSelectReleasesByAppLatestEslVersion(ctx context.Context, tx *sql.Tx, app string, deleted bool, ignorePrepublishes bool) ([]*DBReleaseWithMetaData, error) { span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectReleasesByApp") defer span.Finish() selectQuery := h.AdaptQuery(fmt.Sprintf( - "SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " + + "SELECT eslVersion, created, appName, metadata, releaseVersion, deleted, environments " + " FROM releases " + " WHERE appName=? AND deleted=?" + " ORDER BY eslVersion DESC, releaseVersion DESC, created DESC;")) @@ -756,7 +822,7 @@ func (h *DBHandler) DBSelectReleasesByAppLatestEslVersion(ctx context.Context, t deleted, ) - return h.processReleaseRows(ctx, err, rows, ignorePrepublishes) + return h.processReleaseRows(ctx, err, rows, ignorePrepublishes, false) } func (h *DBHandler) DBSelectAllReleasesOfApp(ctx context.Context, tx *sql.Tx, app string) (*DBAllReleasesWithMetaData, error) { @@ -887,7 +953,8 @@ func (h *DBHandler) processAllReleasesRow(ctx context.Context, err error, rows * } return row, nil } -func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql.Rows, ignorePrepublishes bool) ([]*DBReleaseWithMetaData, error) { + +func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql.Rows, ignorePrepublishes bool, withManifests bool) ([]*DBReleaseWithMetaData, error) { span, ctx := tracer.StartSpanFromContext(ctx, "processReleaseRows") defer span.Finish() @@ -909,12 +976,17 @@ func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql var metadataStr string var manifestStr string var environmentsStr sql.NullString - err := rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr, &manifestStr, &row.ReleaseNumber, &row.Deleted, &environmentsStr) + var err error + if withManifests { + err = rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr, &manifestStr, &row.ReleaseNumber, &row.Deleted, &environmentsStr) + } else { + err = rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr /*manifests*/, &row.ReleaseNumber, &row.Deleted, &environmentsStr) + } if err != nil { if errors.Is(err, sql.ErrNoRows) { return nil, nil } - return nil, fmt.Errorf("Error scanning releases row from DB. Error: %w\n", err) + return nil, fmt.Errorf("Error scanning releases row from DB withManifests=%v. Error: %w\n", withManifests, err) } if row.ReleaseNumber != lastSeenRelease { lastSeenRelease = row.ReleaseNumber @@ -942,9 +1014,11 @@ func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql var manifestData = DBReleaseManifests{ Manifests: map[string]string{}, } - err = json.Unmarshal(([]byte)(manifestStr), &manifestData) - if err != nil { - return nil, fmt.Errorf("Error during json unmarshal of manifests for releases. Error: %w. Data: %s\n", err, metadataStr) + if withManifests { + err = json.Unmarshal(([]byte)(manifestStr), &manifestData) + if err != nil { + return nil, fmt.Errorf("Error during json unmarshal of manifests for releases. Error: %w. Data: %s\n", err, metadataStr) + } } row.Manifests = manifestData environments := make([]string, 0) diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index 50712cd8e..4d9fa6327 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -2512,16 +2512,15 @@ func (s *State) UpdateTopLevelAppInOverview(ctx context.Context, transaction *sq } rels = retrievedReleasesOfApp } - for _, id := range rels { - if rel, err := s.GetApplicationRelease(ctx, transaction, appName, id); err != nil { - return err - } else { + + if releasesInDb, err := s.GetApplicationReleasesDB(ctx, transaction, appName, rels); err != nil { + return err + } else { + for _, rel := range releasesInDb { if rel == nil { // ignore } else { release := rel.ToProto() - release.Version = id - release.UndeployVersion = rel.UndeployVersion app.Releases = append(app.Releases, release) } } @@ -3119,6 +3118,42 @@ func (s *State) IsUndeployVersionFromManifest(application string, version uint64 return true, nil } +func (s *State) GetApplicationReleasesDB(ctx context.Context, transaction *sql.Tx, application string, versions []uint64) ([]*Release, error) { + var result []*Release + if s.DBHandler.ShouldUseOtherTables() { + rels, err := s.DBHandler.DBSelectReleasesByVersions(ctx, transaction, application, versions, true) + if err != nil { + return nil, fmt.Errorf("could not get release of app %s: %v", application, err) + } + if rels == nil { + return nil, nil + } + for _, rel := range rels { + r := &Release{ + Version: rel.ReleaseNumber, + UndeployVersion: rel.Metadata.UndeployVersion, + SourceAuthor: rel.Metadata.SourceAuthor, + SourceCommitId: rel.Metadata.SourceCommitId, + SourceMessage: rel.Metadata.SourceMessage, + CreatedAt: rel.Created, + DisplayVersion: rel.Metadata.DisplayVersion, + IsMinor: rel.Metadata.IsMinor, + IsPrepublish: rel.Metadata.IsPrepublish, + } + result = append(result, r) + } + } else { + for i, v := range versions { + rel, err := s.GetApplicationRelease(ctx, transaction, application, v) + if err != nil { + return nil, fmt.Errorf("could not get release of app %s at index %d for version %v: %v", application, i, v, err) + } + result = append(result, rel) + } + } + return result, nil +} + func (s *State) GetApplicationRelease(ctx context.Context, transaction *sql.Tx, application string, version uint64) (*Release, error) { if s.DBHandler.ShouldUseOtherTables() { env, err := s.DBHandler.DBSelectReleaseByVersion(ctx, transaction, application, version, true) diff --git a/services/cd-service/pkg/service/git_test.go b/services/cd-service/pkg/service/git_test.go index bb195d688..564014a34 100644 --- a/services/cd-service/pkg/service/git_test.go +++ b/services/cd-service/pkg/service/git_test.go @@ -17,6 +17,7 @@ Copyright freiheit.com*/ package service import ( + "context" "fmt" "sort" "testing" @@ -237,7 +238,7 @@ func TestGetProductOverview(t *testing.T) { if err != nil { t.Fatalf("error setting up repository test: %v", err) } - sv := &GitServer{OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown}} + sv := &GitServer{OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown, Context: context.Background()}} for _, transformer := range tc.Setup { repo.Apply(testutil.MakeTestContext(), transformer) @@ -831,7 +832,7 @@ func TestGetCommitInfo(t *testing.T) { DBHandler: repo.State().DBHandler, } sv := &GitServer{ - OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown}, + OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown, Context: context.Background()}, Config: config, PageSize: uint64(pageSize), } diff --git a/services/cd-service/pkg/service/overview_test.go b/services/cd-service/pkg/service/overview_test.go index 56b602aa7..d949f1018 100644 --- a/services/cd-service/pkg/service/overview_test.go +++ b/services/cd-service/pkg/service/overview_test.go @@ -640,6 +640,7 @@ func TestOverviewService(t *testing.T) { svc := &OverviewServiceServer{ Repository: repo, Shutdown: shutdown, + Context: context.Background(), } tc.Test(t, svc) if tc.DB { @@ -972,6 +973,7 @@ func TestOverviewServiceFromCommit(t *testing.T) { svc := &OverviewServiceServer{ Repository: repo, Shutdown: shutdown, + Context: context.Background(), } ov, err := svc.GetOverview(testutil.MakeTestContext(), &api.GetOverviewRequest{})