Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
28156: cdc: Add highwater timestamp to crdb_internal.jobs r=mrtracy a=mrtracy

Adds column high_water_timestamp to the crdb_internal.jobs table, and
thus also to the output of SHOW JOBS. This column is stored internally
as a HLC timestamp, but is output to the user as a DECIMAL in the same
fashion as the `cluster_logical_timestamp()` built-in. This column will
be nil for existing jobs, which use fraction_completed to represent
their progress.

Release note: Added an additional column to crdb_internal.jobs and SHOW
JOBS to represent the "high water timestamp" of Changefeed jobs. This is
used to represent the progress of changefeeds, as an alternative to the
fraction_completed column which is used by existing jobs.

Co-authored-by: Matt Tracy <matt@cockroachlabs.com>
  • Loading branch information
craig[bot] and Matt Tracy committed Aug 12, 2018
2 parents 9772077 + 9f0c6da commit 470173b
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 84 deletions.
20 changes: 18 additions & 2 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,6 +1151,7 @@ func (s *adminServer) Jobs(
}
for i, row := range rows {
job := &resp.Jobs[i]
var fractionCompletedOrNil *float32
if err := scanner.ScanAll(
row,
&job.ID,
Expand All @@ -1163,11 +1164,14 @@ func (s *adminServer) Jobs(
&job.Started,
&job.Finished,
&job.Modified,
&job.FractionCompleted,
&fractionCompletedOrNil,
&job.Error,
); err != nil {
return nil, s.serverError(err)
}
if fractionCompletedOrNil != nil {
job.FractionCompleted = *fractionCompletedOrNil
}
}

return &resp, nil
Expand Down Expand Up @@ -1662,6 +1666,18 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
}
*d = float32(*s)

case **float32:
s, ok := src.(*tree.DFloat)
if !ok {
if src != tree.DNull {
return errors.Errorf("source type assertion failed")
}
*d = nil
break
}
val := float32(*s)
*d = &val

case *int64:
s, ok := tree.AsDInt(src)
if !ok {
Expand Down Expand Up @@ -1698,7 +1714,7 @@ func (rs resultScanner) ScanIndex(row tree.Datums, index int, dst interface{}) e
return errors.Errorf("source type assertion failed")
}
*d = nil
return nil
break
}
*d = &s.Time

