Skip to content

Commit

Permalink
Improved handling of PR merging (#22)
Browse files Browse the repository at this point in the history
* This PR reduces the reliance on GitHub's automerge feature by adding
improved handling of PR merging in the syncer.

* If a PR exists the syncer will attempt to merge it and wait for it to
merge this allows the syncer to push any changes that might be blocked
on the pending PR

* After creating a new PR with the latest changes the sycner will wait
for the PR to be merged. This provides better handling in the case of
dev takeover when automerge isn't (or can't be enabled) on the
repository. There appears to be a bit of a race condition where the PR
may not be merged right away even thought it should be immediately
mergeable but this PR fixes that by having a retry loop that will wait
for the PR to me merged and periodically retry merging

* Fix #21
* Fix #17
  • Loading branch information
jlewi authored Feb 21, 2023
1 parent 8927cb4 commit e6c8e6d
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 30 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@ tidy-go:
gofmt -s -w .
goimports -w .

tidy: tidy-go

lint-go:
# golangci-lint automatically searches up the root tree for configuration files.
golangci-lint run

lint: lint-go

test-go:
go test -v ./...
2 changes: 1 addition & 1 deletion cmd/sanitizer/app/sanitizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func isUnsafeError(e error) bool {

func (s *Sanitizer) cleanFile(path string, dest string) error {
s.log.Info("Cleaning target", "target", path)
b, err := ioutil.ReadFile(path)
b, err := os.ReadFile(path)
if err != nil {
return errors.Wrapf(err, "Failed to clean file; %v", path)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/sanitizer/app/sanitizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// Not really a unittest as we try to sanitize the hydros directory
func Test_sanitize(t *testing.T) {
dir, err := ioutil.TempDir("", "testSanitize")
dir, err := os.MkdirTemp("", "testSanitize")
if err != nil {
t.Fatalf("Failed to create temporary directory; error %v", err)
}
Expand Down
40 changes: 26 additions & 14 deletions pkg/github/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ import (
"go.uber.org/zap"
)

// PRMergeState is different states the PR can be in. It aggregates information from multiple
// fields in the GitHub API (e.g. pr.State and pr.MergeStateStatus and pr.IsInMergeQueue)
type PRMergeState string

const (
// MergeStateStatusBehind and other
// values correspond to enum values for the field pr.MergeStateStatus
// https://docs.github.com/en/graphql/reference/enums#mergestatestatus
MergeStateStatusBehind = "BEHIND"
MergeStateStatusBlocked = "BLOCKED"
MergeStateStatusClosed = "CLOSED"
Expand All @@ -29,6 +36,12 @@ const (
MergeStateStatusHasHooks = "HAS_HOOKS"
MergeStateStatusMerged = "MERGED"
MergeStateStatusUnstable = "UNSTABLE"

MergedState PRMergeState = "MERGED"
EnqueuedState PRMergeState = "ENQUEUED"
ClosedState PRMergeState = "CLOSED"
UnknownState PRMergeState = "UNKNOWN"
BlockedState PRMergeState = "BLOCKED"
)

// MergeOptions are the options used to construct a context.
Expand Down Expand Up @@ -70,26 +83,26 @@ func (m *prMerger) inMergeQueue() error {
}

// Merge the pull request.
func (m *prMerger) merge() error {
func (m *prMerger) merge() (PRMergeState, error) {
log := m.log
pr := m.pr

if pr.State == MergeStateStatusClosed {
log.Info("PR can't be merged it has been closed")
return errors.Errorf("Can't merge PR %v it has been closed", pr.URL)
return ClosedState, errors.Errorf("Can't merge PR %v it has been closed", pr.URL)
}
if pr.State == MergeStateStatusMerged {
log.Info("PR has already been merged")
return nil
return MergedState, nil
}
if err := m.inMergeQueue(); err != nil {
log.Info("PR is already in merge queue")
return nil
return EnqueuedState, nil
}

if reason, blocked := blockedReason(m.pr.MergeStateStatus); blocked {
log.Info("PR merging is blocked", "reason", reason)
return errors.Errorf("PR merging is blocked; MergeStateStatus: %v reason: %v", m.pr.MergeStateStatus, reason)
return BlockedState, errors.Errorf("PR merging is blocked; MergeStateStatus: %v reason: %v", m.pr.MergeStateStatus, reason)
}

payload := mergePayload{
Expand Down Expand Up @@ -119,16 +132,19 @@ func (m *prMerger) merge() error {

err := mergePullRequest(m.HttpClient, payload)
if err != nil {
return err
return UnknownState, err
}

var state PRMergeState
if payload.auto {
log.Info("Pull request was added to merge queue and will be automatically merged when all requirements are met")
state = EnqueuedState
} else {
log.Info("pull request was merged", "title", m.pr.Title)
state = MergedState
}

return nil
return state, nil
}

type addAcceptHeaderTransport struct {
Expand Down Expand Up @@ -169,17 +185,13 @@ func newPRMerger(client *http.Client, repo ghrepo.Interface, number int) (*prMer
// client - http client to use to talk to github
// repo - the repo that owns the PR
// number - the PR number to merge
func MergePR(client *http.Client, repo ghrepo.Interface, number int) error {
func MergePR(client *http.Client, repo ghrepo.Interface, number int) (PRMergeState, error) {
m, err := newPRMerger(client, repo, number)

if err != nil {
return err
return UnknownState, err
}
if err := m.merge(); err != nil {
return err
}

return err
return m.merge()
}

// blockedReason translates various MergeStateStatus GraphQL values into human-readable reason
Expand Down
66 changes: 61 additions & 5 deletions pkg/github/prs.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,13 +594,69 @@ func (h *RepoHelper) Dir() string {
// MergePR tries to merge the PR. This means either
// 1. enabling auto merge if a merge queue is required
// 2. merging right away if able
func (h *RepoHelper) MergePR(prNumber int) error {
func (h *RepoHelper) MergePR(prNumber int) (PRMergeState, error) {
client := &http.Client{Transport: h.transport}
if err := MergePR(client, h.baseRepo, prNumber); err != nil {
h.log.Error(err, "Failed to merge PR (or enable auto merge)")
return errors.Wrapf(err, "Failed to merge PR (or enable auto merge)")

return MergePR(client, h.baseRepo, prNumber)
}

// MergeAndWait merges the PR and waits for it to be merged.
func (h *RepoHelper) MergeAndWait(prNumber int, timeout time.Duration) (PRMergeState, error) {
done := false
log := h.log.WithValues("number", prNumber)
wait := 10 * time.Second
for endTime := time.Now().Add(timeout); endTime.After(time.Now()) && !done; {
state := func() PRMergeState {
pr, err := h.FetchPR(prNumber)
if err != nil {
log.Error(err, "Failed to fetch PR; unable to confirm if its been merged")
return UnknownState
}

if pr.State == MergeStateStatusMerged {
log.Info("PR has been merged", "pr", pr.URL)
return MergedState
}
if pr.IsInMergeQueue {
log.Info("PR is in merge queue", "pr", pr.URL)
return EnqueuedState
}

log.Info("PR is not in merge queue; attempting to merge", "pr", pr.URL)
state, err := h.MergePR(pr.Number)
if err != nil {
log.Error(err, "Failed to merge pr", "number", pr.Number, "url", pr.URL)
}
return state
}()

switch state {
case ClosedState:
fallthrough
case MergedState:
return state, nil
case EnqueuedState:
fallthrough
case UnknownState:
fallthrough
case BlockedState:
fallthrough
default:
if endTime.After(time.Now().Add(wait)) {
time.Sleep(wait)
}
}
}
return nil

return UnknownState, errors.Errorf("Timed out waiting for PR to merge")
}

func (h *RepoHelper) FetchPR(prNumber int) (*api.PullRequest, error) {
// We need to set the appropriate header in oder to get merge queue status.
transport := &addAcceptHeaderTransport{T: h.transport}
client := &http.Client{Transport: transport}
fields := []string{"id", "number", "state", "title", "lastCommit", "mergeStateStatus", "headRepositoryOwner", "headRefName", "baseRefName", "headRefOid"}
return fetchPR(client, h.baseRepo, prNumber, fields)
}

func fetchPR(httpClient *http.Client, repo ghrepo.Interface, number int, fields []string) (*api.PullRequest, error) {
Expand Down
30 changes: 22 additions & 8 deletions pkg/gitops/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,16 @@ func (s *Syncer) RunOnce(force bool) error {

if existingPR != nil {
log.Info("PR Already Exists; attempting to merge it.", "pr", existingPR.URL)
if err := s.repoHelper.MergePR(existingPR.Number); err != nil {
log.Error(err, "Failed to merge or enable automerge on pr", "number", existingPR.Number, "pr", existingPR.URL)
state, err := s.repoHelper.MergeAndWait(existingPR.Number, 3*time.Minute)
if err != nil {
log.Error(err, "Failed to Merge existing PR unable to continue with sync", "number", existingPR.Number, "pr", existingPR.URL)
return err
}
// TODO(jeremy): Ideally if the PR is merged immediately we should keep going with the sync. Since
// there could be new changes to sync. On the other hand if it was enqueued then we should return because
// we want to wait for it to merge.
return nil

if state != github.ClosedState && state != github.MergedState {
log.Info("PR hasn't been merged; unable to continue with the sync", "number", existingPR.Number, "pr", existingPR.URL, "state", state)
return errors.Errorf("Existing PR %v is blocking sync", existingPR.URL)
}
}

if err := s.cloneRepos(); err != nil {
Expand Down Expand Up @@ -552,10 +554,22 @@ func (s *Syncer) RunOnce(force bool) error {

// EnableAutoMerge or merge the PR automatically. If you don't want the PR to be automerged you should
// set up appropriate branch protections e.g. require approvers.
if err := s.repoHelper.MergePR(pr.Number); err != nil {
log.Error(err, "Failed to merge or enable automerge on pr", "number", pr.Number, "url", pr.URL)
// Wait up to 1 minute to try to merge the PR
// TODO(jeremy): This is mostly for dev takeover in the event we can't rely on automerge e.g. because
// we have a private repository for which we can't enable automerge. In this case we want to wait and
// retry the merge until its merged or we timeout. We may want to refactor this code so we can invoke the
// polling only in the case of being called with the takeover command.
// If the PR can't be merged does it make sense to report an error? in the case of long running tests
// The syncer can return and the PR will be merged either 1) when syncer is rerun or 2) by auto merge if enabled
// The desired behavior is potentially different in the takeover and non takeover setting.
state, err := s.repoHelper.MergeAndWait(pr.Number, 1*time.Minute)
if err != nil {
log.Error(err, "Failed to merge pr", "number", pr.Number, "url", pr.URL)
return err
}
if state != github.MergedState && state != github.ClosedState {
return fmt.Errorf("Failed to merge pr; state: %v", state)
}

log.Info("Sync succeeded")
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/gitutil/gitutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func AddGitignoreToWorktree(wt *git.Worktree, repositoryPath string) error {
return nil
}
readFile, err := os.Open(path)
defer readFile.Close()
util.DeferIgnoreError(readFile.Close)

if err != nil {
return errors.Wrapf(err, "Failed to read .gitignore: %s", path)
Expand Down

0 comments on commit e6c8e6d

Please sign in to comment.