This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Commit

Merge pull request #1604 from hashicorp/server/refactor-poll-queuer
internal/server: Refactor how server queues poll jobs
briancain authored Jun 9, 2021
2 parents 43bb29a + 9aa22c7 commit 19fc5f8
Showing 3 changed files with 169 additions and 51 deletions.
94 changes: 48 additions & 46 deletions internal/server/singleprocess/poll.go
@@ -2,7 +2,6 @@ package singleprocess

import (
"context"
"fmt"
"sync"
"time"

@@ -12,15 +11,42 @@ import (
pb "github.com/hashicorp/waypoint/internal/server/gen"
)

+ // pollHandler is a private interface that the server implements for polling on
+ // different items such as projects or status reports.
+ type pollHandler interface {
+ // Peek returns the next item that should be polled.
+ // This will return (nil,nil,nil) if there are no items to poll currently.
+ //
+ // This calls the item's state implementation of its "peek" operation, so it
+ // does not update the poll item's next poll time. Therefore, calling this
+ // multiple times should return the same result unless a function like
+ // Complete is called.
+ //
+ // Note that the WatchSet must be populated with a watch channel that is triggered when
+ // there might be a new or changed record.
+ Peek(hclog.Logger, memdb.WatchSet) (interface{}, time.Time, error)
+
+ // PollJob generates a QueueJobRequest that is used to poll on.
+ // It is expected to be given a proto message obtained from Peek which
+ // is used to define the job returned.
+ PollJob(hclog.Logger, interface{}) (*pb.QueueJobRequest, error)
+
+ // Complete will mark the job that was queued as complete using the specific
+ // state implementation.
+ Complete(hclog.Logger, interface{}) error
+ }

// runPollQueuer starts the poll queuer. The poll queuer sleeps on and
- // schedules polling operations for projects that have polling enabled.
+ // schedules polling operations for pollable items that have polling enabled
+ // and implemented.
// This blocks and is expected to be run in a goroutine.
//
// This function should only ever be invoked one at a time. Running multiple
- // copies can result in duplicate polls for projects.
+ // copies can result in duplicate polls for items.
func (s *service) runPollQueuer(
ctx context.Context,
wg *sync.WaitGroup,
+ handler pollHandler,
funclog hclog.Logger,
) {
defer wg.Done()
@@ -37,7 +63,7 @@ func (s *service) runPollQueuer(
}

ws := memdb.NewWatchSet()
- p, pollTime, err := s.state.ProjectPollPeek(ws)
+ pollItem, pollTime, err := handler.Peek(log, ws)
if err != nil {
// This error really should never happen. Instead of just exiting,
// we log it and just sleep a minute. Hopefully someone will notice
@@ -47,9 +73,6 @@
time.Sleep(1 * time.Minute)
continue
}
- if p != nil {
- log = log.With("project", p.Name)
- }

var loopCtxCancel context.CancelFunc
loopCtx := ctx
@@ -61,14 +84,14 @@
// solving for: there are THREE possible outcomes that we are waiting on:
//
// (1) WatchSet (ws) triggers - this means that the data changed,
- //     i.e. a project changed polling settings, so we need to reloop.
+ //     i.e. a poll item changed polling settings, so we need to reloop.
//
// (2) ctx is cancelled - this means the whole queuer is cancelled
// and we just want to exit.
//
// (3) loopCtx is cancelled - this means we hit our deadline for
// polling and we want to queue a polling operation for this
- //     project.
+ //     poll item.
//

log.Trace("waiting on watchset and contexts")
@@ -97,60 +120,39 @@
continue
}

- // p is allowed to be nil in this loop, but it should never reach
+ // pollItem is allowed to be nil in this loop, but it should never reach
// this point. Given we use it below, we put this check here to warn
- // loudly that it happened. p shouldn't be nil here because if p is
- // nil then we have no pollTime and therefore no loopCtx either. This
- // means outcome (1) or (2) MUST happen.
- if p == nil {
- log.Error("reached outcome (3) in poller with nil p. This should not happen.")
+ // loudly that it happened. pollItem shouldn't be nil here because if
+ // pollItem is nil then we have no pollTime and therefore no loopCtx either.
+ // This means outcome (1) or (2) MUST happen.
+ if pollItem == nil {
+ log.Error("reached outcome (3) in poller with nil pollItem. This should not happen.")
continue
}

// Outcome (3)
log.Trace("queueing poll job")
- resp, err := s.QueueJob(ctx, &pb.QueueJobRequest{
- Job: &pb.Job{
- // SingletonId so that we only have one poll operation at
- // any time queued per project.
- SingletonId: fmt.Sprintf("poll/%s", p.Name),
-
- Application: &pb.Ref_Application{
- Project: p.Name,
- // No Application set since PollOp is project-oriented
- },
-
- // Polling always happens on the default workspace even
- // though the PollOp is across every workspace.
- Workspace: &pb.Ref_Workspace{Workspace: "default"},
-
- // Poll!
- Operation: &pb.Job_Poll{
- Poll: &pb.Job_PollOp{},
- },
-
- // Any runner is fine for polling.
- TargetRunner: &pb.Ref_Runner{
- Target: &pb.Ref_Runner_Any{
- Any: &pb.Ref_RunnerAny{},
- },
- },
- },
- })
+ queueJobRequest, err := handler.PollJob(log, pollItem)
if err != nil {
log.Warn("error building a poll job request", "err", err)
continue
}

resp, err := s.QueueJob(ctx, queueJobRequest)
if err != nil {
log.Warn("error queueing a poll job", "err", err)
continue
}
log.Debug("queued polling job", "job_id", resp.JobId)

// Mark this as complete so the next poll gets rescheduled.
log.Trace("scheduling next project poll time")
if err := s.state.ProjectPollComplete(p, time.Now()); err != nil {
log.Trace("scheduling next poll time")
if err := handler.Complete(log, pollItem); err != nil {
// This should never happen so like above, if this happens we
// sleep for a minute so we don't completely overload the
// server since this is likely to happen again. We want people
// to see this in the logs.
log.Warn("BUG (please report): error marking project polling complete", "err", err)
log.Warn("BUG (please report): error marking polling item complete", "err", err)
time.Sleep(1 * time.Minute)
continue
}
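Note: the hunks above collapse the part of runPollQueuer that actually waits for one of the three outcomes described in the comments. A rough, self-contained sketch of how such a three-way wait can be expressed with go-memdb's WatchSet follows; the helper name and the deadline wiring are assumptions for illustration, not the collapsed code itself.

package pollsketch

import (
	"context"
	"time"

	"github.com/hashicorp/go-memdb"
)

// waitForNextPoll is a hypothetical helper mirroring the three outcomes:
// it returns (queue=false, exit=false) when watched data changes (outcome 1),
// (false, true) when the queuer's context is cancelled (outcome 2), and
// (true, false) when the next poll deadline passes (outcome 3).
func waitForNextPoll(ctx context.Context, ws memdb.WatchSet, pollTime time.Time) (queue, exit bool) {
	loopCtx := ctx
	if !pollTime.IsZero() {
		var cancel context.CancelFunc
		loopCtx, cancel = context.WithDeadline(ctx, pollTime)
		defer cancel()
	}

	// WatchCtx blocks until a watched record changes (returns nil) or
	// loopCtx is done (returns the context's error).
	if err := ws.WatchCtx(loopCtx); err == nil {
		return false, false // outcome (1): data changed, re-Peek
	}
	if ctx.Err() != nil {
		return false, true // outcome (2): the whole queuer was cancelled
	}
	return true, false // outcome (3): poll deadline hit, queue the poll job
}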
105 changes: 105 additions & 0 deletions internal/server/singleprocess/poll_project.go
@@ -0,0 +1,105 @@
package singleprocess

import (
"fmt"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "github.com/hashicorp/waypoint/internal/server/gen"
"github.com/hashicorp/waypoint/internal/server/singleprocess/state"
)

// projectPoll accepts a state management interface which provides access
// to a project's current state implementation. Functions like Peek and Complete
// need access to this state interface for peeking at the next available project
// as well as marking a project's poll as complete.
type projectPoll struct {
// state is the state management interface that provides functions for
// safely mutating server state.
state *state.State
}

// Peek returns the latest project to poll on.
// If there is an error in the ProjectPollPeek, it will return nil
// to allow the outer caller loop to continue and try again.
func (pp *projectPoll) Peek(
log hclog.Logger,
ws memdb.WatchSet,
) (interface{}, time.Time, error) {
p, pollTime, err := pp.state.ProjectPollPeek(ws)
if err != nil {
return nil, time.Time{}, err // continue loop
}

if p != nil {
log = log.With("project", p.Name)
}

return p, pollTime, nil
}

// PollJob generates the QueueJobRequest used to poll a project.
func (pp *projectPoll) PollJob(
log hclog.Logger,
project interface{},
) (*pb.QueueJobRequest, error) {
p, ok := project.(*pb.Project)
if !ok || p == nil {
log.Error("could not generate poll job for project, incorrect type passed in")
return nil, status.Error(codes.FailedPrecondition, "incorrect type passed into Project PollJob")
}

jobRequest := &pb.QueueJobRequest{
Job: &pb.Job{
// SingletonId so that we only have one poll operation at
// any time queued per project.
SingletonId: fmt.Sprintf("poll/%s", p.Name),

Application: &pb.Ref_Application{
Project: p.Name,
// No Application set since PollOp is project-oriented
},

// Polling always happens on the default workspace even
// though the PollOp is across every workspace.
Workspace: &pb.Ref_Workspace{Workspace: "default"},

// Poll!
Operation: &pb.Job_Poll{
Poll: &pb.Job_PollOp{},
},

// Any runner is fine for polling.
TargetRunner: &pb.Ref_Runner{
Target: &pb.Ref_Runner_Any{
Any: &pb.Ref_RunnerAny{},
},
},
},
}

return jobRequest, nil
}

// Complete will mark the job that was queued as complete. If it
// fails to do so, it returns an error so the outer caller loop can continue.
func (pp *projectPoll) Complete(
log hclog.Logger,
project interface{},
) error {
p, ok := project.(*pb.Project)
if !ok || p == nil {
log.Error("could not mark project poll as complete, incorrect type passed in")
return status.Error(codes.FailedPrecondition, "incorrect type passed into Project Complete")
}

// Mark this as complete so the next poll gets rescheduled.
if err := pp.state.ProjectPollComplete(p, time.Now()); err != nil {
return err
}
return nil
}
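Not part of this commit, but a small compile-time assertion is a common companion to an interface extraction like this one; assuming it sits next to the projectPoll type in package singleprocess, it fails the build if projectPoll ever stops satisfying pollHandler:

// Hypothetical addition (not in this diff): verify at compile time that
// *projectPoll implements the pollHandler interface defined in poll.go.
var _ pollHandler = (*projectPoll)(nil)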
21 changes: 16 additions & 5 deletions internal/server/singleprocess/service.go
@@ -138,11 +138,22 @@ func New(opts ...Option) (pb.WaypointServer, error) {
// Setup the background context that is used for internal tasks
s.bgCtx, s.bgCtxCancel = context.WithCancel(context.Background())

- // Start our polling background goroutine. We have a single goroutine
- // that we run in the background that handles the queue of all polling
- // operations. See the func docs for more info.
- s.bgWg.Add(1)
- go s.runPollQueuer(s.bgCtx, &s.bgWg, log.Named("poll_queuer"))
+ // TODO: When more items are added, move this elsewhere.
+ // pollableItems is a map of potential items Waypoint can queue a poll for.
+ // Each item should implement the pollHandler interface.
+ pollableItems := map[string]pollHandler{
+ "project": &projectPoll{state: s.state},
+ }
+
+ // Start our polling background goroutines.
+ // We currently have one goroutine that we run in the background that
+ // handles the queue of all polling operations. However, there will be more
+ // pollable items to run jobs against in future iterations.
+ // See the func docs for more info.
+ for pollName, pollItem := range pollableItems {
+ s.bgWg.Add(1)
+ go s.runPollQueuer(s.bgCtx, &s.bgWg, pollItem, log.Named("poll_queuer").Named(pollName))
+ }

// Start our state pruning background goroutine. This calls
// Prune on the state every 10 minutes.
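With the map and loop above, adding another pollable item is a two-step change: implement pollHandler and register the handler under a new key. A hedged sketch of what that might look like follows; the statusReportPoll type, its no-op behavior, and its registration key are assumptions for illustration and are not part of this commit.

package singleprocess

import (
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-memdb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "github.com/hashicorp/waypoint/internal/server/gen"
	"github.com/hashicorp/waypoint/internal/server/singleprocess/state"
)

// Hypothetical second handler to illustrate the registration pattern; a real
// implementation would wrap its own state queries instead of returning no-ops.
type statusReportPoll struct {
	state *state.State
}

func (s *statusReportPoll) Peek(log hclog.Logger, ws memdb.WatchSet) (interface{}, time.Time, error) {
	// Returning (nil, zero time, nil) tells the queuer there is nothing to poll.
	return nil, time.Time{}, nil
}

func (s *statusReportPoll) PollJob(log hclog.Logger, item interface{}) (*pb.QueueJobRequest, error) {
	return nil, status.Error(codes.Unimplemented, "status report polling not implemented")
}

func (s *statusReportPoll) Complete(log hclog.Logger, item interface{}) error {
	return nil
}

// Registration alongside the existing project handler would then read:
//
//	pollableItems := map[string]pollHandler{
//		"project":       &projectPoll{state: s.state},
//		"status_report": &statusReportPoll{state: s.state},
//	}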
