Skip to content

Commit

Permalink
Fix shard context error state check (#2612)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Mar 16, 2022
1 parent c8960b5 commit 28aebc2
Showing 1 changed file with 33 additions and 41 deletions.
74 changes: 33 additions & 41 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,8 @@ func (s *ContextImpl) UpdateHandoverNamespaces(namespaces []*namespace.Namespace
func (s *ContextImpl) AddTasks(
request *persistence.AddHistoryTasksRequest,
) error {
if err := s.errorByState(); err != nil {
return err
}

namespaceID := namespace.ID(request.NamespaceID)

// do not try to get namespace cache within shard lock
namespaceID := namespace.ID(request.NamespaceID)
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return err
Expand All @@ -564,20 +559,19 @@ func (s *ContextImpl) AddTasks(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return err
}

return s.addTasksLocked(request, namespaceEntry)
}

func (s *ContextImpl) CreateWorkflowExecution(
request *persistence.CreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
if err := s.errorByState(); err != nil {
return nil, err
}

// do not try to get namespace cache within shard lock
namespaceID := namespace.ID(request.NewWorkflowSnapshot.ExecutionInfo.NamespaceId)
workflowID := request.NewWorkflowSnapshot.ExecutionInfo.WorkflowId

// do not try to get namespace cache within shard lock
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
Expand All @@ -586,6 +580,10 @@ func (s *ContextImpl) CreateWorkflowExecution(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return nil, err
}

transferMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
Expand All @@ -608,14 +606,9 @@ func (s *ContextImpl) CreateWorkflowExecution(
func (s *ContextImpl) UpdateWorkflowExecution(
request *persistence.UpdateWorkflowExecutionRequest,
) (*persistence.UpdateWorkflowExecutionResponse, error) {
if err := s.errorByState(); err != nil {
return nil, err
}

// do not try to get namespace cache within shard lock
namespaceID := namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId)
workflowID := request.UpdateWorkflowMutation.ExecutionInfo.WorkflowId

// do not try to get namespace cache within shard lock
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
Expand All @@ -624,6 +617,10 @@ func (s *ContextImpl) UpdateWorkflowExecution(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return nil, err
}

transferMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
Expand Down Expand Up @@ -656,14 +653,9 @@ func (s *ContextImpl) UpdateWorkflowExecution(
func (s *ContextImpl) ConflictResolveWorkflowExecution(
request *persistence.ConflictResolveWorkflowExecutionRequest,
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
if err := s.errorByState(); err != nil {
return nil, err
}

// do not try to get namespace cache within shard lock
namespaceID := namespace.ID(request.ResetWorkflowSnapshot.ExecutionInfo.NamespaceId)
workflowID := request.ResetWorkflowSnapshot.ExecutionInfo.WorkflowId

// do not try to get namespace cache within shard lock
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
Expand All @@ -672,6 +664,10 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return nil, err
}

transferMaxReadLevel := int64(0)
if request.CurrentWorkflowMutation != nil {
if err := s.allocateTaskIDsLocked(
Expand Down Expand Up @@ -714,14 +710,9 @@ func (s *ContextImpl) ConflictResolveWorkflowExecution(
func (s *ContextImpl) SetWorkflowExecution(
request *persistence.SetWorkflowExecutionRequest,
) (*persistence.SetWorkflowExecutionResponse, error) {
if err := s.errorByState(); err != nil {
return nil, err
}

// do not try to get namespace cache within shard lock
namespaceID := namespace.ID(request.SetWorkflowSnapshot.ExecutionInfo.NamespaceId)
workflowID := request.SetWorkflowSnapshot.ExecutionInfo.WorkflowId

// do not try to get namespace cache within shard lock
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespaceID)
if err != nil {
return nil, err
Expand All @@ -730,6 +721,10 @@ func (s *ContextImpl) SetWorkflowExecution(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return nil, err
}

transferMaxReadLevel := int64(0)
if err := s.allocateTaskIDsLocked(
namespaceEntry,
Expand Down Expand Up @@ -777,9 +772,12 @@ func (s *ContextImpl) AppendHistoryEvents(
namespaceID namespace.ID,
execution commonpb.WorkflowExecution,
) (int, error) {
if err := s.errorByState(); err != nil {
s.rLock()
if err := s.errorByStateLocked(); err != nil {
s.rUnlock()
return 0, err
}
s.rUnlock()

request.ShardID = s.shardID

Expand Down Expand Up @@ -830,10 +828,6 @@ func (s *ContextImpl) DeleteWorkflowExecution(
// The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually.
// Step 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state).

if err := s.errorByState(); err != nil {
return err
}

// Do not get namespace cache within shard lock.
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(key.NamespaceID))
deleteVisibilityRecord := true
Expand All @@ -850,6 +844,10 @@ func (s *ContextImpl) DeleteWorkflowExecution(
s.wLock()
defer s.wUnlock()

if err := s.errorByStateLocked(); err != nil {
return err
}

// Step 1. Delete visibility.
if deleteVisibilityRecord {
addTasksRequest := &persistence.AddHistoryTasksRequest{
Expand Down Expand Up @@ -948,12 +946,6 @@ func (s *ContextImpl) getRangeIDLocked() int64 {
return s.shardInfo.GetRangeId()
}

func (s *ContextImpl) errorByState() error {
s.rLock()
defer s.rUnlock()
return s.errorByStateLocked()
}

func (s *ContextImpl) errorByStateLocked() error {
switch s.state {
case contextStateInitialized, contextStateAcquiring:
Expand Down

0 comments on commit 28aebc2

Please sign in to comment.