Skip to content

Commit

Permalink
use semaphore model
Browse files Browse the repository at this point in the history
  • Loading branch information
kelwang committed Jun 26, 2018
1 parent 3457c98 commit 4ec7999
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 25 deletions.
8 changes: 7 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 55 additions & 24 deletions plugins/inputs/jenkins/jenkins.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Jenkins struct {

NodeExclude []string `toml:"node_exclude"`
nodeFilter filter.Filter

semaphore chan struct{}
}

type byBuildNumber []gojenkins.JobBuild
Expand Down Expand Up @@ -159,6 +161,8 @@ func (j *Jenkins) newInstance(client *http.Client) error {
j.MaxSubJobPerLayer = 10
}

j.semaphore = make(chan struct{}, j.MaxConnections)

return nil
}

Expand All @@ -170,7 +174,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.

// detect the parsing error, since gojenkins lib won't do it
if info == nil || info.DisplayName == "" {
return fmt.Errorf("error empty node name[%s]: ", j.URL)
return fmt.Errorf("error empty node name[%s]: ", url)
}

tags["node_name"] = info.DisplayName
Expand All @@ -181,7 +185,7 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.
}

if info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor == nil {
return fmt.Errorf("error empty monitor data[%s]: ", j.URL)
return fmt.Errorf("error empty monitor data[%s]: ", url)
}
tags["arch"], ok = info.MonitorData.Hudson_NodeMonitors_ArchitectureMonitor.(string)
if !ok {
Expand Down Expand Up @@ -246,12 +250,18 @@ func (j *Jenkins) gatherNodeData(node *gojenkins.Node, url string, acc telegraf.
}

func (j *Jenkins) gatherNodesData(acc telegraf.Accumulator) {
nodes, err := j.instance.GetAllNodes()
var nodes []*gojenkins.Node
var err error
err = j.doGet(func() error {
nodes, err = j.instance.GetAllNodes()
return err
})

url := j.URL + "/computer/api/json"
// since gojenkins lib will never return error
// returns error for len(nodes) is 0
if err != nil || len(nodes) == 0 {
acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", j.URL, err))
acc.AddError(fmt.Errorf("error retrieving nodes[%s]: %v", url, err))
return
}
// get node data
Expand All @@ -270,33 +280,42 @@ func (j *Jenkins) gatherJobs(acc telegraf.Accumulator) {
acc.AddError(fmt.Errorf("error retrieving jobs[%s]: %v", j.URL, err))
return
}
jobsC := make(chan jobRequest, j.MaxConnections)
var wg sync.WaitGroup
for _, job := range jobs {
wg.Add(1)
go func(name string) {
jobsC <- jobRequest{
go func(name string, wg *sync.WaitGroup, acc telegraf.Accumulator) {
defer wg.Done()
if te := j.getJobDetail(jobRequest{
name: name,
parents: []string{},
layer: 0,
}, wg, acc); te != nil {
acc.AddError(te)
}
}(job.Name)
}(job.Name, &wg, acc)
}
wg.Wait()
}

for i := 0; i < j.MaxConnections; i++ {
go func(jobsC chan jobRequest, acc telegraf.Accumulator, wg *sync.WaitGroup) {
for sj := range jobsC {
if te := j.getJobDetail(sj, jobsC, wg, acc); te != nil {
acc.AddError(te)
}
}
}(jobsC, acc, &wg)
// wrap the tcp request with doGet
// block tcp request if buffered channel is full
func (j *Jenkins) doGet(tcp func() error) error {
j.semaphore <- struct{}{}
if err := tcp(); err != nil {
if err == gojenkins.ErrSessionExpired {
// ignore the error here, since config parsing should be finished.
client, _ := j.initClient()
// SessionExpired use a go routine to create a new session
go j.newInstance(client)
}
<-j.semaphore
return err
}
wg.Wait()
<-j.semaphore
return nil
}

func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error {
defer wg.Done()
func (j *Jenkins) getJobDetail(sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) error {
if j.MaxSubJobDepth > 0 && sj.layer == j.MaxSubJobDepth {
return nil
}
Expand All @@ -305,7 +324,12 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
return nil
}
url := j.URL + "/job/" + strings.Join(sj.combined(), "/job/") + "/api/json"
jobDetail, err := j.instance.GetJob(sj.name, sj.parents...)
var jobDetail *gojenkins.Job
var err error
err = j.doGet(func() error {
jobDetail, err = j.instance.GetJob(sj.name, sj.parents...)
return err
})
if err != nil {
return fmt.Errorf("error retrieving inner jobs[%s]: ", url)
}
Expand All @@ -316,13 +340,16 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
}
wg.Add(1)
// schedule tcp fetch for inner jobs
go func(innerJob gojenkins.InnerJob, sj jobRequest) {
jobsC <- jobRequest{
go func(innerJob gojenkins.InnerJob, sj jobRequest, wg *sync.WaitGroup, acc telegraf.Accumulator) {
defer wg.Done()
if te := j.getJobDetail(jobRequest{
name: innerJob.Name,
parents: sj.combined(),
layer: sj.layer + 1,
}, wg, acc); te != nil {
acc.AddError(te)
}
}(innerJob, sj)
}(innerJob, sj, wg, acc)
}

// collect build info
Expand All @@ -339,7 +366,11 @@ func (j *Jenkins) getJobDetail(sj jobRequest, jobsC chan<- jobRequest, wg *sync.
Base: baseURL,
Raw: new(gojenkins.BuildResponse),
}
status, err := build.Poll()
var status int
err = j.doGet(func() error {
status, err = build.Poll()
return err
})
if err != nil || status != 200 {
if err == nil && status != 200 {
err = fmt.Errorf("status code %d", status)
Expand Down

0 comments on commit 4ec7999

Please sign in to comment.