This repository has been archived by the owner on Apr 11, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 8
/
topology.go
124 lines (111 loc) · 2.88 KB
/
topology.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package testlab
import (
"fmt"
capi "github.com/hashicorp/consul/api"
napi "github.com/hashicorp/nomad/api"
"github.com/libp2p/testlab/testlab/node"
"github.com/libp2p/testlab/utils"
)
// Deployment describes one kind of node to schedule in the cluster: which
// plugin produces it, the options handed to that plugin, and how many
// instances (Quantity) to run.
type Deployment struct {
// Name is used as the name of the nomad task group built by TaskGroup, and
// is the identifier other deployments list in their Dependencies.
Name string
// Plugin is the name looked up via node.GetPlugin to obtain the plugin that
// builds the task and runs the post-deploy hook.
Plugin string
// Options is passed unchanged to the plugin's Task and PostDeploy calls.
Options utils.NodeOptions
// Quantity is the instance count given to the task group.
Quantity int
// Dependencies lists the Names of deployments that must already be
// scheduled in an earlier phase before this one is scheduled.
Dependencies []string
}
// TaskGroup builds the nomad task group for this deployment together with the
// hook to run after the group has been deployed.
//
// The group is named d.Name and its Count points at d.Quantity. The plugin
// named by d.Plugin supplies the group's single task, built from d.Options.
// The returned hook forwards the consul client it is given, plus d.Options,
// to the same plugin's PostDeploy.
//
// A failure to look up the plugin or to build its task is returned as-is, in
// which case the group and hook are both nil.
func (d *Deployment) TaskGroup() (*napi.TaskGroup, node.PostDeployFunc, error) {
	tg := napi.NewTaskGroup(d.Name, d.Quantity)
	// Count aliases the deployment's own field rather than a copy of it.
	tg.Count = &d.Quantity

	impl, lookupErr := node.GetPlugin(d.Plugin)
	if lookupErr != nil {
		return nil, nil, lookupErr
	}

	task, taskErr := impl.Task(d.Options)
	if taskErr != nil {
		return nil, nil, taskErr
	}
	tg.AddTask(task)

	// The hook closes over the plugin and the deployment so it can be run
	// later, once a consul client is available.
	hook := func(client *capi.Client) error {
		return impl.PostDeploy(client, d.Options)
	}
	return tg, hook, nil
}
// TopologyOptions holds the job-level settings that Topology.Jobs applies to
// every nomad job it generates.
type TopologyOptions struct {
// Region is the nomad region for the generated jobs. When empty, Jobs uses
// "global".
Region string
// Priority is the nomad job priority. When zero, Jobs uses 50.
Priority int
// Datacenters is assigned to each generated job's Datacenters field.
Datacenters []string
}
// Topology is a named set of deployments, with shared job options, that is
// turned into one nomad job per scheduling phase by Jobs.
type Topology struct {
// Options holds the settings (region, priority, datacenters) applied to
// every generated nomad job.
Options *TopologyOptions
// Name is the prefix of each generated nomad job name; Jobs names the job
// for phase i "<Name>_phase_<i>".
Name string
// Deployments details the different node types to schedule on the nomad
// cluster. Phases orders them according to their Dependencies.
Deployments []*Deployment
}
// Phases partitions the topology's deployments into ordered scheduling phases.
//
// Every deployment in a phase has all of its Dependencies satisfied by
// deployments placed in strictly earlier phases; deployments without
// dependencies land in the first phase. Within a phase, deployments keep the
// order they have in t.Deployments. A topology without deployments yields no
// phases and no error.
//
// An error (always prefixed with "could not resolve dependencies") is
// returned when:
//   - an entry of t.Deployments is nil,
//   - two deployments share a Name, or
//   - some deployments can never be scheduled because they depend on an
//     unknown deployment, on themselves, or on each other in a cycle. The
//     error lists the names of the deployments that are stuck.
func (t *Topology) Phases() ([][]*Deployment, error) {
	// Names are the identity of a deployment in the dependency graph, so they
	// must be unique. Without this check a duplicate could never be accounted
	// for by the name-keyed bookkeeping below and would surface as a
	// misleading unresolved-dependency error.
	names := make(map[string]struct{}, len(t.Deployments))
	for i, deployment := range t.Deployments {
		if deployment == nil {
			return nil, fmt.Errorf("could not resolve dependencies: deployment at index %d is nil", i)
		}
		if _, dup := names[deployment.Name]; dup {
			return nil, fmt.Errorf("could not resolve dependencies: duplicate deployment name %q", deployment.Name)
		}
		names[deployment.Name] = struct{}{}
	}

	var phases [][]*Deployment
	scheduled := make(map[string]struct{}, len(t.Deployments))
	for len(scheduled) < len(t.Deployments) {
		var nextPhase []*Deployment
	DeploymentLoop:
		for _, deployment := range t.Deployments {
			if _, ok := scheduled[deployment.Name]; ok {
				continue
			}
			for _, dep := range deployment.Dependencies {
				if _, ok := scheduled[dep]; !ok {
					continue DeploymentLoop
				}
			}
			nextPhase = append(nextPhase, deployment)
		}

		// No deployment became schedulable, so the remaining ones can never
		// be: report exactly which deployments are blocked.
		if len(nextPhase) == 0 {
			var stuck []string
			for _, deployment := range t.Deployments {
				if _, ok := scheduled[deployment.Name]; !ok {
					stuck = append(stuck, deployment.Name)
				}
			}
			return nil, fmt.Errorf("could not resolve dependencies: deployments %q depend on unknown deployments or on each other", stuck)
		}

		// Mark the phase as scheduled only after it is fully computed, so
		// that a deployment never shares a phase with one of its own
		// dependencies.
		for _, deployment := range nextPhase {
			scheduled[deployment.Name] = struct{}{}
		}
		phases = append(phases, nextPhase)
	}
	return phases, nil
}
// Jobs renders the topology as one nomad service job per scheduling phase.
//
// The deployments are first ordered with Phases. For phase i a service job
// named (and identified as) "<Name>_phase_<i>" is created, carrying one task
// group per deployment in that phase. The job uses the region, priority and
// datacenters from t.Options; an empty region defaults to "global" and a zero
// priority defaults to 50. A nil t.Options is treated as all defaults.
//
// Returns the jobs in phase order and, index-aligned with them, the
// post-deploy hooks of each phase: postDeployFuncs[i][e] belongs to the e-th
// deployment of phase i. Any error from Phases or from building a task group
// is returned as-is, with nil jobs and hooks.
func (t *Topology) Jobs() ([]*napi.Job, [][]node.PostDeployFunc, error) {
	// Apply defaults to a local copy: the caller's TopologyOptions must not
	// be modified as a side effect, and a nil Options must not panic.
	var opts TopologyOptions
	if t.Options != nil {
		opts = *t.Options
	}
	if opts.Region == "" {
		opts.Region = "global"
	}
	if opts.Priority == 0 {
		opts.Priority = 50
	}

	phases, err := t.Phases()
	if err != nil {
		return nil, nil, err
	}

	jobs := make([]*napi.Job, len(phases))
	postDeployFuncs := make([][]node.PostDeployFunc, len(phases))
	for i, phase := range phases {
		name := fmt.Sprintf("%s_phase_%d", t.Name, i)
		job := napi.NewServiceJob(name, name, opts.Region, opts.Priority)
		job.Datacenters = opts.Datacenters

		phasePostDeployFuncs := make([]node.PostDeployFunc, len(phase))
		for e, deployment := range phase {
			group, postDeploy, err := deployment.TaskGroup()
			if err != nil {
				return nil, nil, err
			}
			job.AddTaskGroup(group)
			phasePostDeployFuncs[e] = postDeploy
		}

		jobs[i] = job
		postDeployFuncs[i] = phasePostDeployFuncs
	}
	return jobs, postDeployFuncs, nil
}