Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/apply: create apply package for raft entry application #39254

Merged

Conversation

nvanbenschoten
Copy link
Member

The new package provides abstractions and routines associated with the application of committed raft entries to a replicated state machine.

This was inspired by four driving forces:

The PR begins to address these concerns by formalizing the process of applying committed raft entries. To start, this makes the process easier to understand both in terms of the macro-level steps that are taken during application of a batch of entries and in terms of the impact that an individual command has on the replicated state machine. For instance, the PR helps provide answers to all of the following questions:

  • What are the stages of raft entry application?
  • What is the difference between a "raft entry" and a "replicated command"?
  • What can a command do besides apply its write batch to the storage engine?
  • What does it mean for a successfully replicated command to be rejected during application?
  • When can we acknowledge the outcome of a raft proposal?

The refactor also uncovers a large testing surface that future PRs will exploit to write targeted unit tests. Not only can the storage/apply package be tested with a mock state machine (done in this PR), but we can test Replica's implementation of the state machine interface in isolation without needing to touch raft at all.

Finally, the refactor paves the way for making the proposed change in #38954 in a much cleaner way. This is demonstrated in the second commit, which is being included here to show why certain things were designed the way they were but will not be merged with this PR.

@nvanbenschoten nvanbenschoten requested review from danhhz, ajwerner, tbg and a team August 2, 2019 02:57
@cockroach-teamcity
Copy link
Member

This change is Reviewable

@nvanbenschoten
Copy link
Member Author

Heh, I didn't bother adding in the DisableRaftAckBeforeApplication testing knob from #38954 and figured the tests would fail, but I guess limiting the entries that can be early acked to those already in the raft log from previous raft ready iterations essentially disables the optimization for single-node clusters (because it doesn't need it), so none of the tests become flaky. That wasn't intentional.

@danhhz
Copy link
Contributor

danhhz commented Aug 2, 2019

:plause: So excited to see this!! I won't get a chance to look at this today and I'm out on monday, but I'll look at it first thing on tuesday.

My initial feedback as someone unfamiliar with all of this code is that there are a lot of pieces that are individually documented very well, but I didn't see anything that lays out how they fit together. I'd suggest a package level godoc that gives an overview of 1) the relevant parts of raft, so the reader doesn't have to read the raft paper to understand this code and 2) how all the major pieces fit together. Two examples of the type of thing I'm thinking of are

// sendSnapshot sends a snapshot of the replica state to the specified replica.
// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful
// about adding additional calls as generating a snapshot is moderately
// expensive.
//
// A snapshot is a bulk transfer of all data in a range. It consists of a
// consistent view of all the state needed to run some replica of a range as of
// some applied index (not as of some mvcc-time). Snapshots are used by Raft
// when a follower is far enough behind the leader that it can no longer be
// caught up using incremental diffs (because the leader has already garbage
// collected the diffs, in this case because it truncated the Raft log past
// where the follower is).
//
// We also proactively send a snapshot when adding a new replica to bootstrap it
// (this is called a "learner" snapshot and is a special case of a Raft
// snapshot, we just speed the process along). It's called a learner snapshot
// because it's sent to what Raft terms a learner replica. As of 19.2, when we
// add a new replica, it's first added as a learner using a Raft ConfChange,
// which means it accepts Raft traffic but doesn't vote or affect quorum. Then
// we immediately send it a snapshot to catch it up. After the snapshot
// successfully applies, we turn it into a normal voting replica using another
// ConfChange. It then uses the normal mechanisms to catch up with whatever got
// committed to the Raft log during the snapshot transfer. In contrast to adding
// the voting replica directly, this avoids a period of fragility when the
// replica would be a full member, but very far behind. [1]
//
// Snapshots are expensive and mostly unexpected (except learner snapshots
// during rebalancing). The quota pool is responsible for keeping a leader from
// getting too far ahead of any of the followers, so ideally they'd never be far
// enough behind to need a snapshot.
//
// The snapshot process itself is broken into 3 parts: generating the snapshot,
// transmitting it, and applying it.
//
// Generating the snapshot: The data contained in a snapshot is a full copy of
// the replicated data plus everything the replica needs to be a healthy member
// of a Raft group. The former is large, so we send it via streaming rpc instead
// of keeping it all in memory at once. (Well, at least on the sender side. On
// the recipient side, we do still buffer it, but we'll fix that at some point).
// The `(Replica).GetSnapshot` method does the necessary locking and gathers the
// various Raft state needed to run a replica. It also creates an iterator for
// the range's data as it looked under those locks (this is powered by a RocksDB
// snapshot, which is a different thing but a similar idea). Notably,
// GetSnapshot does not do the data iteration.
//
// Transmitting the snapshot: The transfer itself happens over the grpc
// `RaftSnapshot` method, which is a bi-directional stream of `SnapshotRequest`s
// and `SnapshotResponse`s. The two sides are orchestrated by the
// `(RaftTransport).SendSnapshot` and `(Store).receiveSnapshot` methods.
//
// `SendSnapshot` starts up the streaming rpc and first sends a header message
// with everything but the range data and then blocks, waiting on the first
// streaming response from the recipient. This lets us short-circuit sending the
// range data if the recipient can't be contacted or if it can't use the
// snapshot (which is usually the result of a race). The recipient's grpc
// handler for RaftSnapshot sanity checks a few things and ends up calling down
// into `receiveSnapshot`, which does the bulk of the work. `receiveSnapshot`
// starts by waiting for a reservation in the snapshot rate limiter. It then
// reads the header message and hands it to `shouldAcceptSnapshotData` to
// determine if it can use the snapshot [2]. `shouldAcceptSnapshotData` is
// advisory and can return false positives. If `shouldAcceptSnapshotData`
// returns true, this is communicated back to the sender, which then proceeds to
// call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier
// to send the data in chunks, each chunk a streaming grpc message. The sender
// then sends a final message with an indicaton that it's done and blocks again,
// waiting for a second and final response from the recipient which indicates if
// the snapshot was a success.
//
// Applying the snapshot: After the recipient has received the message
// indicating it has all the data, it hands it all to
// `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks the
// same things as `shouldAcceptSnapshotData` to make sure nothing has changed
// while the snapshot was being transferred. It then guarantees that there is
// either an initialized[3] replica or a `ReplicaPlaceholder`[4] to accept the
// snapshot by creating a placeholder if necessary. Finally, a *Raft snapshot*
// message is manually handed to the replica's Raft node (by calling
// `stepRaftGroup` + `handleRaftReadyRaftMuLocked`), at which point the snapshot
// has been applied.
//
// [1]: There is a third kind of snapshot, called "preemptive", which is how we
// avoided the above fragility before learner replicas were introduced in the
// 19.2 cycle. It's essentially a snapshot that we made very fast by staging it
// on a remote node right before we added a replica on that node. However,
// preemptive snapshots came with all sorts of complexity that we're delighted
// to be rid of. They have to stay around for clusters with mixed 19.1 and 19.2
// nodes, but after 19.2, we can remove them entirely.
//
// [2]: The largest class of rejections here is if the store contains a replica
// that overlaps the snapshot but has a different id (we maintain an invariant
// that replicas on a store never overlap). This usually happens when the
// recipient has an old copy of a replica that is no longer part of a range and
// the `replicaGCQueue` hasn't gotten around to collecting it yet. So if this
// happens, `shouldAcceptSnapshotData` will queue it up for consideration.
//
// [3]: A uninitialized replica is created when a replica that's being added
// gets traffic from its new peers before it gets a snapshot. It may be possible
// to get rid of uninitialized replicas (by dropping all Raft traffic except
// votes on the floor), but this is a cleanup that hasn't happened yet.
//
// [4]: The placeholder is essentially a snapshot lock, making any future
// callers of `shouldAcceptSnapshotData` return an error so that we no longer
// have to worry about racing with a second snapshot. See the comment on
// ReplicaPlaceholder for details.

