From 094ea2a2b461b804616f281ec7b11f0c0aae4df4 Mon Sep 17 00:00:00 2001 From: Faizan Qazi Date: Tue, 29 Nov 2022 15:45:00 -0500 Subject: [PATCH] jobs: protected timestamps refresh logic is not transactional Fixes: #92427 Previously, the refresh logic for protected timestamps would fetch jobs with transactions before attempting to update an existing one. Due to a recent change to allow this API to reflect any changes into individual job objects, we no longer do that correctly. This patch fixes the protected timestamp manager to update timestamps in a transactionally, safe manner since multiple updates can be happening concurrently for schema changes. Release note: None --- .../jobs_protected_ts_manager.go | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go index 65dbcd472015..b700014817f5 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go @@ -164,28 +164,22 @@ func (p *Manager) Protect( if readAsOf.IsEmpty() { return nil, nil } - var protectedtsID *uuid.UUID - err := p.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - details := job.Details() - protectedtsID = getProtectedTSOnJob(details) - // Check if there is an existing protected timestamp ID on the job, - // in which case we can only need to update it. + // Set up a new protected timestamp ID and install it on the job. + err := job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + // Check if the protected timestamp is visible in the txn. + protectedtsID := getProtectedTSOnJob(md.Payload.UnwrapDetails()) + // If it's been removed lets create a new one. if protectedtsID == nil { newID := uuid.MakeV4() protectedtsID = &newID - // Set up a new protected timestamp ID and install it on the job. - return job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - details = job.Details() - details = setProtectedTSOnJob(details, protectedtsID) - md.Payload.Details = jobspb.WrapPayloadDetails(details) - ju.UpdatePayload(md.Payload) - - rec := MakeRecord(*protectedtsID, - int64(job.ID()), readAsOf, nil, Jobs, target) - return p.protectedTSProvider.Protect(ctx, txn, rec) - }) + details := setProtectedTSOnJob(md.Payload.UnwrapDetails(), protectedtsID) + md.Payload.Details = jobspb.WrapPayloadDetails(details) + ju.UpdatePayload(md.Payload) + rec := MakeRecord(*protectedtsID, + int64(job.ID()), readAsOf, nil, Jobs, target) + return p.protectedTSProvider.Protect(ctx, txn, rec) } - // Refresh the existing timestamp. + // Refresh the existing timestamp, otherwise. return p.protectedTSProvider.UpdateTimestamp(ctx, txn, *protectedtsID, readAsOf) }) if err != nil { @@ -216,7 +210,7 @@ func (p *Manager) Unprotect(ctx context.Context, job *jobs.Job) error { if protectedtsID == nil { return nil } - updatedDetails := setProtectedTSOnJob(job.Details(), nil) + updatedDetails := setProtectedTSOnJob(md.Payload.UnwrapDetails(), nil) md.Payload.Details = jobspb.WrapPayloadDetails(updatedDetails) ju.UpdatePayload(md.Payload) return p.protectedTSProvider.Release(ctx, txn, *protectedtsID)