Saga Recovery #27

Merged
merged 5 commits into from
Jun 16, 2016
32 changes: 24 additions & 8 deletions saga/inMemorySagaLog.go
@@ -27,45 +27,61 @@ func MakeInMemorySaga() Saga {
}

func (log *inMemorySagaLog) LogMessage(msg sagaMessage) error {
fmt.Println(fmt.Sprintf("Saga %s: %s %s", msg.sagaId, msg.msgType.String(), msg.taskId))

log.mutex.Lock()
defer log.mutex.Unlock()

fmt.Println(fmt.Sprintf("Saga %s: %s %s", msg.sagaId, msg.msgType.String(), msg.taskId))
sagaId := msg.sagaId
var err error

log.mutex.Lock()
msgs, ok := log.sagas[sagaId]
if !ok {
return errors.New(fmt.Sprintf("Saga: %s is not Started yet.", msg.sagaId))
}

log.sagas[sagaId] = append(msgs, msg)
log.mutex.Unlock()
return err
return nil
}

func (log *inMemorySagaLog) StartSaga(sagaId string, job []byte) error {

log.mutex.Lock()
defer log.mutex.Unlock()

fmt.Println(fmt.Sprintf("Start Saga %s", sagaId))

startMsg := MakeStartSagaMessage(sagaId, job)
log.sagas[sagaId] = []sagaMessage{startMsg}

log.mutex.Unlock()

return nil
}

func (log *inMemorySagaLog) GetMessages(sagaId string) ([]sagaMessage, error) {

log.mutex.RLock()
defer log.mutex.RUnlock()

msgs, ok := log.sagas[sagaId]
log.mutex.RUnlock()

if ok {
return msgs, nil
} else {
return nil, nil
}
}

/*
* Returns all Sagas Started since this InMemory Saga was created
*/
func (log *inMemorySagaLog) GetActiveSagas() ([]string, error) {
log.mutex.RLock()
Contributor

Maybe just defer the unlock after acquiring it?

Contributor Author

defer has a fairly high performance cost; given how simple this code is, I just chose to place the unlock explicitly.

Contributor

The cost is fairly low; if this isn't a hot spot, I'd suggest we follow the idiom.

If this is a hot spot, that sounds like performance optimization. Does it make sense to use the syntax for performance optimizations we discussed?

Contributor

(BTW, just googled and found this, golang/go#6980, which shows they know about it, and the cost is ~70ns)

Contributor Author

Sure, just switched to lock, defer unlock, since it's just test code.

defer log.mutex.RUnlock()

keys := make([]string, 0, len(log.sagas))

for key, _ := range log.sagas {
keys = append(keys, key)
}

return keys, nil
}
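
The thread above settles on Lock followed by a deferred Unlock. Beyond style, the idiom matters whenever a function has an early return: an unlock placed explicitly at the end of the function is skipped on that path, and LogMessage has one such return (the `!ok` branch). A minimal standalone sketch, assuming a simplified log type; `memLog`, `Start`, `Append`, and `ActiveSagas` are hypothetical names for illustration, not this package's API:

```go
package main

import (
	"fmt"
	"sync"
)

// memLog is a hypothetical, simplified stand-in for inMemorySagaLog.
type memLog struct {
	mutex sync.RWMutex
	sagas map[string][]string
}

// Start records a new saga with a single start entry.
func (l *memLog) Start(sagaId string) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	l.sagas[sagaId] = []string{"StartSaga"}
}

// Append adds msg to an existing saga. The deferred Unlock runs on every
// return path, including the early error return below.
func (l *memLog) Append(sagaId, msg string) error {
	l.mutex.Lock()
	defer l.mutex.Unlock()

	msgs, ok := l.sagas[sagaId]
	if !ok {
		return fmt.Errorf("saga %s is not started yet", sagaId)
	}
	l.sagas[sagaId] = append(msgs, msg)
	return nil
}

// ActiveSagas returns the ids of all sagas started on this log.
func (l *memLog) ActiveSagas() []string {
	l.mutex.RLock()
	defer l.mutex.RUnlock()

	keys := make([]string, 0, len(l.sagas))
	for key := range l.sagas {
		keys = append(keys, key)
	}
	return keys
}

func main() {
	l := &memLog{sagas: make(map[string][]string)}

	// The error path releases the lock, so the following calls do not block.
	if err := l.Append("saga1", "StartTask"); err != nil {
		fmt.Println("error:", err)
	}
	l.Start("saga1")
	fmt.Println(l.Append("saga1", "StartTask"), len(l.ActiveSagas()))
}
```

If the unlock in `Append` were written explicitly after the `append` call instead of deferred, the first call in `main` would return with the mutex still held and `Start` would block forever.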
82 changes: 45 additions & 37 deletions saga/saga.go
@@ -1,22 +1,5 @@
package saga

type SagaRecoveryType int

/*
* Saga Recovery Types define how to interpret SagaState in RecoveryMode.
*
* ForwardRecovery: all tasks in the saga must be executed at least once.
* tasks MUST BE idempotent
*
* RollbackRecovery: if Saga is Aborted or in unsafe state, compensating
* tasks for all started tasks need to be executed.
* compensating tasks MUST BE idempotent.
*/
const (
BackwardRecovery SagaRecoveryType = iota
ForwardRecovery
)

/*
* Saga Object which provides all Saga Functionality
* Implementations of SagaLog should provide a factory method
@@ -56,26 +39,6 @@ func (s Saga) StartSaga(sagaId string, job []byte) (*SagaState, error) {
return state, nil
}

/*
* logs the specified message durably to the SagaLog & updates internal state if its a valid state transition
*/
func (s Saga) logMessage(state *SagaState, msg sagaMessage) (*SagaState, error) {

//verify that the applied message results in a valid state
newState, err := updateSagaState(state, msg)
if err != nil {
return nil, err
}

//try durably storing the message
err = s.log.LogMessage(msg)
if err != nil {
return nil, err
}

return newState, nil
}

/*
* Log an End Saga Message to the log, returns updated SagaState
* Returns the resulting SagaState or an error if it fails
@@ -148,3 +111,48 @@ func (s Saga) StartCompensatingTask(state *SagaState, taskId string, data []byte
func (s Saga) EndCompensatingTask(state *SagaState, taskId string, results []byte) (*SagaState, error) {
return s.logMessage(state, MakeEndCompTaskMessage(state.sagaId, taskId, results))
}

/*
* Should be called at Saga Creation time.
* Returns a Slice of In Progress SagaIds
*/
func (s Saga) Startup() ([]string, error) {
Contributor

Will Startup ever do anything other than just return GetActiveSagas? Maybe this should just be GetActiveSagas?

Contributor Author

I named it Startup because you suggested calling it Startup when we were in New York, since you should always call this when creating a Saga.

Contributor

Yup, I'm fine with Startup, and also acknowledge that I am not always right, so if there's something you think is better, I'm sorry for having led you astray.

WDYT is best going forward?

(Feel free to just do what you think is best and then merge, don't need to consider this blocking)

Contributor Author

I like Startup! You shouldn't be calling this method as part of normal execution. It's only meant for recovery mode, which happens at startup time!


ids, err := s.log.GetActiveSagas()
if err != nil {
return nil, err
}

return ids, nil
}

/*
* Recovers SagaState by reading all logged messages from the log.
* Utilizes the specified recoveryType to determine if Saga needs to be
* Aborted or can proceed safely.
*
* Returns the current SagaState
*/
func (s Saga) RecoverSagaState(sagaId string, recoveryType SagaRecoveryType) (*SagaState, error) {
return recoverState(sagaId, s, recoveryType)
}
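
One way a caller might tie Startup and RecoverSagaState together: list the in-progress sagas once at startup, recover each, then check whether the saga was aborted before deciding to compensate or continue. This is only a sketch under assumptions. The `state` struct, `recoverer` interface, `fakeSaga` type, and `planRecovery` function are hypothetical stand-ins so the example compiles on its own, and the recoveryType argument is omitted for brevity.

```go
package main

import (
	"fmt"
	"sort"
)

// state is a hypothetical stand-in for *SagaState, reduced to one flag.
type state struct {
	aborted bool
}

// recoverer is a hypothetical interface mirroring the two recovery entry
// points added in this PR: Startup and RecoverSagaState.
type recoverer interface {
	Startup() ([]string, error)
	RecoverSagaState(sagaId string) (*state, error)
}

// fakeSaga is an in-memory stand-in used only to exercise the flow.
type fakeSaga struct {
	states map[string]*state
}

func (f fakeSaga) Startup() ([]string, error) {
	ids := make([]string, 0, len(f.states))
	for id := range f.states {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic order for the example
	return ids, nil
}

func (f fakeSaga) RecoverSagaState(sagaId string) (*state, error) {
	return f.states[sagaId], nil
}

// planRecovery recovers every in-progress saga and reports, per saga id,
// whether the caller should run compensating tasks or resume forward.
func planRecovery(r recoverer) (map[string]string, error) {
	ids, err := r.Startup()
	if err != nil {
		return nil, err
	}

	plan := make(map[string]string, len(ids))
	for _, id := range ids {
		st, err := r.RecoverSagaState(id)
		if err != nil {
			return nil, err
		}
		switch {
		case st == nil:
			plan[id] = "skip" // no messages were logged for this saga
		case st.aborted:
			plan[id] = "compensate"
		default:
			plan[id] = "resume"
		}
	}
	return plan, nil
}

func main() {
	saga := fakeSaga{states: map[string]*state{
		"a": {aborted: true},
		"b": {aborted: false},
	}}
	plan, err := planRecovery(saga)
	fmt.Println(plan, err)
}
```

The `nil` case mirrors recoverState returning `nil, nil` when the log holds no messages for a saga id.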

/*
* logs the specified message durably to the SagaLog & updates internal state if its a valid state transition
*/
func (s Saga) logMessage(state *SagaState, msg sagaMessage) (*SagaState, error) {

//verify that the applied message results in a valid state
newState, err := updateSagaState(state, msg)
if err != nil {
return nil, err
}

//try durably storing the message
err = s.log.LogMessage(msg)
if err != nil {
return nil, err
}

return newState, nil
}
101 changes: 101 additions & 0 deletions saga/sagaRecovery.go
@@ -0,0 +1,101 @@
package saga

import (
"fmt"
)

type SagaRecoveryType int

/*
* Saga Recovery Types define how to interpret SagaState in RecoveryMode.
*
* ForwardRecovery: all tasks in the saga must be executed at least once.
* tasks MUST BE idempotent
*
* RollbackRecovery: if Saga is Aborted or in unsafe state, compensating
* tasks for all started tasks need to be executed.
* compensating tasks MUST BE idempotent.
*/
const (
RollbackRecovery SagaRecoveryType = iota
ForwardRecovery
)

/*
* Recovers SagaState from SagaLog messages
*/
func recoverState(sagaId string, saga Saga, recoveryType SagaRecoveryType) (*SagaState, error) {

// Get Logged Messages For this Saga from the Log.
msgs, err := saga.log.GetMessages(sagaId)
if err != nil {
return nil, err
}

if msgs == nil || len(msgs) == 0 {
return nil, nil
}

// Reconstruct Saga State from Logged Messages
startMsg := msgs[0]
if startMsg.msgType != StartSaga {
return nil, fmt.Errorf("InvalidMessages: first message must be StartSaga")
}

state, err := makeSagaState(sagaId, startMsg.data)
if err != nil {
return nil, err
}

for _, msg := range msgs {
state, err = updateSagaState(state, msg)
if err != nil {
return nil, err
}
}

// Check if we can safely proceed forward based on recovery method
// RollbackRecovery must check if in a SafeState,
// ForwardRecovery can always make progress
switch recoveryType {

case RollbackRecovery:
Contributor

I don't really understand why we're modifying SagaState here; that's more than just recovery. Also, how does the caller know if it should call compensating tasks due to an abort here (if that's the intent), and what is the expected action for a logged abort vs. one we've applied right here?

Contributor Author

So in Sagas, RollbackRecovery means we apply compensating actions when a saga fails. When this method is called, the scheduler has failed, so if a started task has no matching end-task message, the saga is in an unsafe state. This is because tasks do not have to be idempotent in RollbackRecovery mode. (This is all in the talk I linked / slides.) So we have to abort if we aren't in a safe state, because it's not safe to just replay a task.

Contributor Author

So after recovery runs, users should check if a saga has been aborted, and then start applying the compensating actions that need to be applied. If they try to continue applying tasks to an aborted Saga, the SagaState update method will catch the invalid transition, refuse to apply it, and return an error.


// if Saga is not in a safe state we must abort the saga
// And compensating tasks should start
if !isSagaInSafeState(state) {
state, err = saga.AbortSaga(state)
if err != nil {
return nil, err
}
}

case ForwardRecovery:

Contributor

Are you omitting the TODO here because that would mean having to create a new ticket? Would it be ok to attribute this to an existing/task story?

Contributor Author

There is nothing really to do on ForwardRecovery; you are always in a safe state, since tasks have to be idempotent. I can add a comment explaining this.

}

return state, nil
}

/*
* Returns true if saga is in a safe state, i.e. execution can pick up where
* it left off. This is only used in RollbackRecovery
*
* A Saga is in a Safe State if all StartedTasks also have EndTask Messages
* A Saga is also in a Safe State if the Saga has been aborted and compensating
* actions have started to be applied.
*/
func isSagaInSafeState(state *SagaState) bool {

if state.IsSagaAborted() {
return true
}

for taskId, _ := range state.taskState {
if state.IsTaskStarted(taskId) && !state.IsTaskCompleted(taskId) {
return false
}
}

return true
}
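
To make the safe-state rule concrete, here is a standalone sketch that replays a list of logged messages and applies the same start/end pairing check. The `msgKind` constants, `message` struct, and `safeAfterReplay` function are hypothetical simplifications; the real SagaState tracks more than this.

```go
package main

import "fmt"

type msgKind int

const (
	startTask msgKind = iota
	endTask
	abortSaga
)

// message is a hypothetical, reduced form of a logged saga message.
type message struct {
	kind   msgKind
	taskId string
}

// safeAfterReplay replays msgs in order and reports whether the saga ends in
// a safe state: either it was aborted, or every started task has also ended.
func safeAfterReplay(msgs []message) bool {
	started := make(map[string]bool)
	ended := make(map[string]bool)
	aborted := false

	for _, m := range msgs {
		switch m.kind {
		case startTask:
			started[m.taskId] = true
		case endTask:
			ended[m.taskId] = true
		case abortSaga:
			aborted = true
		}
	}

	if aborted {
		return true
	}
	for taskId := range started {
		if !ended[taskId] {
			return false
		}
	}
	return true
}

func main() {
	paired := []message{{startTask, "t1"}, {endTask, "t1"}}
	dangling := []message{{startTask, "t1"}, {endTask, "t1"}, {startTask, "t2"}}
	aborted := []message{{startTask, "t1"}, {abortSaga, ""}}

	fmt.Println(safeAfterReplay(paired), safeAfterReplay(dangling), safeAfterReplay(aborted))
}
```

In RollbackRecovery, a `false` result corresponds to the case where recoverState aborts the saga so that compensating tasks can start. ForwardRecovery skips the check because its tasks are required to be idempotent.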