// T is an instance of a SQL scalar, array, or tuple type. It describes the
// domain of possible values which a column can return, or to which an
// expression can evaluate. The type system does not differentiate between
// nullable and non-nullable types. It is up to the caller to store that
// information separately if it is needed. Here are some example types:
//
// INT4 - any 32-bit integer
// DECIMAL(10, 3) - any base-10 value with at most 10 digits, with
// up to 3 to right of decimal point
// FLOAT[] - array of 64-bit IEEE 754 floating-point values
// TUPLE[TIME, VARCHAR(20)] - any pair of values where first value is a time
// of day and the second value is a string having
// up to 20 characters
//
// Fundamentally, a type consists of the following attributes, each of which has
// a corresponding accessor method. Some of these attributes are only defined
// for a subset of types. See the method comments for more details.
//
// Family - equivalence group of the type (enumeration)
// Oid - Postgres Object ID that describes the type (enumeration)
// Precision - maximum accuracy of the type (numeric)
// Width - maximum size or scale of the type (numeric)
// Locale - location which governs sorting, formatting, etc. (string)
// ArrayContents - array element type (T)
// TupleContents - slice of types of each tuple field ([]T)
// TupleLabels - slice of labels of each tuple field ([]string)
//
// Some types are not currently allowed as the type of a column (e.g. nested
// arrays). Other usages of the types package may have similar restrictions.
// Each such caller is responsible for enforcing their own restrictions; it's
// not the concern of the types package.
//
// Implementation-wise, types.T wraps a protobuf-generated InternalType struct.
// The generated protobuf code defines the struct fields, marshals/unmarshals
// them, formats a string representation, etc. Meanwhile, the wrapper types.T
// struct overrides the Marshal/Unmarshal methods in order to map to/from older
// persisted InternalType representations. For example, older versions of
// InternalType (previously called ColumnType) used a VisibleType field to
// represent INT2, whereas newer versions use Width/Oid. Unmarshal upgrades from
// this old format to the new, and Marshal downgrades, thus preserving backwards
// compatibility.
//
// Simple (unary) scalars types
// ----------------------------
//
// | SQL type | Family | Oid | Precision | Width |
// |-------------------|----------------|---------------|-----------|-------|
// | NULL (unknown) | UNKNOWN | T_unknown | 0 | 0 |
// | BOOL | BOOL | T_bool | 0 | 0 |
// | DATE | DATE | T_date | 0 | 0 |
// | TIMESTAMP | TIMESTAMP | T_timestamp | 0 | 0 |
// | INTERVAL | INTERVAL | T_interval | 0 | 0 |
// | TIMESTAMPTZ | TIMESTAMPTZ | T_timestamptz | 0 | 0 |
// | OID | OID | T_oid | 0 | 0 |
// | UUID | UUID | T_uuid | 0 | 0 |
// | INET | INET | T_inet | 0 | 0 |
// | TIME | TIME | T_time | 0 | 0 |
// | JSON | JSONB | T_jsonb | 0 | 0 |
// | JSONB | JSONB | T_jsonb | 0 | 0 |
// | | | | | |
// | BYTES | BYTES | T_bytea | 0 | 0 |
// | | | | | |
// | STRING | STRING | T_text | 0 | 0 |
// | STRING(N) | STRING | T_text | 0 | N |
// | VARCHAR | STRING | T_varchar | 0 | 0 |
// | VARCHAR(N) | STRING | T_varchar | 0 | N |
// | CHAR | STRING | T_bpchar | 0 | 1 |
// | CHAR(N) | STRING | T_bpchar | 0 | N |
// | "char" | STRING | T_char | 0 | 0 |
// | NAME | STRING | T_name | 0 | 0 |
// | | | | | |
// | STRING COLLATE en | COLLATEDSTRING | T_text | 0 | 0 |
// | STRING(N) COL... | COLLATEDSTRING | T_text | 0 | N |
// | VARCHAR COL... | COLLATEDSTRING | T_varchar | 0 | N |
// | VARCHAR(N) COL... | COLLATEDSTRING | T_varchar | 0 | N |
// | CHAR COL... | COLLATEDSTRING | T_bpchar | 0 | 1 |
// | CHAR(N) COL... | COLLATEDSTRING | T_bpchar | 0 | N |
// | "char" COL... | COLLATEDSTRING | T_char | 0 | 0 |
// | | | | | |
// | DECIMAL | DECIMAL | T_decimal | 0 | 0 |
// | DECIMAL(N) | DECIMAL | T_decimal | N | 0 |
// | DECIMAL(N,M) | DECIMAL | T_decimal | N | M |
// | | | | | |
// | FLOAT8 | FLOAT | T_float8 | 0 | 0 |
// | FLOAT4 | FLOAT | T_float4 | 0 | 0 |
// | | | | | |
// | BIT | BIT | T_bit | 0 | 1 |
// | BIT(N) | BIT | T_bit | 0 | N |
// | VARBIT | BIT | T_varbit | 0 | 0 |
// | VARBIT(N) | BIT | T_varbit | 0 | N |
// | | | | | |
// | INT,INTEGER | INT | T_int8 | 0 | 64 |
// | INT2,SMALLINT | INT | T_int2 | 0 | 16 |
// | INT4 | INT | T_int4 | 0 | 32 |
// | INT8,INT64,BIGINT | INT | T_int8 | 0 | 64 |
//
// Tuple types
// -----------
//
// These cannot (yet) be used in tables but are used in DistSQL flow
// processors for queries that have tuple-typed intermediate results.
//
// | Field | Description |
// |-----------------|---------------------------------------------------------|
// | Family | TupleFamily |
// | Oid | T_record |
// | TupleContents | Contains tuple field types (can be recursively defined) |
// | TupleLabels | Contains labels for each tuple field |
//
// Array types
// -----------
//
// | Field | Description |
// |-----------------|---------------------------------------------------------|
// | Family | ArrayFamily |
// | Oid | T__XXX (double underscores), where XXX is the Oid name |
// | | of a scalar type |
// | ArrayContents | Type of array elements (scalar, array, or tuple) |
//
// There are two special ARRAY types:
//
// | SQL type | Family | Oid | ArrayContents |
// |-------------------|----------------|---------------|---------------|
// | INT2VECTOR | ARRAY | T_int2vector | Int |
// | OIDVECTOR | ARRAY | T_oidvector | Oid |
//
// When these types are themselves made into arrays, the Oids become T__int2vector and
// T__oidvector, respectively.
//

