Skip to content

Commit

Permalink
Updated package documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
jmsadair committed Aug 19, 2023
1 parent f6b4049 commit 01676e5
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 54 deletions.
73 changes: 21 additions & 52 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,6 @@ the StateMachine interface.
// The current count.
count int
// The last index applied to the state machine. Used for snapshots.
lastIndex uint64
// The term associated with the last applied index. Used for snapshots.
lastTerm uint64
// Makes the state machine concurrent safe.
mu sync.Mutex
}
Expand All @@ -52,15 +46,11 @@ the StateMachine interface.
sm.mu.Lock()
defer sm.mu.Unlock()
// If the operation is read-only, just return the value of the counter.
// You might need to actually decode the operation for a more complex state machine.
if operation.IsReadOnly {
return Result{Value: sm.count, Error: nil}
}
// Save the term and index of the last replicated entry for snapshotting.
sm.lastIndex = operation.LogIndex
sm.lastTerm = operation.LogTerm
// If the operation is read-only, just return the value of the counter.
// You might need to actually decode the operation for a more complex state machine.
if operation.IsReadOnly {
return Result{Value: sm.count, Error: nil}
}
// Decode the operation.
var decodedOp int
Expand All @@ -82,25 +72,18 @@ the StateMachine interface.
return Result{Value: sm.count, Error: nil}
}
func (sm *StateMachine) Snapshot() (raft.Snapshot, error) {
func (sm *StateMachine) Snapshot() ([]byte, error) {
sm.mu.Lock()
defer sm.mu.Unlock()
// Encode the state of the state machine.
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(sm.count); err != nil {
return raft.Snapshot{}, err
}
// Create the snapshot.
snapshot := raft.Snapshot{
LastIncludedIndex: sm.lastIndex,
LastIncludedTerm: sm.lastTerm,
Data: buf.Bytes(),
return nil, err
}
return snapshot, nil
return buf.bytes(), nil
}
func (sm *StateMachine) Restore(snapshot *raft.Snapshot) error {
Expand All @@ -118,11 +101,6 @@ the StateMachine interface.
// Restore the state of the state machine.
sm.count = count
// Update the last seen index and term since the state has been
// restored up to this point.
sm.lastIndex = snapshot.LastIncludedIndex
sm.lastTerm = snapshot.LastIncludedTerm
return nil
}
Expand Down Expand Up @@ -155,18 +133,13 @@ the file.
storage := raft.NewStorage("raft-1-storage.bin")
snapshotStorage := raft.NewSnapshotStorage("raft-1-snapshots.bin")
Now, create the channel that responses from the state machine will be relayed over. Note that, when the server is started, it is important that
this channel is always being monitored. Otherwise, the internal Raft implementation will become blocked.
responseCh := make(chan raft.OperationResponse)
Next, create an instance of the StateMachine implementation.
fsm := new(StateMachine)
A Raft instance may now be created as below.
raft, err := raft.NewRaft("raft-1", peers, log, storage, snapshotStorage, fsm, responseCh)
raft, err := raft.NewRaft("raft-1", peers, log, storage, snapshotStorage, fsm)
Note that you can also specify options such as election timeout and lease duration when creating a new
Raft instance. For example, the below code will create a Raft instance that uses 500 milliseconds as its
Expand All @@ -189,22 +162,19 @@ Here is how to start the Server instance.
panic(err)
}
// Start a go routine in the background to intercept responses from the state machine.
go func() {
for response := range responseCh {
// Handle responses...
}
}()
// Start Raft.
close(readyCh)
Finally, here is how to submit an operation to the Server instance once it is started. A normal operation may be submitted or,
alternatively, a read-only operation may be submitted. Generally, read-only operations will offer much better performance than standard
operations due to the fact that they are not added to the log or replicated. However, read-only operations are implemented
using leases. This means that they are not safe - a read-only operation may read stale or incorrect data under certian
circumstances. If your application demands strong consistency, read-only operations should not be used. Results from
either type of operation will be written to the response channel provided to the server.
circumstances. If your application demands strong consistency, read-only operations should not be used.
When either type of operation is submitted, a future for an operation response is returned. This future can be awaited
to get the result of the operation. Note that the response to the operation may contain an error. It is important to
ensure that the error is nil before consuming the contents of the response because, if the error is not nil, the
contents of the response are not valid.
// Encode an operation.
var buffer bytes.Buffer
Expand All @@ -215,15 +185,14 @@ either type of operation will be written to the response channel provided to the
}
op := buffer.Bytes()
// Sumbit a standard operation that is replicated.
// Note that if this server is not the leader it will reject the operation.
logIndex, logTerm, err := server.SubmitOperation(op)
// Sumbit a standard operation with a 200 ms timeout.
future := server.SubmitOperation(op)
response := future.Await()
// Submit a read-only operation.
// Like standard operations, the server will reject the operation if it is not the leader.
// Additionally, it will reject the operation if its lease has expired. In this case, since
// the value of the counter is returned on a read-only operation, an empty operation is submitted.
err := server.SubmitReadOnlyOperation([]byte{})
// In this case, since the value of the counter is returned on a read-only operation, an empty operation is submitted.
future := server.SubmitReadOnlyOperation([]byte{}, 200 * time.Millisecond)
response := future.Await()
Be warned that this is a highly simplified example that demonstrates how raft may be used and some of its features.
This implementation leaves out many details that would typically be associated with a system that uses Raft such
Expand Down
4 changes: 2 additions & 2 deletions operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "time"
// raft times out. It encapsulates the operation associated with the timeout.
type OperationTimeoutError struct {
// The operation that was submitted to raft.
operation []byte
Operation []byte
}

// This function implements the error interface for the OperationTimeoutError type.
Expand Down Expand Up @@ -85,7 +85,7 @@ func (o *OperationResponseFuture) Await() OperationResponse {
case response := <-o.responseCh:
return response
case <-time.After(o.timeout):
return OperationResponse{Err: OperationTimeoutError{operation: o.operation}}
return OperationResponse{Err: OperationTimeoutError{Operation: o.operation}}
}
}
}

0 comments on commit 01676e5

Please sign in to comment.