Skip to content

Commit

Permalink
Implement parallel execution (#6)
Browse files Browse the repository at this point in the history
* Add own error types

* Implement parallel execution

* Fix groutine capture bug

* error handling

* Fix bug and add test
  • Loading branch information
mpppk authored Sep 26, 2019
1 parent 36e33cd commit cd78350
Show file tree
Hide file tree
Showing 8 changed files with 298 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ lint:

.PHONY: test
test:
go test ./...
go test -race ./...

.PHONY: integration-test
integration-test: deps
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.4.0
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288 h1:JIqe8uIcRBHXDQVvZtHwp8
golang.org/x/oauth2 v0.0.0-20181106182150-f42d05182288/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
218 changes: 187 additions & 31 deletions messagen/internal/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package internal

import (
"errors"
"fmt"
"sync"

"golang.org/x/sync/errgroup"

"golang.org/x/xerrors"
)
Expand Down Expand Up @@ -78,12 +80,49 @@ func (d *DefinitionRepository) Generate(defType DefinitionType, initialState Sta
return "", xerrors.Errorf("failed to generate message: %w", err)
}

msgChan := make(chan Message)
errChan := make(chan error)
wg := sync.WaitGroup{}
for _, def := range defs {
msg, err := generate(def, initialState, d)
// TODO: handling recoverable error
return msg, err
wg.Add(1)
go func(def *Definition) {
defMsgChan, defErrChan, err := generate(def, initialState, d)
if err != nil {
errChan <- err
return
}
select {
case msg, ok := <-defMsgChan:
if ok {
msgChan <- msg
} else {
wg.Done()
return
}
case err, ok := <-defErrChan:
if ok {
errChan <- err
}
wg.Done()
return
}
}(def)
}

go func() {
wg.Wait()
close(msgChan)
}()

select {
case msg, ok := <-msgChan:
if !ok {
return "", xerrors.Errorf("valid message does not exist")
}
return msg, nil
case err := <-errChan:
return "", err
}
return "", xerrors.Errorf("failed to generate message. satisfied definitions are don't exist")
}