If you already have this and I've simply missed it in my quick scan, then maybe add some pointers to it from the godoc of the major pieces.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This apply package itself has become very abstract but still seems worthwhile. It almost feels like the policy lives here to provide this package with some business logic. If you were to abstract the policy then this package would be pretty much entirely control flow. There's something nice about that; it makes it clear what goes in to applying raft commands from an abstract perspective.

I know for a long time I've been on about the business of applying committed entries as being distinct from replication but I don't think I considered how abstract the apply interface would be as I focused more on the complexity in actually stepping the state machine. Given this interface I'm more open to a replicate package exporting these concepts.

Regardless of whether this abstraction overly moves the complexity needle on its own, it opens up the door to better abstractions moving forward. I'm all for it!

So far my review is just nits. This is great cleanup, thanks for taking this on.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @danhhz, @nvanbenschoten, and @tbg)


pkg/storage/cmd_app_ctx_buf.go, line 211 at r2 (raw file):

		}
	}
	return new(cmdAppCtxBufSlice)

I don't know how crazy it would be to panic if there weren't free ones. I worry that a later refactor could mess with this and nobody would notice until we went and tried to find the performance regression.


pkg/storage/replica_application_impl.go, line 197 at r1 (raw file):

// The decision about whether or not to apply a command is a combination of
// three checks:
// 1. verify that the command was proposed under the current lease. This is

nit: can you either indent all of this by a space or don't indent the wrapped lines. It looks bizarre in godoc.


pkg/storage/apply/cmd.go, line 79 at r1 (raw file):

type CommandIterator interface {
	CommandIteratorBase
	// cur returns the command that the iterator is currently pointing at.

s/cur/Cur/


pkg/storage/apply/cmd.go, line 95 at r1 (raw file):

type CheckedCommandIterator interface {
	CommandIteratorBase
	// cur returns the checked command that the iterator is currently

s/cur/CurChecked/


pkg/storage/apply/task.go, line 81 at r1 (raw file):

// machine when ApplyCommittedEntries is called.
//
// Example use:

+1 what @dan said, maybe just pull this into a go Example. It's worth looking at the godoc of this package.


pkg/storage/apply/task.go, line 41 at r2 (raw file):

	// purpose of checking commands to determine whether they will be
	// rejected or not when staged in a real batch.
	NewBatch(mock bool) Batch

This comment should refer to AckCommittedEntriesBeforeApplication


pkg/storage/apply/task.go, line 138 at r2 (raw file):

// yet been applied to the proposer replica's replicated state machine.
//
// This is safe because a proposal through raft is known to have succeeded as

can be known to have succeeded? The point of the below exposition is to explain the need to determine whether or not the command actually succeeded.


pkg/storage/apply/task.go, line 144 at r2 (raw file):

// is because the raft log is the provider of atomicity and durability for
// replicated writes, not (ignoring log truncation) the replicated state machine
// itself.

This comment deserves some mention of the NewBatch(true /*mock*/) usage.


pkg/storage/apply/task.go, line 149 at r2 (raw file):

// proposal at this stage:
//
// 1. Committing an entry in the raft log and having the command in that entry

Same nit about indentation.

image.png

Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First pass only, going to do another for the details when my brain has recovered from vacation.

Thanks for teasing this apart. I echo @danhhz and @ajwerner's comments about the documentation. Once that's in, the apply package will allow getting a good high-level view of the Raft pipeline and in particular how it is made performant. It looks like we're closing in on a point where we can look at the *Replica-based impls of the various interfaces in apply and reduce the amount of *Replica they contain - I find that very exciting. For example, the replicaDecoder is "mostly" the proposals map. The applier is "mostly" engine.NewBatch() and r.mu.State() (with the added complications of split/merge triggers, sideloading, and more generally side effects).

Reviewed 15 of 15 files at r1, 6 of 6 files at r2.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @danhhz and @nvanbenschoten)


pkg/storage/cmd_app_ctx.go, line 45 at r1 (raw file):

	ctx context.Context

	// The following fields are set in checkShouldApplyCommand when we validate

This sounds like they're set twice.


pkg/storage/cmd_app_ctx.go, line 134 at r1 (raw file):

	return "", nil
}

nit: move those above all of the *decodedRaftEntry methods.


pkg/storage/cmd_app_ctx_buf.go, line 25 at r1 (raw file):

// cmdAppCtxBufSliceFreeListSize is the size of the cmdAppCtxBufSlice free
// list. The size has been tuned for the maximum number of iterators that
// the storage/apply package uses at once.

Will we find out if this changes (in particularly grows)? How would one confirm manually?


pkg/storage/cmd_app_ctx_buf.go, line 141 at r1 (raw file):

}

// Valid implements the apply.commandIteratorBase interface.

CommandIteratorBase


pkg/storage/cmd_app_ctx_buf.go, line 146 at r1 (raw file):

}

// Next implements the apply.commandIteratorBase interface.

ditto


pkg/storage/cmd_app_ctx_buf.go, line 161 at r1 (raw file):

	cur := it.tail.cur()
	if cur == cmd {
		// Avoid the copy.

Just curious whether this is speculative optimization or something you've seen have an effect. We expect to usually always hit this, right?


pkg/storage/replica_application_impl.go, line 37 at r1 (raw file):

// allowing Replica to interface with the storage/apply package.
//
// TODO(nvanbenschoten): rename this file back to replica_application.go

Consider splitting into replica_decoder.go and replica_applier.go instead? It clocks in at 1000+ lines as is, so if we can reasonably split the concerns somehow, we should.


pkg/storage/replica_application_impl.go, line 51 at r1 (raw file):

}

// getDecoder returns the Replica's apply.Decoder.

Mention somewhere suitable that the raftMu is held for the whole lifetime of the decoder, including its acquisition.


pkg/storage/replica_application_impl.go, line 229 at r1 (raw file):

}

// TODO(nvanbenschoten): Unit test this function now that it is stateless.

Nice to see this boiled down to its essence.

nit: I'd think the comment should sit on this method, not on the checkShouldApplyCommand wrapper.


pkg/storage/replica_application_impl.go, line 459 at r1 (raw file):

//
// Assuming all checks were passed, the command's write batch is applied to the
// application batch. It's trivial ReplicatedState updates are then staged in

Its


