forked from flynn/flynn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
expose.go
160 lines (135 loc) · 3.25 KB
/
expose.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"flag"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"syscall"
"time"
)
type dependencySpec struct {
service, targetVar string
}
// A flag.Value that is a list of dependencies
type depSlice []dependencySpec
func (s *depSlice) String() string {
return "ENV:service-name, -d=service-name"
}
func (s *depSlice) Set(value string) error {
parts := strings.SplitN(value, ":", 2)
var spec dependencySpec
if len(parts) == 1 {
spec = dependencySpec{service: parts[0], targetVar: parts[0]}
} else {
spec = dependencySpec{service: parts[1], targetVar: parts[0]}
}
*s = append(*s, spec)
return nil
}
type expose struct {
clientCmd
dependencies *depSlice
}
func (cmd *expose) Name() string {
return "expose"
}
func (cmd *expose) DefineFlags(fs *flag.FlagSet) {
var t depSlice
cmd.dependencies = &t
fs.Var(cmd.dependencies, "d", "services to depend on")
}
type dependencyState struct {
deps map[string]string
}
// Given a request for a set of services needed, trigger updates
// when the request can be fullfilled, when the services chosen
// change, or when it can no longer be fullfilled.
func (cmd *expose) requireDependencies(services *depSlice) (chan dependencyState, error) {
deps := make(map[string]string)
status := make(chan dependencyState)
l := new(sync.Mutex)
for _, dependency := range *services {
set, err := cmd.client.NewServiceSet(dependency.service)
if err != nil {
return nil, err
}
go func(targetVar string) {
defer set.Close()
for leader := range set.Leaders() {
l.Lock()
if leader != nil {
deps[targetVar] = leader.Addr
if len(deps) == len(*services) {
// All services are available, make a copy to
// pass through the channel.
depState := dependencyState{deps: make(map[string]string, len(deps))}
for k, v := range deps {
depState.deps[k] = v
}
status <- depState
}
} else if deps[targetVar] != "" {
delete(deps, targetVar)
status <- dependencyState{deps: nil}
}
l.Unlock()
}
}(dependency.targetVar)
}
return status, nil
}
func (cmd *expose) Run(fs *flag.FlagSet) {
cmd.InitClient(false)
args := fs.Args()
if len(args) < 1 {
fmt.Println("no command to exec")
os.Exit(1)
return
}
serviceVarsUpdate, err := cmd.requireDependencies(cmd.dependencies)
if err != nil {
fmt.Println(err)
os.Exit(2)
}
var exitCh chan uint
var proc *exec.Cmd
for {
select {
case state := <-serviceVarsUpdate:
if state.deps == nil && exitCh != nil {
proc.Process.Signal(syscall.SIGTERM)
select {
case <-exitCh:
break
case <-time.After(5 * time.Second):
if err := proc.Process.Kill(); err != nil {
panic("failed to kill")
}
<-exitCh
}
proc = nil
exitCh = nil
} else if state.deps != nil && exitCh == nil {
proc, exitCh = startCommand(args, state.deps)
}
case exitStatus := <-exitCh:
os.Exit(int(exitStatus))
}
}
}
func startCommand(args []string, env map[string]string) (*exec.Cmd, chan uint) {
c := exec.Command(args[0], args[1:]...)
c.Env = os.Environ()
for key, value := range env {
envitem := fmt.Sprintf("%s=%s", key, value)
c.Env = append(c.Env, envitem)
}
attachCmd(c)
err := c.Start()
if err != nil {
panic(err)
}
return c, exitStatusCh(c)
}