Skip to content

Commit

Permalink
final cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
miguel-crespo-fdc committed Oct 2, 2024
1 parent 9eb2311 commit e62110a
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 105 deletions.

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion services/cd-service/pkg/interceptors/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func UnaryUserContextInterceptor(ctx context.Context,
return nil, err
}
ctx = auth.WriteUserToContext(ctx, *user)
ctx = context.WithoutCancel(ctx)
h, err := handler(ctx, req)
return h, err
}
1 change: 1 addition & 0 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2202,6 +2202,7 @@ func (s *State) WriteCurrentEnvironmentLocks(ctx context.Context, transaction *s
return err
}
for lockId, lock := range ls {
fmt.Printf("LOCK: %s\n", lock.CreatedAt.String())
currentEnv := db.EnvironmentLock{
EslVersion: 0,
Env: envName,
Expand Down
3 changes: 1 addition & 2 deletions services/frontend-service/pkg/interceptors/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,7 @@ func DexLoginInterceptor(
http.Redirect(w, req, auth.LoginPATH, http.StatusFound)
return
}
noCancelCtx := context.WithoutCancel(httpCtx)
req = req.WithContext(noCancelCtx)
req = req.WithContext(httpCtx)
httpHandler(w, req)
}

Expand Down
1 change: 0 additions & 1 deletion services/frontend-service/pkg/service/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,5 @@ func (b *BatchServiceWithDefaultTimeout) ProcessBatch(ctx context.Context, req *
ctx, cancel = context.WithTimeout(ctx, b.DefaultTimeout)
defer cancel()
}
ctx = context.WithoutCancel(ctx)
return b.Inner.ProcessBatch(ctx, req, options...)
}
79 changes: 32 additions & 47 deletions services/manifest-repo-export-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,19 +290,10 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db.
sleepDuration := createBackOffProvider(time.Second * time.Duration(eslProcessingIdleTimeSeconds))

const transactionRetries = 10
needsPushing := false
commits := 0
defer func(repo repository.Repository) {
err2 := repo.PushRepo(ctx)
if err2 != nil {
log.Errorf("Error pushing: %v", err2)
}
}(repo)
for {
var transformer repository.Transformer = nil
var esl *db.EslEventRow = nil
const readonly = true // we just handle the reading here, there's another transaction for writing the result to the db/git

err := dbHandler.WithTransactionR(ctx, transactionRetries, readonly, func(ctx context.Context, transaction *sql.Tx) error {
var err2 error
transformer, esl, err2 = handleOneEvent(ctx, transaction, dbHandler, ddMetrics, repo)
Expand All @@ -327,52 +318,44 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db.
}
sleepDuration.Reset()
} else {
if needsPushing && commits > 20 {
err = dbHandler.WithTransactionR(ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) error {
err2 := repo.PushRepo(ctx)
if err2 != nil {
d := sleepDuration.NextBackOff()
logger.FromContext(ctx).Sugar().Warnf("error pushing, will try again in %v", d)
measurePushes(ddMetrics, log, true)
time.Sleep(d)
return err2
} else {
measurePushes(ddMetrics, log, false)
}
return nil
////Get latest commit. Write esl timestamp and commit hash.
//commit, err := repo.GetHeadCommit()
//if err != nil {
// return err
//}
//return dbHandler.DBWriteCommitTransactionTimestamp(ctx, transaction, commit.Id().String(), esl.Created)
})
if err != nil {
err3 := repo.FetchAndReset(ctx)
if err3 != nil {
d := sleepDuration.NextBackOff()
logger.FromContext(ctx).Sugar().Warnf("error fetching repo, will try again in %v", d)
time.Sleep(d)
}
}
needsPushing = false
commits = 0
}
if transformer == nil {
sleepDuration.Reset()
d := sleepDuration.NextBackOff()
logger.FromContext(ctx).Sugar().Warnf("event processing skipped, will try again in %v", d)
time.Sleep(d)
continue
}
needsPushing = true
commits += 1
log.Infof("event processed successfully, now writing to cutoff")
log.Infof("event processed successfully, now writing to cutoff and pushing...")
err = dbHandler.WithTransactionR(ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) error {
return db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
err2 := db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
if err2 != nil {
return err2
}
err2 = repo.PushRepo(ctx)
if err2 != nil {
d := sleepDuration.NextBackOff()
logger.FromContext(ctx).Sugar().Warnf("error pushing, will try again in %v", d)
measurePushes(ddMetrics, log, true)
time.Sleep(d)
return err2
} else {
measurePushes(ddMetrics, log, false)
}

//Get latest commit. Write esl timestamp and commit hash.
commit, err := repo.GetHeadCommit()
if err != nil {
return err
}
return dbHandler.DBWriteCommitTransactionTimestamp(ctx, transaction, commit.Id().String(), esl.Created)
})
if err != nil {
return err
err3 := repo.FetchAndReset(ctx)
if err3 != nil {
d := sleepDuration.NextBackOff()
logger.FromContext(ctx).Sugar().Warnf("error fetching repo, will try again in %v", d)
time.Sleep(d)
}
}
}
}
Expand Down Expand Up @@ -483,17 +466,19 @@ func processEslEvent(ctx context.Context, repo repository.Repository, esl *db.Es
// no error, but also no transformer to process:
return nil, nil
}
//logger.FromContext(ctx).Sugar().Infof("processEslEvent: unmarshal \n%s\n", esl.EventJson)
logger.FromContext(ctx).Sugar().Infof("processEslEvent: unmarshal \n%s\n", esl.EventJson)
err = json.Unmarshal(([]byte)(esl.EventJson), &t)
if err != nil {
return nil, err
}
t.SetEslVersion(db.TransformerID(esl.EslVersion))
logger.FromContext(ctx).Sugar().Infof("read esl event of type (%s)", t.GetDBEventType())
logger.FromContext(ctx).Sugar().Infof("read esl event of type (%s) event=%v", t.GetDBEventType(), t)

err = repo.Apply(ctx, tx, t)
if err != nil {
return nil, fmt.Errorf("error while running repo apply: %v", err)
}

logger.FromContext(ctx).Sugar().Infof("Applied transformer succesfully event=%s", t.GetDBEventType())
return t, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ func CombineArray(others []*TransformerResult) *TransformerResult {
func (r *repository) ApplyTransformer(ctx context.Context, transaction *sql.Tx, transformer Transformer) (*TransformerResult, *TransformerBatchApplyError) {
span, ctx := tracer.StartSpanFromContext(ctx, "ApplyTransformer")
defer span.Finish()

commitMsg, state, changes, applyErr := r.ApplyTransformersInternal(ctx, transaction, transformer)
if applyErr != nil {
return nil, applyErr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ func (c *DeployApplicationVersion) Transform(
if err != nil {
return "", err
}

d := &CleanupOldApplicationVersions{
Application: c.Application,
TransformerMetadata: TransformerMetadata{
Expand Down Expand Up @@ -944,6 +945,7 @@ func (c *CreateApplicationVersion) Transform(
}
}
}

return fmt.Sprintf("created version %d of %q", version, c.Application), nil
}

Expand Down

0 comments on commit e62110a

Please sign in to comment.