-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
initial.go
131 lines (118 loc) · 4.59 KB
/
initial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package stateloader
import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
// The raft log of a freshly bootstrapped range does not start at zero. Raft
// membership is bootstrapped by synthesizing a snapshot, as though a prefix of
// the log had already been discarded, so the log has to begin at some
// arbitrary index greater than 1. raftInitialLogIndex and raftInitialLogTerm
// are that starting index and term.
const (
	raftInitialLogIndex = 10
	raftInitialLogTerm  = 5
)

// RaftLogTermSignalForAddRaftAppliedIndexTermMigration is a signal value only;
// it is never persisted in the state machine or in HardState. Its sole user is
// AddRaftAppliedIndexTermMigration, which uses it to tell the below-raft code
// to perform the migration while applying the raft log entry whose
// ReplicatedEvalResult.State.RaftAppliedIndexTerm equals this value. It is
// deliberately smaller than raftInitialLogTerm, which guarantees that it never
// occurs as a term under normal operation.
const RaftLogTermSignalForAddRaftAppliedIndexTermMigration = 3
// WriteInitialReplicaState persists the initial ReplicaState of a new Range.
// It does not write the associated Raft state; that has to be written
// separately via SynthesizeRaftState before a Replica is instantiated.
//
// The persisted state does not start from zero. Instead it presupposes that a
// few entries have already been applied: both the truncated state and the
// applied index are placed at raftInitialLogIndex (with term
// raftInitialLogTerm). The applied index term is only recorded when
// writeRaftAppliedIndexTerm is set, and the replica version is only recorded
// when replicaVersion is not the zero Version.
//
// The supplied MVCCStats are used for the Stats field after adjusting for
// persisting the state itself, and the updated stats are returned. On any
// error the returned stats are the zero value.
//
// The range is expected to have no lease, GC threshold or version on disk yet;
// finding a non-trivial one is treated as fatal.
func WriteInitialReplicaState(
	ctx context.Context,
	readWriter storage.ReadWriter,
	ms enginepb.MVCCStats,
	desc roachpb.RangeDescriptor,
	lease roachpb.Lease,
	gcThreshold hlc.Timestamp,
	replicaVersion roachpb.Version,
	writeRaftAppliedIndexTerm bool,
) (enginepb.MVCCStats, error) {
	loader := Make(desc.RangeID)

	// Assemble the state to persist. Only the RangeID of the descriptor is
	// carried over into the stored ReplicaState.
	truncState := &roachpb.RaftTruncatedState{
		Term:  raftInitialLogTerm,
		Index: raftInitialLogIndex,
	}
	state := kvserverpb.ReplicaState{
		TruncatedState:   truncState,
		RaftAppliedIndex: truncState.Index,
		Desc:             &roachpb.RangeDescriptor{RangeID: desc.RangeID},
		Stats:            &ms,
		Lease:            &lease,
		GCThreshold:      &gcThreshold,
	}
	if writeRaftAppliedIndexTerm {
		state.RaftAppliedIndexTerm = truncState.Term
	}
	if replicaVersion != (roachpb.Version{}) {
		state.Version = &replicaVersion
	}

	// Verify that nothing non-trivial has been written for this range yet.
	existingLease, err := loader.LoadLease(ctx, readWriter)
	if err != nil {
		return enginepb.MVCCStats{}, errors.Wrap(err, "error reading lease")
	}
	if existingLease != (roachpb.Lease{}) {
		log.Fatalf(ctx, "expected trivial lease, but found %+v", existingLease)
	}

	existingGCThreshold, err := loader.LoadGCThreshold(ctx, readWriter)
	if err != nil {
		return enginepb.MVCCStats{}, errors.Wrap(err, "error reading GCThreshold")
	}
	if !existingGCThreshold.IsEmpty() {
		log.Fatalf(ctx, "expected trivial GCthreshold, but found %+v", existingGCThreshold)
	}

	existingVersion, err := loader.LoadVersion(ctx, readWriter)
	if err != nil {
		return enginepb.MVCCStats{}, errors.Wrap(err, "error reading Version")
	}
	if existingVersion != (roachpb.Version{}) {
		log.Fatalf(ctx, "expected trivial version, but found %+v", existingVersion)
	}

	// Keep the explicit error check rather than returning Save's results
	// directly, so that callers always get zero-valued stats on failure.
	updatedStats, err := loader.Save(ctx, readWriter, state)
	if err != nil {
		return enginepb.MVCCStats{}, err
	}
	return updatedStats, nil
}
// WriteInitialRangeState writes the initial state of a range and is called
// during bootstrap. It first persists the initial replica state, using an
// empty lease, an empty GC threshold and empty MVCC stats, and then
// synthesizes the matching Raft state for the range. The first error
// encountered is returned.
func WriteInitialRangeState(
	ctx context.Context,
	readWriter storage.ReadWriter,
	desc roachpb.RangeDescriptor,
	replicaVersion roachpb.Version,
) error {
	// TODO: is WriteInitialRangeState only used for a new cluster, so we can
	// assume that we should set writeAppliedIndexTerm to true? What prevents
	// older versions from joining this cluster?
	const writeRaftAppliedIndexTerm = true

	// A brand-new range starts out with trivial (zero-valued) lease, GC
	// threshold and stats.
	var (
		emptyLease       roachpb.Lease
		emptyGCThreshold hlc.Timestamp
		emptyStats       enginepb.MVCCStats
	)

	// The adjusted stats returned here are not needed.
	_, err := WriteInitialReplicaState(
		ctx, readWriter, emptyStats, desc, emptyLease, emptyGCThreshold,
		replicaVersion, writeRaftAppliedIndexTerm,
	)
	if err != nil {
		return err
	}
	return Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter)
}