Expand Down
20 changes: 17 additions & 3 deletions pkg/server/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,14 +1098,28 @@ func TestAdminAPIJobs(t *testing.T) {
{1, jobs.StatusRunning, jobspb.RestoreDetails{}, jobspb.RestoreProgress{}},
{2, jobs.StatusRunning, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{3, jobs.StatusSucceeded, jobspb.BackupDetails{}, jobspb.BackupProgress{}},
{4, jobs.StatusRunning, jobspb.ChangefeedDetails{}, jobspb.ChangefeedProgress{}},
}
for _, job := range testJobs {
payload := jobspb.Payload{Details: jobspb.WrapPayloadDetails(job.details)}
payloadBytes, err := protoutil.Marshal(&payload)
if err != nil {
t.Fatal(err)
}

progress := jobspb.Progress{Details: jobspb.WrapProgressDetails(job.progress)}
// Populate progress.Progress field with a specific progress type based on
// the job type.
if _, ok := job.progress.(jobspb.ChangefeedProgress); ok {
progress.Progress = &jobspb.Progress_HighWater{
HighWater: &hlc.Timestamp{},
}
} else {
progress.Progress = &jobspb.Progress_FractionCompleted{
FractionCompleted: 1.0,
}
}

progressBytes, err := protoutil.Marshal(&progress)
if err != nil {
t.Fatal(err)
Expand All @@ -1122,9 +1136,9 @@ func TestAdminAPIJobs(t *testing.T) {
uri string
expectedIDs []int64
}{
{"jobs", append([]int64{3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{3}},
{"jobs?status=running", []int64{2, 1}},
{"jobs", append([]int64{4, 3, 2, 1}, existingIDs...)},
{"jobs?limit=1", []int64{4}},
{"jobs?status=running", []int64{4, 2, 1}},
{"jobs?status=succeeded", append([]int64{3}, existingIDs...)},
{"jobs?status=pending", []int64{}},
{"jobs?status=garbage", []int64{}},
Expand Down
40 changes: 24 additions & 16 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,19 +394,20 @@ func tsOrNull(micros int64) tree.Datum {
var crdbInternalJobsTable = virtualSchemaTable{
schema: `
CREATE TABLE crdb_internal.jobs (
job_id INT,
job_type STRING,
description STRING,
user_name STRING,
descriptor_ids INT[],
status STRING,
created TIMESTAMP,
started TIMESTAMP,
finished TIMESTAMP,
modified TIMESTAMP,
fraction_completed FLOAT,
error STRING,
coordinator_id INT
job_id INT,
job_type STRING,
description STRING,
user_name STRING,
descriptor_ids INT[],
status STRING,
created TIMESTAMP,
started TIMESTAMP,
finished TIMESTAMP,
modified TIMESTAMP,
fraction_completed FLOAT,
high_water_timestamp DECIMAL,
error STRING,
coordinator_id INT
);
`,
populate: func(ctx context.Context, p *planner, _ *DatabaseDescriptor, addRow func(...tree.Datum) error) error {
Expand All @@ -422,9 +423,9 @@ CREATE TABLE crdb_internal.jobs (
id, status, created, payloadBytes, progressBytes := r[0], r[1], r[2], r[3], r[4]

var jobType, description, username, descriptorIDs, started,
finished, modified, fractionCompleted, errorStr, leaseNode = tree.DNull,
finished, modified, fractionCompleted, highWaterTimestamp, errorStr, leaseNode = tree.DNull,
tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull, tree.DNull,
tree.DNull, tree.DNull
tree.DNull, tree.DNull, tree.DNull

// Extract data from the payload.
payload, err := jobs.UnmarshalPayload(payloadBytes)
Expand Down Expand Up @@ -462,7 +463,13 @@ CREATE TABLE crdb_internal.jobs (
}
errorStr = tree.NewDString(fmt.Sprintf("%serror decoding progress: %v", baseErr, err))
} else {
fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
// Progress contains either fractionCompleted for traditional jobs,
// or the highWaterTimestamp for change feeds.
if highwater := progress.GetHighWater(); highwater != nil {
highWaterTimestamp = tree.TimestampToDecimal(*highwater)
} else {
fractionCompleted = tree.NewDFloat(tree.DFloat(progress.GetFractionCompleted()))
}
modified = tsOrNull(progress.ModifiedMicros)
}
}
Expand All @@ -480,6 +487,7 @@ CREATE TABLE crdb_internal.jobs (
finished,
modified,
fractionCompleted,
highWaterTimestamp,
errorStr,
leaseNode,
); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ Channel


# The validity of the rows in this table are tested elsewhere; we merely assert the columns.
query ITTTTTTTTTRTI colnames
query ITTTTTTTTTRTTI colnames
SELECT * FROM crdb_internal.jobs WHERE false
----
job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed error coordinator_id
job_id job_type description user_name descriptor_ids status created started finished modified fraction_completed high_water_timestamp error coordinator_id

query IITTITTT colnames
SELECT * FROM crdb_internal.schema_changes WHERE table_id < 0
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/planner_test/explain
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ EXPLAIN SHOW JOBS
----
render · ·
└── values · ·
· size 13 columns, 0 rows
· size 14 columns, 0 rows

statement ok
CREATE INDEX a ON foo(x)
Expand Down Expand Up @@ -223,7 +223,7 @@ sort · ·
├── render · ·
│ └── filter · ·
│ └── values · ·
│ size 18 columns, 907 rows
│ size 18 columns, 908 rows
└── render · ·
└── filter · ·
└── values · ·
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/exec/execbuilder/testdata/explain
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ EXPLAIN SHOW JOBS
----
render · ·
└── values · ·
· size 13 columns, 0 rows
· size 14 columns, 0 rows

statement ok
CREATE INDEX a ON foo(x)
Expand Down Expand Up @@ -216,7 +216,7 @@ sort · ·
├── render · ·
│ └── filter · ·
│ └── values · ·
│ size 18 columns, 907 rows
│ size 18 columns, 908 rows
└── render · ·
└── filter · ·
└── values · ·
Expand Down
Loading

0 comments on commit 470173b

Please sign in to comment.