diff --git a/doc.go b/doc.go index aee8e5d..c567bfb 100644 --- a/doc.go +++ b/doc.go @@ -38,12 +38,6 @@ the StateMachine interface. // The current count. count int - // The last index applied to the state machine. Used for snapshots. - lastIndex uint64 - - // The term associated with the last applied index. Used for snapshots. - lastTerm uint64 - // Makes the state machine concurrent safe. mu sync.Mutex } @@ -52,15 +46,11 @@ the StateMachine interface. sm.mu.Lock() defer sm.mu.Unlock() - // If the operation is read-only, just return the value of the counter. - // You might need to actually decode the operation for a more complex state machine. - if operation.IsReadOnly { - return Result{Value: sm.count, Error: nil} - } - - // Save the term and index of the last replicated entry for snapshotting. - sm.lastIndex = operation.LogIndex - sm.lastTerm = operation.LogTerm + // If the operation is read-only, just return the value of the counter. + // You might need to actually decode the operation for a more complex state machine. + if operation.IsReadOnly { + return Result{Value: sm.count, Error: nil} + } // Decode the operation. var decodedOp int @@ -82,7 +72,7 @@ the StateMachine interface. return Result{Value: sm.count, Error: nil} } - func (sm *StateMachine) Snapshot() (raft.Snapshot, error) { + func (sm *StateMachine) Snapshot() ([]byte, error) { sm.mu.Lock() defer sm.mu.Unlock() @@ -90,17 +80,10 @@ the StateMachine interface. var buf bytes.Buffer enc := gob.NewEncoder(&buf) if err := enc.Encode(sm.count); err != nil { - return raft.Snapshot{}, err - } - - // Create the snapshot. - snapshot := raft.Snapshot{ - LastIncludedIndex: sm.lastIndex, - LastIncludedTerm: sm.lastTerm, - Data: buf.Bytes(), + return nil, err } - return snapshot, nil + return buf.bytes(), nil } func (sm *StateMachine) Restore(snapshot *raft.Snapshot) error { @@ -118,11 +101,6 @@ the StateMachine interface. // Restore the state of the state machine. sm.count = count - // Update the last seen index and term since the state has been - // restored up to this point. - sm.lastIndex = snapshot.LastIncludedIndex - sm.lastTerm = snapshot.LastIncludedTerm - return nil } @@ -155,18 +133,13 @@ the file. storage := raft.NewStorage("raft-1-storage.bin") snapshotStorage := raft.NewSnapshotStorage("raft-1-snapshots.bin") -Now, create the channel that responses from the state machine will be relayed over. Note that, when the server is started, it is important that -this channel is always being monitored. Otherwise, the internal Raft implementation will become blocked. - - responseCh := make(chan raft.OperationResponse) - Next, create an instance of the StateMachine implementation. fsm := new(StateMachine) A Raft instance may now be created as below. - raft, err := raft.NewRaft("raft-1", peers, log, storage, snapshotStorage, fsm, responseCh) + raft, err := raft.NewRaft("raft-1", peers, log, storage, snapshotStorage, fsm) Note that you can also specify options such as election timeout and lease duration when creating a new Raft instance. For example, the below code will create a Raft instance that uses 500 milliseconds as its @@ -189,13 +162,6 @@ Here is how to start the Server instance. panic(err) } - // Start a go routine in the background to intercept responses from the state machine. - go func() { - for response := range responseCh { - // Handle responses... - } - }() - // Start Raft. close(readyCh) @@ -203,8 +169,12 @@ Finally, here is how to submit an operation to the Server instance once it is st alternatively, a read-only operation may be submitted. Generally, read-only operations will offer much better performance than standard operations due to the fact that they are not added to the log or replicated. However, read-only operations are implemented using leases. This means that they are not safe - a read-only operation may read stale or incorrect data under certian -circumstances. If your application demands strong consistency, read-only operations should not be used. Results from -either type of operation will be written to the response channel provided to the server. +circumstances. If your application demands strong consistency, read-only operations should not be used. + +When either type of operation is submitted, a future for an operation response is returned. This future can be awaited +to get the result of the operation. Note that the response to the operation may contain an error. It is important to +ensure that the error is nil before consuming the contents of the response because, if the error is not nil, the +contents of the response are not valid. // Encode an operation. var buffer bytes.Buffer @@ -215,15 +185,14 @@ either type of operation will be written to the response channel provided to the } op := buffer.Bytes() - // Sumbit a standard operation that is replicated. - // Note that if this server is not the leader it will reject the operation. - logIndex, logTerm, err := server.SubmitOperation(op) + // Sumbit a standard operation with a 200 ms timeout. + future := server.SubmitOperation(op) + response := future.Await() // Submit a read-only operation. - // Like standard operations, the server will reject the operation if it is not the leader. - // Additionally, it will reject the operation if its lease has expired. In this case, since - // the value of the counter is returned on a read-only operation, an empty operation is submitted. - err := server.SubmitReadOnlyOperation([]byte{}) + // In this case, since the value of the counter is returned on a read-only operation, an empty operation is submitted. + future := server.SubmitReadOnlyOperation([]byte{}, 200 * time.Millisecond) + response := future.Await() Be warned that this is a highly simplified example that demonstrates how raft may be used and some of its features. This implementation leaves out many details that would typically be associated with a system that uses Raft such diff --git a/operation.go b/operation.go index 2377b3c..45c74cb 100644 --- a/operation.go +++ b/operation.go @@ -6,7 +6,7 @@ import "time" // raft times out. It encapsulates the operation associated with the timeout. type OperationTimeoutError struct { // The operation that was submitted to raft. - operation []byte + Operation []byte } // This function implements the error interface for the OperationTimeoutError type. @@ -85,7 +85,7 @@ func (o *OperationResponseFuture) Await() OperationResponse { case response := <-o.responseCh: return response case <-time.After(o.timeout): - return OperationResponse{Err: OperationTimeoutError{operation: o.operation}} + return OperationResponse{Err: OperationTimeoutError{Operation: o.operation}} } } }