Skip to content

Commit

Permalink
storage: persist minimum transaction timestamps in intents
Browse files Browse the repository at this point in the history
Fixes cockroachdb#36089.
Informs cockroachdb#37199.

This commit addresses the second concern from cockroachdb#37199 (comment)
by implementing its suggestion #1.

It augments the TxnMeta struct to begin storing the transaction's
minimum timestamp. This allows pushers to have perfect accuracy
into whether an intent is part of a transaction that can eventually
be committed or whether it has been aborted by any other pusher and
uncommittable.

This allows us to get rid of the false positive cases where a pusher
incorrectly detects that a transaction is still active and begins
waiting on it. In this worst case, this could lead to transactions
waiting for the entire transaction expiration for a contending txn.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 10, 2019
1 parent fc45ea0 commit 9bace21
Show file tree
Hide file tree
Showing 22 changed files with 508 additions and 272 deletions.
45 changes: 45 additions & 0 deletions c-deps/libroach/protos/storage/engine/enginepb/mvcc3.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 59 additions & 0 deletions c-deps/libroach/protos/storage/engine/enginepb/mvcc3.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 34 additions & 20 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,11 +770,12 @@ func MakeTransaction(

return Transaction{
TxnMeta: enginepb.TxnMeta{
Key: baseKey,
ID: u,
Timestamp: now,
Priority: MakePriority(userPriority),
Sequence: 0, // 1-indexed, incremented before each Request
Key: baseKey,
ID: u,
Timestamp: now,
MinTimestamp: now,
Priority: MakePriority(userPriority),
Sequence: 0, // 1-indexed, incremented before each Request
},
Name: name,
LastHeartbeat: now,
Expand Down Expand Up @@ -949,13 +950,18 @@ func (t *Transaction) BumpEpoch() {
// InclusiveTimeBounds returns start and end timestamps such that all intents written as
// part of this transaction have a timestamp in the interval [start, end].
func (t *Transaction) InclusiveTimeBounds() (hlc.Timestamp, hlc.Timestamp) {
min := t.OrigTimestamp
min := t.MinTimestamp
max := t.Timestamp
if t.Epoch != 0 && t.EpochZeroTimestamp != (hlc.Timestamp{}) {
if min.Less(t.EpochZeroTimestamp) {
panic(fmt.Sprintf("orig timestamp %s less than epoch zero %s", min, t.EpochZeroTimestamp))
if min.IsEmpty() {
// Backwards compatibility with pre-v19.2 nodes.
// TODO(nvanbenschoten): Remove in v20.1.
min = t.OrigTimestamp
if t.Epoch != 0 && t.EpochZeroTimestamp != (hlc.Timestamp{}) {
if min.Less(t.EpochZeroTimestamp) {
panic(fmt.Sprintf("orig timestamp %s less than epoch zero %s", min, t.EpochZeroTimestamp))
}
min = t.EpochZeroTimestamp
}
min = t.EpochZeroTimestamp
}
return min, max
}
Expand Down Expand Up @@ -1001,6 +1007,20 @@ func (t *Transaction) Update(o *Transaction) {
t.MaxTimestamp.Forward(o.MaxTimestamp)
t.RefreshedTimestamp.Forward(o.RefreshedTimestamp)

// On update, set lower bound timestamps to the minimum seen by either txn.
// These shouldn't differ unless one of them is empty, but we're careful
// anyway.
if t.MinTimestamp == (hlc.Timestamp{}) {
t.MinTimestamp = o.MinTimestamp
} else if o.MinTimestamp != (hlc.Timestamp{}) {
t.MinTimestamp.Backward(o.MinTimestamp)
}
if t.EpochZeroTimestamp == (hlc.Timestamp{}) {
t.EpochZeroTimestamp = o.EpochZeroTimestamp
} else if o.EpochZeroTimestamp != (hlc.Timestamp{}) {
t.EpochZeroTimestamp.Backward(o.EpochZeroTimestamp)
}

// Absorb the collected clock uncertainty information.
for _, v := range o.ObservedTimestamps {
t.UpdateObservedTimestamp(v.NodeID, v.Timestamp)
Expand All @@ -1016,12 +1036,6 @@ func (t *Transaction) Update(o *Transaction) {
if len(o.InFlightWrites) > 0 {
t.InFlightWrites = o.InFlightWrites
}
// On update, set epoch zero timestamp to the minimum seen by either txn.
if o.EpochZeroTimestamp != (hlc.Timestamp{}) {
if t.EpochZeroTimestamp == (hlc.Timestamp{}) || o.EpochZeroTimestamp.Less(t.EpochZeroTimestamp) {
t.EpochZeroTimestamp = o.EpochZeroTimestamp
}
}
}

// UpgradePriority sets transaction priority to the maximum of current
Expand Down Expand Up @@ -1051,9 +1065,9 @@ func (t Transaction) String() string {
fmt.Fprintf(&buf, "%q ", t.Name)
}
fmt.Fprintf(&buf, "id=%s key=%s rw=%t pri=%.8f stat=%s epo=%d "+
"ts=%s orig=%s max=%s wto=%t seq=%d",
"ts=%s orig=%s min=%s max=%s wto=%t seq=%d",
t.Short(), Key(t.Key), t.IsWriting(), floatPri, t.Status, t.Epoch, t.Timestamp,
t.OrigTimestamp, t.MaxTimestamp, t.WriteTooOld, t.Sequence)
t.OrigTimestamp, t.MinTimestamp, t.MaxTimestamp, t.WriteTooOld, t.Sequence)
if ni := len(t.IntentSpans); t.Status != PENDING && ni > 0 {
fmt.Fprintf(&buf, " int=%d", ni)
}
Expand All @@ -1075,9 +1089,9 @@ func (t Transaction) SafeMessage() string {
fmt.Fprintf(&buf, "%q ", t.Name)
}
fmt.Fprintf(&buf, "id=%s rw=%t pri=%.8f stat=%s epo=%d "+
"ts=%s orig=%s max=%s wto=%t seq=%d",
"ts=%s orig=%s min=%s max=%s wto=%t seq=%d",
t.Short(), t.IsWriting(), floatPri, t.Status, t.Epoch, t.Timestamp,
t.OrigTimestamp, t.MaxTimestamp, t.WriteTooOld, t.Sequence)
t.OrigTimestamp, t.MinTimestamp, t.MaxTimestamp, t.WriteTooOld, t.Sequence)
if ni := len(t.IntentSpans); t.Status != PENDING && ni > 0 {
fmt.Fprintf(&buf, " int=%d", ni)
}
Expand Down
Loading

0 comments on commit 9bace21

Please sign in to comment.