pkg/storage/replica_raft.go, line 727 at r1 (raw file):

		defer appTask.Close()
		if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil {
			// TODO(WIP): Propagate errExpl.

reminder to fix or turn into proper TODO


pkg/storage/replica_raft.go, line 731 at r1 (raw file):

		}
		if err := appTask.ApplyCommittedEntries(ctx); err != nil {
			// TODO(WIP): Propagate errExpl.

ditto


pkg/storage/apply/cmd.go, line 44 at r1 (raw file):

type AppliedCommand interface {
	CheckedCommand
	// AckOutcomeAndFinish acknowledges the outcome of the command to its

Maybe flip those. There may not be a client (IsLocal() == false)

Also mention what an error here means. Is that error associated with the command or does it mean that a low-level error occurred and things are broken now?


pkg/storage/apply/cmd.go, line 59 at r1 (raw file):

	Next()
	// NewList returns a new empty command list. Usages of the list will
	// always advance the iterator before pushing into to the list, so

into to

Hmm, the New.*List methods are tricky, I wish I saw a better way of achieving the goal here. But at least I think

  • NewList and NewCheckedList are only ever called on CommandIterator
  • NewAppliedList on CheckedCommandList

so you should be able to move them into these specific interfaces only. This also simplifies things further because that way you can't create various command lists from command lists and think about what that means in terms of memory sharing.


pkg/storage/apply/cmd.go, line 79 at r1 (raw file):

type CommandIterator interface {
	CommandIteratorBase
	// cur returns the command that the iterator is currently pointing at.

Cur


pkg/storage/apply/cmd.go, line 80 at r1 (raw file):

	CommandIteratorBase
	// cur returns the command that the iterator is currently pointing at.
	// Should not be called if valid is false.

Must
Valid()


pkg/storage/apply/cmd.go, line 95 at r1 (raw file):

type CheckedCommandIterator interface {
	CommandIteratorBase
	// cur returns the checked command that the iterator is currently

CurChecked


pkg/storage/apply/cmd.go, line 96 at r1 (raw file):

	CommandIteratorBase
	// cur returns the checked command that the iterator is currently
	// pointing at. Should not be called if valid is false.

Must (here and elsewhere)


pkg/storage/apply/cmd.go, line 103 at r1 (raw file):

type CheckedCommandList interface {
	CheckedCommandIterator
	// AppendChecked pushes the checked command on to the back of the list.

is "on to" correct (here and elsewhere)


pkg/storage/apply/cmd.go, line 110 at r1 (raw file):

type AppliedCommandIterator interface {
	CommandIteratorBase
	// cur returns the applied command that the iterator is currently

CurApplied


pkg/storage/apply/cmd.go, line 39 at r2 (raw file):

	CanAckBeforeApplication() bool
	// AckSuccess acknowledges the success of the command to its client.
	// Should only be called if !Rejected.

Must


pkg/storage/apply/task.go, line 51 at r1 (raw file):

	Stage(Command) (CheckedCommand, error)
	// Commit commits the updates staged in the Batch to the StateMachine.
	Commit(context.Context) error

The use of Commit in this context could (erroneously) suggest that these would be durable, which they're not. On the other hand, calling this Apply won't necessarily be more helpful. Hmm. Maybe Commit is the right name and the implementation will have a note that reminds readers that it's not syncing and why.da


pkg/storage/apply/task.go, line 61 at r1 (raw file):

	// DecodeAndBind decodes each of the provided raft entries into commands
	// and binds any that were proposed locally to their local proposals.
	// The method must only be called once per applier. It returns whether

s/applier/Decoder/


pkg/storage/apply/task.go, line 68 at r1 (raw file):

	// were passed to DecodeAndBind. The method must not be called until
	// after DecodeAndBind is called.
	NewCommandIter() CommandIterator

You can't (or don't want to) return the CommandIterator from DecodeAndBind?


pkg/storage/apply/task.go, line 95 at r1 (raw file):

//   }
//
type Task struct {

Thanks for including the second commit, which explains why this isn't just a method.


pkg/storage/apply/task.go, line 35 at r2 (raw file):

	// entire batch is committed at once.
	//
	// Batch come in two flavors - real batches and mock batches. Real

Batch comes

Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reviews! I tried to address all of the comments. I also went back an added a package-level godoc that discusses the package as a whole, the problems it attempts to solve, and its role in the system, along with an Example-style test. It's probably easiest to review that part through godoc.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dan, @danhhz, @knz, and @tbg)


pkg/storage/cmd_app_ctx.go, line 45 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

This sounds like they're set twice.

Done.


pkg/storage/cmd_app_ctx.go, line 134 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

nit: move those above all of the *decodedRaftEntry methods.

Done.


pkg/storage/cmd_app_ctx_buf.go, line 25 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Will we find out if this changes (in particularly grows)? How would one confirm manually?

That's a good question. We could panic in cmdAppCtxBufSliceFreeList.get if there isn't a free iterator when we want one. Doing so would kind of break the abstraction though. I guess it's probably worth it to be loud (and specific) if we ever break this tuning. What do you think?

EDIT: I made it a panic.


pkg/storage/cmd_app_ctx_buf.go, line 141 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

CommandIteratorBase

Done.


pkg/storage/cmd_app_ctx_buf.go, line 146 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

ditto

Done.


pkg/storage/cmd_app_ctx_buf.go, line 161 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Just curious whether this is speculative optimization or something you've seen have an effect. We expect to usually always hit this, right?

Yeah, we expect to always hit this, but to fully maintain the abstraction I don't think we should require it. The thinking that led to the fast path was trying to follow along the path of zero-cost abstractions while building this, so I guess you could call it a speculative optimization.


pkg/storage/cmd_app_ctx_buf.go, line 211 at r2 (raw file):

Previously, ajwerner wrote…

I don't know how crazy it would be to panic if there weren't free ones. I worry that a later refactor could mess with this and nobody would notice until we went and tried to find the performance regression.

Heh, @tbg mentioned the same thing above. Done.


pkg/storage/replica_application_impl.go, line 37 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Consider splitting into replica_decoder.go and replica_applier.go instead? It clocks in at 1000+ lines as is, so if we can reasonably split the concerns somehow, we should.

Done. Renamed to replica_application_decoder.go and replica_application_applier.go. I also renamed cmd_app_ctx.go to replica_application_cmd.go and cmd_app_ctx_buf.go to replica_application_cmd_buf.go.


pkg/storage/replica_application_impl.go, line 51 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Mention somewhere suitable that the raftMu is held for the whole lifetime of the decoder, including its acquisition.

Done.


pkg/storage/replica_application_impl.go, line 197 at r1 (raw file):

Previously, ajwerner wrote…

nit: can you either indent all of this by a space or don't indent the wrapped lines. It looks bizarre in godoc.

Done.


pkg/storage/replica_application_impl.go, line 229 at r1 (raw file):

I'd think the comment should sit on this method, not on the checkShouldApplyCommand wrapper.

Done.

This is going to be the first place that I add testing.


pkg/storage/replica_application_impl.go, line 459 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Its

Done.


pkg/storage/replica_raft.go, line 727 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

reminder to fix or turn into proper TODO

I was hoping to wait out @knz with his change to hook cockroachdb/errors up to our logging package. I might end up introducing a nonDeterministicFailure error type instead to handle this. I'll see after the next round of reviews.


pkg/storage/apply/cmd.go, line 44 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Maybe flip those. There may not be a client (IsLocal() == false)

Also mention what an error here means. Is that error associated with the command or does it mean that a low-level error occurred and things are broken now?

Done.


pkg/storage/apply/cmd.go, line 59 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

into to

Hmm, the New.*List methods are tricky, I wish I saw a better way of achieving the goal here. But at least I think

  • NewList and NewCheckedList are only ever called on CommandIterator
  • NewAppliedList on CheckedCommandList

so you should be able to move them into these specific interfaces only. This also simplifies things further because that way you can't create various command lists from command lists and think about what that means in terms of memory sharing.

Done.


pkg/storage/apply/cmd.go, line 79 at r1 (raw file):

Previously, ajwerner wrote…

s/cur/Cur/

Done.


pkg/storage/apply/cmd.go, line 79 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Cur

Done.


pkg/storage/apply/cmd.go, line 80 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Must
Valid()

Done.


pkg/storage/apply/cmd.go, line 95 at r1 (raw file):

Previously, ajwerner wrote…

s/cur/CurChecked/

Done.


pkg/storage/apply/cmd.go, line 95 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

CurChecked

Done.


pkg/storage/apply/cmd.go, line 96 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Must (here and elsewhere)

Done.


pkg/storage/apply/cmd.go, line 103 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

is "on to" correct (here and elsewhere)

"On to" is typically the preposition associated with data structures and "pushing" elements. I replaced this with "add to the end".


pkg/storage/apply/cmd.go, line 110 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

CurApplied

Done.


pkg/storage/apply/cmd.go, line 39 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Must

Done.


pkg/storage/apply/task.go, line 51 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

The use of Commit in this context could (erroneously) suggest that these would be durable, which they're not. On the other hand, calling this Apply won't necessarily be more helpful. Hmm. Maybe Commit is the right name and the implementation will have a note that reminds readers that it's not syncing and why.da

That seems like a concern of the implementation of this method though, doesn't it. We have a comment in replicaAppBatch.Commit that discusses this.


pkg/storage/apply/task.go, line 61 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

s/applier/Decoder/

Done.


pkg/storage/apply/task.go, line 68 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

You can't (or don't want to) return the CommandIterator from DecodeAndBind?

I don't want to because I want the owner of a Decoder to be able to acquire multiple fresh command iterators.


pkg/storage/apply/task.go, line 81 at r1 (raw file):

Previously, ajwerner wrote…

+1 what @dan said, maybe just pull this into a go Example. It's worth looking at the godoc of this package.

Done.


pkg/storage/apply/task.go, line 35 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Batch comes

Done.


pkg/storage/apply/task.go, line 41 at r2 (raw file):

Previously, ajwerner wrote…

This comment should refer to AckCommittedEntriesBeforeApplication

Done.


pkg/storage/apply/task.go, line 138 at r2 (raw file):

Previously, ajwerner wrote…

can be known to have succeeded? The point of the below exposition is to explain the need to determine whether or not the command actually succeeded.

Good point. Done.


pkg/storage/apply/task.go, line 144 at r2 (raw file):

Previously, ajwerner wrote…

This comment deserves some mention of the NewBatch(true /*mock*/) usage.

Done.


pkg/storage/apply/task.go, line 149 at r2 (raw file):

Previously, ajwerner wrote…

Same nit about indentation.

image.png

Done. I guess we'll have to keep waiting for golang/go#7873.

@nvanbenschoten nvanbenschoten force-pushed the nvanbenschoten/replicaApplier branch 3 times, most recently from 9f595cb to 5187acf Compare August 6, 2019 03:13
Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

If I had one gripe it'd be about the "mock" terminology. In the commit message you even refer to a mock StateMachine which is an interface with a method NewBatch(mock bool). There's something weird about a mock producing a non-mock batch etc. How do you feel about migrating the terminology on state machine Batch to be durable vs ephemeral rather than not mock vs mock?

Reviewed 7 of 9 files at r3, 3 of 3 files at r5.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @dan, @danhhz, @knz, @nvanbenschoten, and @tbg)


pkg/storage/replica_application_impl.go, line 81 at r4 (raw file):

}

// checkShouldApplyCommand determines whether or not a command should be applied

super silly nit but I feel like this function reads better as just shouldApplyCommand. What does check add?


pkg/storage/replica_application_impl.go, line 610 at r4 (raw file):

	}

	// Update the node clock with the maximum timestamp of all commands in the

Nice!


pkg/storage/apply/doc.go, line 11 at r3 (raw file):

// licenses/APL.txt.

/*

Great comment.


pkg/storage/apply/doc.go, line 82 at r3 (raw file):

begins. The only remaining work to be done after replication of a command
succeeds is to determine whether it will be rejected and replaced by an empty
command. To facilitate this acknowledge as early as possible, this package

nit: s/acknowledge/acknowledgement/ or s/this acknowledge/acknowledging/?


pkg/storage/apply/doc.go, line 84 at r3 (raw file):

command. To facilitate this acknowledge as early as possible, this package
provides the ability to acknowledge a series of commands before applying them to
the state machine. For more, see Task.AckCommittedEntriesBeforeApplication.

You could reveal the "mock" batch here by just adding a few words. The comment on AckCommittedEntriesBeforeApplication is comprehensive but we should reward a reader that got this far in this big explanation by telling that reader in broad strokes how this early ack thing works. It's a lot of build up to have to go read another big complicated comment. How about adding a sentence like:

... provides the ability to acknowledge a series of commands before durably applying 
them to the state machine. Early acknowledgement outcomes are determined by stepping
commands through an in-memory "mock" copy of the state machine before performing any
durable work.

@tbg tbg requested a review from ajwerner August 6, 2019 16:52
Copy link
Member

@tbg tbg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Nothing in my comments blocks my merge disposition, so take what you like and leave the rest.

Reviewed 6 of 9 files at r3, 3 of 3 files at r5.
Reviewable status: :shipit: complete! 2 of 0 LGTMs obtained (waiting on @ajwerner, @dan, @danhhz, and @nvanbenschoten)


pkg/storage/cmd_app_ctx_buf.go, line 161 at r1 (raw file):

I don't think we should require it

Wasn't suggesting that, I was just curious whether you had added it based on numbers or based on knowledge. It sounds like it's the latter.


pkg/storage/cmd_app_ctx_buf.go, line 211 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Heh, @tbg mentioned the same thing above. Done.

Also add a comment on the constant that sets the free list length that regressions will cause very aggressive panics.


pkg/storage/replica_application_decoder.go, line 27 at r5 (raw file):

// replica_application_cmd.go      =>  apply.Command         (and variants)
// replica_application_cmd_buf.go  =>  apply.CommandIterator (and variants)
//

❤️

It almost seems like we should rename buf -> iter.


pkg/storage/replica_application_impl.go, line 81 at r4 (raw file):

Previously, ajwerner wrote…

super silly nit but I feel like this function reads better as just shouldApplyCommand. What does check add?

+1


pkg/storage/replica_raft.go, line 727 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I was hoping to wait out @knz with his change to hook cockroachdb/errors up to our logging package. I might end up introducing a nonDeterministicFailure error type instead to handle this. I'll see after the next round of reviews.

I'm fine waiting, just turn it into a proper TODO then.


pkg/storage/apply/cmd.go, line 44 at r1 (raw file):

Also mention what an error here means. Is that error associated with the command or does it mean that a low-level error occurred and things are broken now?


pkg/storage/apply/doc.go, line 11 at r3 (raw file):

Previously, ajwerner wrote…

Great comment.

👍


pkg/storage/apply/doc.go, line 32 at r5 (raw file):

when compared at the same log index.


pkg/storage/apply/doc.go, line 65 at r5 (raw file):

and those that do not conflict can be run concurrently. However, below the level
of replication, it is unclear which commands conflict, so to ensure determinism
during state machine transitions, no concurrency is possible.

Is this really what the parallelism gap is about? In theory we could figure out which of the log entries conflict with each other and "apply" batches of non-conflicting entries in parallel. What we can't achieve is avoid pushing things into the same log in the first place.


pkg/storage/apply/doc.go, line 94 at r5 (raw file):

, which is typically possible for nearly the entirety of user-driven traffic.


pkg/storage/apply/doc_test.go, line 27 at r5 (raw file):

	sm := getTestStateMachine()
	dec := newTestDecoder()
	dec.nonTrivial[5] = true
fmt.Print(`Setting up a batch of seven log entries:
- index 2 and 6 are non-local
- index 3 and 6 will be rejected
- index 5 is not trivial
`)

pkg/storage/apply/doc_test.go, line 36 at r5 (raw file):

	defer t.Close()

	fmt.Println("Decode:")

"Decode (note that index 2 and 6 are not local):"


pkg/storage/apply/doc_test.go, line 45 at r5 (raw file):

		panic(err)
	}
fmt.Print(`
Above, only index 1 and 4 get acked early. The command at 5 is non-trivial, so the first batch contains only 1, 2, 3, and 4. An entry must be in the first batch to
qualify for acking early. 2 is not local (so there's nobody to ack), and 3 is rejected. We can't ack rejected commands early because the state machine is free to handle them any way it likes.
`

Why don't we ack rejected commands early?


pkg/storage/apply/doc_test.go, line 71 at r5 (raw file):

	//  applying side-effects of command 3
	//  applying side-effects of command 4
	//  finishing and acknowledging command 1; rejected=false

Any chance these could be omitted for 1 and 4 since they're noops?


pkg/storage/apply/task.go, line 123 at r5 (raw file):

//
// This is safe because a proposal through raft can be known to have succeeded
// as soon as it is durability replicated to a quorum of replicas (i.e. has

durably


pkg/storage/apply/task_test.go, line 69 at r5 (raw file):

	c.finished = true
	c.acked = true
	if logging {

Looks like this can actually be omitted (or customized) if it's been acked previously. That can help make the example even clearer.

Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I had one gripe it'd be about the "mock" terminology. In the commit message you even refer to a mock StateMachine which is an interface with a method NewBatch(mock bool). There's something weird about a mock producing a non-mock batch etc. How do you feel about migrating the terminology on state machine Batch to be durable vs ephemeral rather than not mock vs mock?

Good idea! Done.

I added a new commit with a few renames that wraps this all up. I'm going to make sure it still passes CI, then pull out the early ack commit and move it to #38954 before merging.

Thanks for all the reviews.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner, @dan, @danhhz, and @tbg)


pkg/storage/replica_application_decoder.go, line 27 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

❤️

It almost seems like we should rename buf -> iter.

Done.


pkg/storage/replica_raft.go, line 727 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

I'm fine waiting, just turn it into a proper TODO then.

Done.


pkg/storage/apply/cmd.go, line 44 at r1 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Also mention what an error here means. Is that error associated with the command or does it mean that a low-level error occurred and things are broken now?

Done.


pkg/storage/apply/doc.go, line 82 at r3 (raw file):

Previously, ajwerner wrote…

nit: s/acknowledge/acknowledgement/ or s/this acknowledge/acknowledging/?

Done.


pkg/storage/apply/doc.go, line 84 at r3 (raw file):

Previously, ajwerner wrote…

You could reveal the "mock" batch here by just adding a few words. The comment on AckCommittedEntriesBeforeApplication is comprehensive but we should reward a reader that got this far in this big explanation by telling that reader in broad strokes how this early ack thing works. It's a lot of build up to have to go read another big complicated comment. How about adding a sentence like:

... provides the ability to acknowledge a series of commands before durably applying 
them to the state machine. Early acknowledgement outcomes are determined by stepping
commands through an in-memory "mock" copy of the state machine before performing any
durable work.

Done.


pkg/storage/apply/doc.go, line 32 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

when compared at the same log index.

Done.


pkg/storage/apply/doc.go, line 65 at r5 (raw file):
This is what people mean when they refer to it. It's not insurmountable, but the "In theory" is the hard part 😃 For instance, see https://www.cs.cmu.edu/~dongz/papers/KuaFu.pdf.

What we can't achieve is avoid pushing things into the same log in the first place.

We can do a better job than most because we have separate ranges and partitioned logs with a transaction model that can handle such a thing.


pkg/storage/apply/doc.go, line 94 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

, which is typically possible for nearly the entirety of user-driven traffic.

I'm ok leaving this concern to clients of this package, who get to choose when this is possible by implementing CanAckBeforeApplication.


pkg/storage/apply/doc_test.go, line 27 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…
fmt.Print(`Setting up a batch of seven log entries:
- index 2 and 6 are non-local
- index 3 and 6 will be rejected
- index 5 is not trivial
`)

Done.


pkg/storage/apply/doc_test.go, line 36 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

"Decode (note that index 2 and 6 are not local):"

Done.


pkg/storage/apply/doc_test.go, line 45 at r5 (raw file):
Done.

Why don't we ack rejected commands early?

Because we may not want to immediately return errors to clients that fail. I added a comment.


pkg/storage/apply/doc_test.go, line 71 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Any chance these could be omitted for 1 and 4 since they're noops?

Done.


pkg/storage/apply/task.go, line 123 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

durably

Done.


pkg/storage/apply/task_test.go, line 69 at r5 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Looks like this can actually be omitted (or customized) if it's been acked previously. That can help make the example even clearer.

Done.


pkg/storage/cmd_app_ctx_buf.go, line 211 at r2 (raw file):

Previously, tbg (Tobias Grieger) wrote…

Also add a comment on the constant that sets the free list length that regressions will cause very aggressive panics.

Done.


pkg/storage/replica_application_impl.go, line 81 at r4 (raw file):

Previously, tbg (Tobias Grieger) wrote…

+1

I was hoping to make it more clear that this mutates the receiver, but this wasn't succeeding in that anyway. Done.

Copy link
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: sorry for the late review! today was busier than i expected

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner, @dan, @nvanbenschoten, and @tbg)


pkg/storage/apply/cmd.go, line 19 at r6 (raw file):

	// Index is the log index of the corresponding raft entry.
	Index() uint64
	// IsTrivial returns whether the command can apply in a batch.

nit: if non-trivial will always simply mean it has to be in its own batch, i'm not sure it's worth introducing jargon for it


pkg/storage/apply/cmd.go, line 22 at r6 (raw file):

	IsTrivial() bool
	// IsLocal returns whether the command was locally proposed.
	IsLocal() bool

can we get a bit more info on the special casing of local proposals?


pkg/storage/apply/cmd.go, line 62 at r6 (raw file):

// list variants. It is exported so its methods are displayed in godoc when
// it is embedded in other interfaces.
type CommandIteratorBase interface {

i can see how you got to this, but i don't love the additional exported surface area that all these iterator+map abstractions add to the package vs everything being in terms of slices. i noticed the real impl of these is a linked list, which i assume is because of allocations. don't hold up merging over this, but i do wonder if there's something else we could be doing here


pkg/storage/apply/doc.go, line 11 at r3 (raw file):

Previously, tbg (Tobias Grieger) wrote…

👍

Ditto, this is incredibly helpful!!! Reading through this PR a second time, everything made a lot more sense with this context.

My last nit for this is that I'd love to see some pointers to Command/CheckedCommand/AppliedCommand/Task and their major methods sprinkled at appropriate places in the text. The For more, see Batch. and AckCommittedEntriesBeforeApplication references are exactly the kind of thing I'm thinking of

Update: aha, just realized the reason you said the best way to review this would be looking at the godoc is because that puts the example test next to this. it's nice that example tests don't rot, but i still had to make some jumps to match up the steps of that test to this text, which i think the inline references would address


pkg/storage/apply/task.go, line 57 at r6 (raw file):

	Stage(Command) (CheckedCommand, error)
	// Commit commits the updates staged in the Batch to the StateMachine.
	Commit(context.Context) error

q: why is this on Batch instead of a method on the StateMachine that takes a Batch? i'm guessing to mirror rocksdb batches, but figured i'd ask

second q: why is Commit a separate step from ApplySideEffects?


pkg/storage/apply/task.go, line 200 at r6 (raw file):

	})

	// Stage the commands in the (moephemeralck) batch.

nit: moephemeralck


pkg/storage/replica_application_state_machine.go, line 69 at r6 (raw file):

// The provided format string should be safe for reporting.
func makeNonDetermFailure(format string, args ...interface{}) error {

nit: don't abbreviate Deterministic


pkg/storage/replica_application_state_machine.go, line 398 at r6 (raw file):

// wiped and we apply an empty log entry instead. If a rejected command was
// proposed locally, the error will eventually be communicated to the waiting
// proposer. The two typical cases in which errors occur are lease mismatch (in

"two typical cases" and how they're resolved = 👌. i love this type of comment and wish we did it more

Copy link
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 2 stale) (waiting on @ajwerner, @dan, @nvanbenschoten, and @tbg)


pkg/storage/apply/cmd.go, line 62 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

i can see how you got to this, but i don't love the additional exported surface area that all these iterator+map abstractions add to the package vs everything being in terms of slices. i noticed the real impl of these is a linked list, which i assume is because of allocations. don't hold up merging over this, but i do wonder if there's something else we could be doing here

I kept mulling this over on the bus ride home and realized that what I'd missed is that none of the main parts of this package export a reference to the iterator+map stuff. it's essentially just helpers to aid in implementations of the core part of the package. this makes me less worried about it. my gut tells me there's still some simplification of the package available, but i'm not quite seeing what it is. /shrug

Copy link
Member Author

@nvanbenschoten nvanbenschoten left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 3 stale) (waiting on @ajwerner, @dan, @danhhz, and @tbg)


pkg/storage/apply/cmd.go, line 19 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: if non-trivial will always simply mean it has to be in its own batch, i'm not sure it's worth introducing jargon for it

We're not introducing the term here. See isTrivial and clearTrivialReplicatedEvalResultFields.


pkg/storage/apply/cmd.go, line 22 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

can we get a bit more info on the special casing of local proposals?

Done.


pkg/storage/apply/cmd.go, line 62 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I kept mulling this over on the bus ride home and realized that what I'd missed is that none of the main parts of this package export a reference to the iterator+map stuff. it's essentially just helpers to aid in implementations of the core part of the package. this makes me less worried about it. my gut tells me there's still some simplification of the package available, but i'm not quite seeing what it is. /shrug

This structure provides two main benefits. The first is the avoidance of memory allocations, like you mentioned. The interface gives us the ability to implement the iterator as a chunked linked list with pooled nodes, which we are doing in replicatedCmdBuf. If we didn't expose this as an interface then each time we wanted a slice of commands, the Decoder would need to create a slice of Commands and then fill it in with pointers to whatever concrete struct it was using to implement Command. This would be a problem on its own even if we only iterated over the list once. However, #38954 adds a second iteration through the commands, and I really didn't want that to have any additional costs.

The second benefit is increased type safety. To achieve the level of type safety we have here, we'd need to create a new slice at each stage of the pipeline. Barring that, the closest we could get is code that looks like the following (which at one point early on I had):

var cmds []Command // allocate here
...

for i, cmd := range cmds {
    checked, err := fn(cmd)
    if err != nil {
        return err
    }
    cmds[i] = checked
}

for i, cmd := range cmds {
    checked := cmd.(CheckedCommand) // we checked this above, I promise
    applied, err := fn(checked)
    if err != nil {
        return err
    }
    cmds[i] = applied
}

for i, cmd := range cmds {
    applied := cmd.(AppliedCommand) // we applied this above, I promise
    ...

pkg/storage/apply/doc.go, line 11 at r3 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Ditto, this is incredibly helpful!!! Reading through this PR a second time, everything made a lot more sense with this context.

My last nit for this is that I'd love to see some pointers to Command/CheckedCommand/AppliedCommand/Task and their major methods sprinkled at appropriate places in the text. The For more, see Batch. and AckCommittedEntriesBeforeApplication references are exactly the kind of thing I'm thinking of

Update: aha, just realized the reason you said the best way to review this would be looking at the godoc is because that puts the example test next to this. it's nice that example tests don't rot, but i still had to make some jumps to match up the steps of that test to this text, which i think the inline references would address

I didn't want to mix the "theory" in with the implementation, so I added a Usage section. Good catch that that was missing.


pkg/storage/apply/task.go, line 57 at r6 (raw file):

i'm guessing to mirror rocksdb batches, but figured i'd ask

Not strictly to mirror rocksdb. Mainly because I think it makes this kind of interface cleaner and completely avoids the "what if I commit a batch in a different state machine than I got it from?" question.

why is Commit a separate step from ApplySideEffects?

So we can use it to convert CheckCommands into AppliedCommands and so we can maintain less state in the Batch itself.


pkg/storage/apply/task.go, line 200 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: moephemeralck

What's the problem here?


pkg/storage/replica_application_state_machine.go, line 69 at r6 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: don't abbreviate Deterministic

Done.

@nvanbenschoten
Copy link
Member Author

bors r+

@nvanbenschoten
Copy link
Member Author

bors r-

Didn't remove enough of the last commit.

@craig
Copy link
Contributor

craig bot commented Aug 7, 2019

Canceled

The new package provides abstractions and routines associated with the
application of committed raft entries to a replicated state machine.

This was inspired by four driving forces:
- We've been having a number of discussions on the Core team about
  making storage abstractions more clear and easier to understand
  in isolation. One commonly discussed proposal is introducing a
  `storage/replicate` package that would encapsulate the concerns of
  raft replication (e.g. log manipulation, snapshots, leader election,
  heartbeats, etc.). This `storage/apply` package will fit in nicely
  alongside a replication abstraction.
- Initial discussion on cockroachdb#38954 concluded that adding an optimization
  to acknowledge clients after their raft entries have committed but
  before they had been applied with the current code structure was
  moving in the opposite direction and making things even harder to
  understand due to the introduction of more complex state management.
- Recent instability in this area (cockroachdb#38976, cockroachdb#39064, cockroachdb#39135, cockroachdb#39203) has
  revealed that there exists a high degree of difficulty involved in testing
  any of the logic in the area of raft entry application. This has naturally
  led to testing at a distance using tools like testing hooks, which is
  frustrating and delicate. As a result, we're missing tests for thing
  like old migrations that we still need to support. We also have trouble
  writing regression tests when bugs do pop up.
- The proposed optimization in cockroachdb#17500 (comment)
  to apply committed raft entries to the Replica storage engine asynchronously
  in a separate thread than the raft processing thread will make entry
  application significantly more complex. For instance, we'll likely
  need to introduce a separate scheduler to coordinate entry application
  passes across Ranges on a node. The schedule will likely want to
  prioritize leaders over followers and embed other policies to optimize
  for total system throughput. There's a strong desire to isolate this new
  complexity and to give the logic a place to live.

The PR begins to address these concerns by formalizing the process of
applying committed raft entries. To start, this makes the process easier
to understand both in terms of the macro-level steps that are taken during
application of a batch of entries and in terms of the impact that an individual
command has on the replicated state machine. For instance, the PR helps provide
answers to all of the following questions:

- What are the stages of raft entry application?
- What is the difference between a "raft entry" and a "replicated command"?
- What can a command do besides apply its write batch to the storage engine?
- What does it mean for a successfully replicated command to be rejected during application?
- When can we acknowledge the outcome of a raft proposal?

The refactor also uncovers a large testing surface that future PRs will exploit
to write targeted unit tests. Not only can the `storage/apply` package be tested
with a mock state machine (done in this PR), but we can test Replica's
implementation of the state machine interface in isolation without needing
to touch raft at all.

Finally, the refactor paves the way for making the proposed change in cockroachdb#38954
in a much cleaner way. This is demonstrated in next commit, which is being
included here to show why certain things were designed the way they were
but will not be merged with this PR.

Release note: None
This isn't being squashed into the previous diff to keep the diffs there
more clear. The renames include:

Types:
`replicaApplier` -> `replicaStateMachine`
`cmdAppCtx`      -> `replicatedCmd`
`cmdAppCtxBuf`   -> `replicatedCmdBuf`

Files:
`replica_application_impl.go` -> `replica_application_state_machine.go`
`cmd_app_ctx.go`              -> `replica_application_cmd.go`
`cmd_app_ctx_buf.go`          -> `replica_application_cmd_buf.go`

Release note: None
@nvanbenschoten
Copy link
Member Author

bors r+

craig bot pushed a commit that referenced this pull request Aug 7, 2019
39254: storage/apply: create apply package for raft entry application r=nvanbenschoten a=nvanbenschoten

The new package provides abstractions and routines associated with the application of committed raft entries to a replicated state machine.

This was inspired by four driving forces:
- We've been having a number of discussions on the Core team about making storage abstractions more clear and easier to understand in isolation. One commonly discussed proposal is introducing a `storage/replicate` package that would encapsulate the concerns of raft replication (e.g. log manipulation, snapshots, leader election, heartbeats, etc.). This `storage/apply` package will fit in nicely alongside a replication abstraction.
- Initial discussion on #38954 concluded that adding an optimization to acknowledge clients after their raft entries have committed but before they had been applied with the current code structure was moving in the opposite direction and making things even harder to understand due to the introduction of more complex state management.
- Recent instability in this area (#38976, #39064, #39135, #39203) has revealed that there exists a high degree of difficulty involved in testing any of the logic in the area of raft entry application. This has naturally led to testing at a distance using tools like testing hooks, which is frustrating and delicate. As a result, we're missing tests for things like old migrations that we still need to support. We also have trouble writing regression tests when bugs do pop up.
- The proposed optimization in #17500 (comment) to apply committed raft entries to the Replica storage engine asynchronously in a separate thread than the raft processing thread will make entry application significantly more complex. For instance, we'll likely need to introduce a separate scheduler to coordinate entry application passes across Ranges on a node. The schedule will likely want to prioritize leaders over followers and embed other policies to optimize for total system throughput. There's a strong desire to isolate this new complexity and to give the logic a place to live.

The PR begins to address these concerns by formalizing the process of applying committed raft entries. To start, this makes the process easier to understand both in terms of the macro-level steps that are taken during application of a batch of entries and in terms of the impact that an individual command has on the replicated state machine. For instance, the PR helps provide answers to all of the following questions:

- What are the stages of raft entry application?
- What is the difference between a "raft entry" and a "replicated command"?
- What can a command do besides apply its write batch to the storage engine?
- What does it mean for a successfully replicated command to be rejected during application?
- When can we acknowledge the outcome of a raft proposal?

The refactor also uncovers a large testing surface that future PRs will exploit to write targeted unit tests. Not only can the `storage/apply` package be tested with a mock state machine (done in this PR), but we can test Replica's implementation of the state machine interface in isolation without needing to touch raft at all.

Finally, the refactor paves the way for making the proposed change in #38954 in a much cleaner way. This is demonstrated in the second commit, which is being included here to show why certain things were designed the way they were but will not be merged with this PR.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
@craig
Copy link
Contributor

craig bot commented Aug 7, 2019

Build succeeded

@craig craig bot merged commit e2eb877 into cockroachdb:master Aug 7, 2019
Copy link
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 3 stale) (waiting on @ajwerner, @dan, @danhhz, @nvanbenschoten, and @tbg)


pkg/storage/apply/cmd.go, line 19 at r6 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

We're not introducing the term here. See isTrivial and clearTrivialReplicatedEvalResultFields.

Gotcha. My next question is then if this is a concept that needs to be exposed at the abstraction boundary? I don't have a complete picture, but it seems to me like it may be an implementation detail

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants