Skip to content
This repository has been archived by the owner on Feb 11, 2022. It is now read-only.

Commit

Permalink
Move PE syncing into LastOperation (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
fabritsius authored Sep 9, 2021
1 parent d0d3a35 commit 3dabe83
Showing 1 changed file with 116 additions and 109 deletions.
225 changes: 116 additions & 109 deletions pkg/broker/instance_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ func (b *Broker) createOrUpdateResources(ctx context.Context, client *mongodbatl
}
}

if err := b.syncPrivateEndpoints(ctx, client, newPlan, oldPlan); err != nil {
return errors.Wrap(err, "cannot sync Private Endpoints")
if err := b.removeOldPrivateEndpoints(ctx, client, newPlan, oldPlan); err != nil {
return errors.Wrap(err, "failed to remove old Private Endpoints")
}

return nil
}

func (b *Broker) syncPrivateEndpoints(ctx context.Context, client *mongodbatlas.Client, newPlan *dynamicplans.Plan, oldPlan *dynamicplans.Plan) error {
func (b *Broker) removeOldPrivateEndpoints(ctx context.Context, client *mongodbatlas.Client, newPlan *dynamicplans.Plan, oldPlan *dynamicplans.Plan) error {
logger := b.funcLogger()

peProvider := "AZURE" // this is hardcoded cause only one provider is supported for now
Expand All @@ -235,78 +235,37 @@ func (b *Broker) syncPrivateEndpoints(ctx context.Context, client *mongodbatlas.
}
atlasPrivateEndpoints = b.populateConnections(atlasPrivateEndpoints)

planPrivateEndpoints := make(map[string]struct{})
for _, endpoint := range newPlan.PrivateEndpoints {
if endpoint.ID != "" {
// endpoint service already created
planPrivateEndpoints[endpoint.ID] = struct{}{}

continue
}

logger.Debugw("Atlas PEs", "endpoints", atlasPrivateEndpoints)

if connID, exists := b.privateEndpointExists(endpoint.Provider, endpoint.EndpointName, atlasPrivateEndpoints); exists {
planPrivateEndpoints[connID] = struct{}{}
endpoint.ID = connID

continue
}

logger.Debugw("start creation process for PrivateEndpoints", "project ID", oldPlan.Project.ID)
conn, _, err := client.PrivateEndpoints.Create(ctx, oldPlan.Project.ID, &mongodbatlas.PrivateEndpointConnection{
ProviderName: endpoint.Provider,
Region: endpoint.Region,
})
if err != nil {
return errors.Wrap(err, "cannot create Private Endpoint Service")
} else {
endpoint.ID = conn.ID
planPrivateEndpoints[endpoint.ID] = struct{}{}
}
}

logger.Debugw("Private Endpoints from the plan", "PE names", planPrivateEndpoints)

for _, peConnection := range atlasPrivateEndpoints {
// delete all PE endpoints which are not in the plan
if _, isPlanned := planPrivateEndpoints[peConnection.ID]; !isPlanned {
if !privateEndpointInPlan(peConnection.ProviderName, peConnection.EndpointServiceName, newPlan) {
logger.Debugw("Deleting Private Endpoint", "connection", peConnection)

for _, endpoint := range oldPlan.PrivateEndpoints {
if endpoint.EndpointName == peConnection.EndpointServiceName && endpoint.Provider == peConnection.ProviderName {
if _, err := privateendpoint.Delete(ctx, endpoint); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "endpoint", endpoint.EndpointName)
}
}
}

for _, peID := range peConnection.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, oldPlan.Project.ID, peProvider, peConnection.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}

if _, err := client.PrivateEndpoints.Delete(ctx, oldPlan.Project.ID, peProvider, peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "pe", peConnection)
}
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, oldPlan)
}
}

return nil
}

func (b *Broker) privateEndpointExists(provider string, name string, connections []mongodbatlas.PrivateEndpointConnection) (string, bool) {
func (b Broker) deletePrivateEndpoint(ctx context.Context, client *mongodbatlas.Client, peProvider string, peConnection mongodbatlas.PrivateEndpointConnection, plan *dynamicplans.Plan) {
logger := b.funcLogger()

for _, peConnection := range connections {
logger.Infow("privateEndpointExists func", "conn", peConnection)
if peConnection.ProviderName == provider && peConnection.EndpointServiceName == name {
return peConnection.ID, true
for _, endpoint := range plan.PrivateEndpoints {
if endpoint.EndpointName == peConnection.EndpointServiceName && endpoint.Provider == peConnection.ProviderName {
if _, err := privateendpoint.Delete(ctx, endpoint); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "endpoint", endpoint.EndpointName)
}
}
}

for _, peID := range peConnection.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, plan.Project.ID, peProvider, peConnection.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}

return "", false
if _, err := client.PrivateEndpoints.Delete(ctx, plan.Project.ID, peProvider, peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "pe", peConnection)
}
}

func (b *Broker) populateConnections(connections []mongodbatlas.PrivateEndpointConnection) []mongodbatlas.PrivateEndpointConnection {
Expand All @@ -332,14 +291,37 @@ func (b *Broker) populateConnections(connections []mongodbatlas.PrivateEndpointC
return connections
}

func privateEndpointInPlan(provider string, name string, plan *dynamicplans.Plan) bool {
for _, endpoint := range plan.PrivateEndpoints {
if endpoint.Provider == provider && endpoint.EndpointName == name {
return true
}
}

return false
}

// TODO: this retry logic is clunky, come up with something better?
func (b *Broker) postCreateResources(ctx context.Context, client *mongodbatlas.Client, dp *dynamicplans.Plan) (retry bool, err error) {
logger := b.funcLogger()

logger.Debugw("Setup PrivateEndpoints", "PrivateEndpoints", dp.PrivateEndpoints)
for _, endpoint := range dp.PrivateEndpoints {
for peIdx, endpoint := range dp.PrivateEndpoints {
if endpoint.ID == "" {
continue
conn, _, err := client.PrivateEndpoints.Create(ctx, dp.Project.ID, &mongodbatlas.PrivateEndpointConnection{
ProviderName: endpoint.Provider,
Region: endpoint.Region,
})
if err != nil {
logger.Warnw("cannot create Private Endpoint Service", "err", err)

return false, nil
}

dp.PrivateEndpoints[peIdx].ID = conn.ID
logger.Debugw("Creating new Private Endpoint", "endpoint", endpoint)

return true, nil
}

atlasService, _, err := client.PrivateEndpoints.Get(ctx, dp.Project.ID, endpoint.Provider, endpoint.ID)
Expand Down Expand Up @@ -524,21 +506,39 @@ func (b Broker) Update(ctx context.Context, instanceID string, details domain.Up
oldPlan.Settings = newPlan.Settings
oldPlan.Cluster = resultingCluster
oldPlan.IPAccessLists = newPlan.IPAccessLists
oldPlan.PrivateEndpoints = newPlan.PrivateEndpoints
oldPlan.PrivateEndpoints = b.mergePrivateEndpoints(oldPlan, newPlan)

logger.Debugw("Resulting plan to be saved", "plan", oldPlan)

if err = b.updateState(ctx, instanceID, details.PlanID, details.ServiceID, oldPlan); err != nil {
logger.Errorw("Failed when updating the state", "err", err)
}

logger.Infow("Successfully started Atlas cluster update process", "cluster", resultingCluster)

planEnc, err := encodePlan(*oldPlan)
return domain.UpdateServiceSpec{
IsAsync: true,
OperationData: operationUpdate,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, resultingCluster.Name),
}, nil
}

func (b Broker) updateState(ctx context.Context, instanceID string, planID string, serviceID string, p *dynamicplans.Plan) (err error) {
logger := b.funcLogger().With("instance_id", instanceID)

planEnc, err := encodePlan(*p)
if err != nil {
return
}

s := domain.GetInstanceDetailsSpec{
PlanID: details.PlanID,
ServiceID: details.ServiceID,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, oldPlan.Cluster.Name),
PlanID: planID,
ServiceID: serviceID,
DashboardURL: b.GetDashboardURL(p.Project.ID, p.Cluster.Name),
Parameters: planEnc,
}

state, err := b.getState(ctx, oldPlan.Project.OrgID)
state, err := b.getState(ctx, p.Project.OrgID)
if err != nil {
return
}
Expand All @@ -559,13 +559,35 @@ func (b Broker) Update(ctx context.Context, instanceID string, details domain.Up
}

logger.Infow("Inserted into state", "obj", obj)
logger.Infow("Successfully started Atlas cluster update process", "cluster", resultingCluster)

return domain.UpdateServiceSpec{
IsAsync: true,
OperationData: operationUpdate,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, resultingCluster.Name),
}, nil
return
}

func (b Broker) mergePrivateEndpoints(oldPlan, newPlan *dynamicplans.Plan) privateendpoint.PrivateEndpoints {
logger := b.funcLogger()

newListOfEndpoints := privateendpoint.PrivateEndpoints{}
for _, newPlanPE := range newPlan.PrivateEndpoints {
matchedPE := matchPlanPE(newPlanPE, oldPlan)
if matchedPE == nil {
matchedPE = newPlanPE
}

logger.Debugw("Appending Private Endpoint to the merged", "PE", matchedPE)
newListOfEndpoints = append(newListOfEndpoints, newPlanPE)
}

return newListOfEndpoints
}

func matchPlanPE(pe *privateendpoint.PrivateEndpoint, plan *dynamicplans.Plan) *privateendpoint.PrivateEndpoint {
for _, pe2 := range plan.PrivateEndpoints {
if pe.Provider == pe2.Provider && pe.Region == pe2.Region && pe.EndpointName == pe2.EndpointName {
return pe2
}
}

return nil
}

// Deprovision will destroy an Atlas cluster asynchronously.
Expand All @@ -585,25 +607,14 @@ func (b Broker) Deprovision(ctx context.Context, instanceID string, details doma
return
}

for _, peService := range p.PrivateEndpoints {
if _, err := privateendpoint.Delete(ctx, peService); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "peService", peService)
}

if peService.ID == "" {
continue
}

conn, _, err := client.PrivateEndpoints.Get(ctx, p.Project.ID, peService.Provider, peService.ID)
if err != nil {
logger.Errorw("Failed to fetch Private Endpoint Service Connection from Atlas", "error", err, "peService", peService)
}
peProvider := "AZURE"
peEndpoints, _, err := client.PrivateEndpoints.List(ctx, p.Project.ID, peProvider, nil)
if err != nil {
logger.Errorw("cannot get Private Endpoints from Atlas", "err", err)
}

for _, peID := range conn.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, p.Project.ID, peService.Provider, peService.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}
for _, peConnection := range peEndpoints {
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, p)
}

_, err = client.Clusters.Delete(ctx, p.Project.ID, p.Cluster.Name)
Expand Down Expand Up @@ -698,22 +709,15 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do

return
}
err = nil
}

logger.Infow("Found existing cluster", "cluster", cluster)

peEndpoints := []*mongodbatlas.PrivateEndpointConnection{}
for _, pe := range p.PrivateEndpoints {
if pe.ID == "" {
continue
}
peConnection, _, err := client.PrivateEndpoints.Get(ctx, p.Project.ID, pe.Provider, pe.ID)
if err != nil {
logger.Errorw("Failed to fetch Private Endpoint Service Connection from Atlas", "error", err)

continue
}
peEndpoints = append(peEndpoints, peConnection)
peProvider := "AZURE"
peEndpoints, _, err := client.PrivateEndpoints.List(ctx, p.Project.ID, peProvider, nil)
if err != nil {
logger.Errorw("cannot get Private Endpoints from Atlas", "err", err)
}

logger.Infow("Found existing Private Endpoints", "endpoints", peEndpoints)
Expand Down Expand Up @@ -749,6 +753,10 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do
resp.State = domain.InProgress
resp.Description = "resources are being created"

if err = b.updateState(ctx, instanceID, details.PlanID, details.ServiceID, p); err != nil {
logger.Errorw("Failed when updating the state", "err", err)
}

break
}

Expand All @@ -770,13 +778,12 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do
// scenarios indicate that a cluster has been successfully deleted.
case r.StatusCode == http.StatusNotFound, cluster.StateName == "DELETED":
if len(peEndpoints) != 0 {
resp.State = domain.InProgress
for _, peConnection := range peEndpoints {
if _, err := client.PrivateEndpoints.Delete(ctx, p.Project.ID, "AZURE", peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "connection", peConnection)
}
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, p)
}

resp.State = domain.InProgress

break
}

Expand Down

0 comments on commit 3dabe83

Please sign in to comment.