func (d *DefinitionRepository) applyTemplatePickers(templates Templates, state State) (newTemplates Templates, err error) {
Expand Down Expand Up @@ -118,50 +157,167 @@ func (d *DefinitionRepository) applyDefinitionPickers(defs Definitions, state St
return newDefinitions, nil
}

func generate(def *Definition, state State, repo *DefinitionRepository) (Message, error) {
func generate(def *Definition, state State, repo *DefinitionRepository) (chan Message, chan error, error) {
messageChan := make(chan Message)
errChan := make(chan error)
templates, err := repo.applyTemplatePickers(def.Templates, state)
if err != nil {
return "", err
return nil, nil, err
}

if len(templates) == 0 {
return "", errors.New("TODO: return recoverable error") // TODO
go func() {
templateMessageChan, templateErrChan := resolveTemplates(templates, state, repo)
select {
case msg, ok := <-templateMessageChan:
if ok {
messageChan <- msg
} else {
close(messageChan)
return
}
case err, ok := <-templateErrChan:
if ok {
errChan <- err
}
}
}()
return messageChan, errChan, nil
}

func resolveTemplates(templates Templates, state State, repo *DefinitionRepository) (chan Message, chan error) {
eg := errgroup.Group{}
messageChan := make(chan Message)
for _, defTemplate := range templates {
defTemplate := defTemplate
eg.Go(func() error {
if len(defTemplate.Depends) == 0 {
messageChan <- Message(defTemplate.Raw)
return nil
}
newState := state.Copy()
stateChan, err := resolveDefDepends(defTemplate, newState, repo)
if err != nil {
return err
}
for satisfiedState := range stateChan {
msg, err := defTemplate.Execute(satisfiedState)
if err != nil {
return err
}
messageChan <- msg
}
return nil
})
}
errChan := make(chan error)
go func() {
if err := eg.Wait(); err != nil {
errChan <- err
}
close(messageChan)
}()
return messageChan, errChan
}

defTemplate := templates[0] // FIXME
if len(defTemplate.Depends) == 0 {
return Message(defTemplate.Raw), nil
func resolveDefDepends(template *Template, state State, repo *DefinitionRepository) (chan State, error) {
stateChan := make(chan State)
if template.IsSatisfiedState(state) {
go func() {
stateChan <- state
close(stateChan)
}()
return stateChan, nil
}

for _, defType := range defTemplate.Depends {
if _, ok := state.Get(defType); ok {
continue
}
defType, _ := template.GetFirstUnsatisfiedDef(state)
pickDefStateChan, err := pickDef(defType, state, repo)
if err != nil {
return nil, err
}

if _, err := pickDef(defType, state, repo); err != nil {
return "", xerrors.Errorf("failed to pick depend definition: %w", err)
wg := sync.WaitGroup{}
wg.Add(1)
errChan := make(chan error)
go func() {
for newState := range pickDefStateChan {
satisfiedStateChan, err := resolveDefDepends(template, newState, repo)
if err != nil {
errChan <- err
return
}
wg.Add(1)
go func() {
for satisfiedState := range satisfiedStateChan {
stateChan <- satisfiedState
}
wg.Done()
}()
}
}
return defTemplate.Execute(state)
wg.Done()
}()

go func() {
panic(<-errChan) // FIXME goroutine leak
}()

go func() {
wg.Wait()
close(stateChan)
}()

return stateChan, nil
}

func pickDef(defType DefinitionType, state State, repo *DefinitionRepository) (Message, error) {
func pickDef(defType DefinitionType, state State, repo *DefinitionRepository) (chan State, error) {
candidateDefs, err := repo.pickDefinitions(defType, state)
if err != nil {
return "", xerrors.Errorf("failed to ")
return nil, xerrors.Errorf("failed to pick definitions")
}

stateChan := make(chan State)
eg := errgroup.Group{}
for _, candidateDef := range candidateDefs {
if ok, _ := candidateDef.CanBePicked(state); ok {
message, err := generate(candidateDef, state, repo)
if ok, _ := candidateDef.CanBePicked(state); !ok {
continue
}
candidateDef := candidateDef
eg.Go(func() error {
defMessageChan, defErrChan, err := generate(candidateDef, state, repo)
if err != nil {
return "", err
return err
}
state.Set(defType, message)
if _, err := state.SetByConstraints(candidateDef.Constraints); err != nil {
return "", xerrors.Errorf("failed to update state while message generating: %w", err)

if defMessageChan == nil {
return errors.New("message chan is nil")
}
return message, nil
}

if defErrChan == nil {
return errors.New("err chan is nil")
}

select {
case defMessage, ok := <-defMessageChan:
if !ok {
return nil
}
newState := state.Copy()
newState.Set(defType, defMessage)
if _, err := newState.SetByConstraints(candidateDef.Constraints); err != nil {
return xerrors.Errorf("failed to update state while message generating: %w", err)
}
stateChan <- newState
case err := <-defErrChan:
return err
}
return nil
})
}
return "", fmt.Errorf("all depend definition are not satisfied constraints: %s", defType)

go func() {
if err := eg.Wait(); err != nil {
panic(err) // FIXME
}
close(stateChan)
}()
return stateChan, nil
}
85 changes: 81 additions & 4 deletions messagen/internal/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,87 @@ func TestDefinitionRepository_Generate(t *testing.T) {
want: "aaabbbccc",
wantErr: false,
},
{
name: "unresolvable template in template",
fields: fields{
m: definitionMap{
"Test": []*Definition{newDefinitionOrPanic(&RawDefinition{
Type: "Test",
RawTemplates: []RawTemplate{"aaa{{.NestTest}}ccc"},
})},
"NestTest": []*Definition{
newDefinitionOrPanic(&RawDefinition{
Type: "NestTest",
RawTemplates: []RawTemplate{"xxx"},
Constraints: newConstraintsOrPanic(RawConstraints{"k999": "v999"}),
}),
},
},
templatePickers: []TemplatePicker{AscendingOrderTemplatePicker},
},
args: args{
defType: "Test",
},
want: "",
wantErr: true,
},
{
name: "unresolvable template in template2",
fields: fields{
m: definitionMap{
"Test": []*Definition{newDefinitionOrPanic(&RawDefinition{
Type: "Test",
RawTemplates: []RawTemplate{"aaa{{.NestTest}}ccc"},
})},
"NestTestxxxxxxx": []*Definition{
newDefinitionOrPanic(&RawDefinition{
Type: "NestTest",
RawTemplates: []RawTemplate{"xxx"},
Constraints: newConstraintsOrPanic(RawConstraints{"k999": "v999"}),
}),
},
},
templatePickers: []TemplatePicker{AscendingOrderTemplatePicker},
},
args: args{
defType: "Test",
},
want: "",
wantErr: true,
},
{
name: "unresolvable template in template2",
fields: fields{
m: definitionMap{
"Test": []*Definition{newDefinitionOrPanic(&RawDefinition{
Type: "Test",
RawTemplates: []RawTemplate{"aaa{{.NestTest}}ccc"},
})},
"NestTest": []*Definition{
newDefinitionOrPanic(&RawDefinition{
Type: "NestTest",
RawTemplates: []RawTemplate{"{{.NestTest2}}"},
}),
newDefinitionOrPanic(&RawDefinition{
Type: "NestTest",
RawTemplates: []RawTemplate{"bbb"},
}),
},
"NestTest2": []*Definition{
newDefinitionOrPanic(&RawDefinition{
Type: "NestTest2",
RawTemplates: []RawTemplate{"{{.NoExistDef}}"},
}),
},
},
templatePickers: []TemplatePicker{AscendingOrderTemplatePicker},
},
args: args{
defType: "Test",
},
want: "aaabbbccc",
wantErr: false,
},
{
name: "use ! operator",
fields: fields{
Expand Down Expand Up @@ -230,10 +311,6 @@ func TestDefinitionRepository_Generate(t *testing.T) {
},
}
for _, tt := range tests {
if tt.name != "two variable in template" {
continue
}

t.Run(tt.name, func(t *testing.T) {
d := &DefinitionRepository{
m: tt.fields.m,
Expand Down
8 changes: 8 additions & 0 deletions messagen/internal/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ func (s State) Get(defType DefinitionType) (Message, bool) {
v, ok := s[string(defType)]
return v, ok
}

func (s State) Copy() State {
ns := State{}
for key, value := range s {
ns[key] = value
}
return ns
}
Loading

0 comments on commit cd78350

Please sign in to comment.