Skip to content
This repository has been archived by the owner on Feb 11, 2022. It is now read-only.

Move PE sync into LastOperation #123

Merged
merged 4 commits into from
Sep 9, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 116 additions & 109 deletions pkg/broker/instance_operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ func (b *Broker) createOrUpdateResources(ctx context.Context, client *mongodbatl
}
}

if err := b.syncPrivateEndpoints(ctx, client, newPlan, oldPlan); err != nil {
return errors.Wrap(err, "cannot sync Private Endpoints")
if err := b.removeOldPrivateEndpoints(ctx, client, newPlan, oldPlan); err != nil {
return errors.Wrap(err, "failed to remove old Private Endpoints")
}

return nil
}

func (b *Broker) syncPrivateEndpoints(ctx context.Context, client *mongodbatlas.Client, newPlan *dynamicplans.Plan, oldPlan *dynamicplans.Plan) error {
func (b *Broker) removeOldPrivateEndpoints(ctx context.Context, client *mongodbatlas.Client, newPlan *dynamicplans.Plan, oldPlan *dynamicplans.Plan) error {
logger := b.funcLogger()

peProvider := "AZURE" // this is hardcoded cause only one provider is supported for now
Expand All @@ -235,78 +235,37 @@ func (b *Broker) syncPrivateEndpoints(ctx context.Context, client *mongodbatlas.
}
atlasPrivateEndpoints = b.populateConnections(atlasPrivateEndpoints)

planPrivateEndpoints := make(map[string]struct{})
for _, endpoint := range newPlan.PrivateEndpoints {
if endpoint.ID != "" {
// endpoint service already created
planPrivateEndpoints[endpoint.ID] = struct{}{}

continue
}

logger.Debugw("Atlas PEs", "endpoints", atlasPrivateEndpoints)

if connID, exists := b.privateEndpointExists(endpoint.Provider, endpoint.EndpointName, atlasPrivateEndpoints); exists {
planPrivateEndpoints[connID] = struct{}{}
endpoint.ID = connID

continue
}

logger.Debugw("start creation process for PrivateEndpoints", "project ID", oldPlan.Project.ID)
conn, _, err := client.PrivateEndpoints.Create(ctx, oldPlan.Project.ID, &mongodbatlas.PrivateEndpointConnection{
ProviderName: endpoint.Provider,
Region: endpoint.Region,
})
if err != nil {
return errors.Wrap(err, "cannot create Private Endpoint Service")
} else {
endpoint.ID = conn.ID
planPrivateEndpoints[endpoint.ID] = struct{}{}
}
}

logger.Debugw("Private Endpoints from the plan", "PE names", planPrivateEndpoints)

for _, peConnection := range atlasPrivateEndpoints {
// delete all PE endpoints which are not in the plan
if _, isPlanned := planPrivateEndpoints[peConnection.ID]; !isPlanned {
if !privateEndpointInPlan(peConnection.ProviderName, peConnection.EndpointServiceName, newPlan) {
logger.Debugw("Deleting Private Endpoint", "connection", peConnection)

for _, endpoint := range oldPlan.PrivateEndpoints {
if endpoint.EndpointName == peConnection.EndpointServiceName && endpoint.Provider == peConnection.ProviderName {
if _, err := privateendpoint.Delete(ctx, endpoint); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "endpoint", endpoint.EndpointName)
}
}
}

for _, peID := range peConnection.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, oldPlan.Project.ID, peProvider, peConnection.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}

if _, err := client.PrivateEndpoints.Delete(ctx, oldPlan.Project.ID, peProvider, peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "pe", peConnection)
}
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, oldPlan)
}
}

return nil
}

func (b *Broker) privateEndpointExists(provider string, name string, connections []mongodbatlas.PrivateEndpointConnection) (string, bool) {
func (b Broker) deletePrivateEndpoint(ctx context.Context, client *mongodbatlas.Client, peProvider string, peConnection mongodbatlas.PrivateEndpointConnection, plan *dynamicplans.Plan) {
logger := b.funcLogger()

for _, peConnection := range connections {
logger.Infow("privateEndpointExists func", "conn", peConnection)
if peConnection.ProviderName == provider && peConnection.EndpointServiceName == name {
return peConnection.ID, true
for _, endpoint := range plan.PrivateEndpoints {
if endpoint.EndpointName == peConnection.EndpointServiceName && endpoint.Provider == peConnection.ProviderName {
if _, err := privateendpoint.Delete(ctx, endpoint); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "endpoint", endpoint.EndpointName)
}
}
}

for _, peID := range peConnection.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, plan.Project.ID, peProvider, peConnection.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}

return "", false
if _, err := client.PrivateEndpoints.Delete(ctx, plan.Project.ID, peProvider, peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "pe", peConnection)
}
}

func (b *Broker) populateConnections(connections []mongodbatlas.PrivateEndpointConnection) []mongodbatlas.PrivateEndpointConnection {
Expand All @@ -332,14 +291,37 @@ func (b *Broker) populateConnections(connections []mongodbatlas.PrivateEndpointC
return connections
}

func privateEndpointInPlan(provider string, name string, plan *dynamicplans.Plan) bool {
for _, endpoint := range plan.PrivateEndpoints {
if endpoint.Provider == provider && endpoint.EndpointName == name {
return true
}
}

return false
}

// TODO: this retry logic is clunky, come up with something better?
func (b *Broker) postCreateResources(ctx context.Context, client *mongodbatlas.Client, dp *dynamicplans.Plan) (retry bool, err error) {
logger := b.funcLogger()

logger.Debugw("Setup PrivateEndpoints", "PrivateEndpoints", dp.PrivateEndpoints)
for _, endpoint := range dp.PrivateEndpoints {
for peIdx, endpoint := range dp.PrivateEndpoints {
if endpoint.ID == "" {
continue
conn, _, err := client.PrivateEndpoints.Create(ctx, dp.Project.ID, &mongodbatlas.PrivateEndpointConnection{
ProviderName: endpoint.Provider,
Region: endpoint.Region,
})
if err != nil {
logger.Warnw("cannot create Private Endpoint Service", "err", err)

return false, nil
}

dp.PrivateEndpoints[peIdx].ID = conn.ID
logger.Debugw("Creating new Private Endpoint", "endpoint", endpoint)

return true, nil
}

atlasService, _, err := client.PrivateEndpoints.Get(ctx, dp.Project.ID, endpoint.Provider, endpoint.ID)
Expand Down Expand Up @@ -524,21 +506,39 @@ func (b Broker) Update(ctx context.Context, instanceID string, details domain.Up
oldPlan.Settings = newPlan.Settings
oldPlan.Cluster = resultingCluster
oldPlan.IPAccessLists = newPlan.IPAccessLists
oldPlan.PrivateEndpoints = newPlan.PrivateEndpoints
oldPlan.PrivateEndpoints = b.mergePrivateEndpoints(oldPlan, newPlan)

logger.Debugw("Resulting plan to be saved", "plan", oldPlan)

if err = b.updateState(ctx, instanceID, details.PlanID, details.ServiceID, oldPlan); err != nil {
logger.Errorw("Failed when updating the state", "err", err)
}

logger.Infow("Successfully started Atlas cluster update process", "cluster", resultingCluster)

planEnc, err := encodePlan(*oldPlan)
return domain.UpdateServiceSpec{
IsAsync: true,
OperationData: operationUpdate,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, resultingCluster.Name),
}, nil
}

func (b Broker) updateState(ctx context.Context, instanceID string, planID string, serviceID string, p *dynamicplans.Plan) (err error) {
logger := b.funcLogger().With("instance_id", instanceID)

planEnc, err := encodePlan(*p)
if err != nil {
return
}

s := domain.GetInstanceDetailsSpec{
PlanID: details.PlanID,
ServiceID: details.ServiceID,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, oldPlan.Cluster.Name),
PlanID: planID,
ServiceID: serviceID,
DashboardURL: b.GetDashboardURL(p.Project.ID, p.Cluster.Name),
Parameters: planEnc,
}

state, err := b.getState(ctx, oldPlan.Project.OrgID)
state, err := b.getState(ctx, p.Project.OrgID)
if err != nil {
return
}
Expand All @@ -559,13 +559,35 @@ func (b Broker) Update(ctx context.Context, instanceID string, details domain.Up
}

logger.Infow("Inserted into state", "obj", obj)
logger.Infow("Successfully started Atlas cluster update process", "cluster", resultingCluster)

return domain.UpdateServiceSpec{
IsAsync: true,
OperationData: operationUpdate,
DashboardURL: b.GetDashboardURL(oldPlan.Project.ID, resultingCluster.Name),
}, nil
return
}

func (b Broker) mergePrivateEndpoints(oldPlan, newPlan *dynamicplans.Plan) privateendpoint.PrivateEndpoints {
logger := b.funcLogger()

newListOfEndpoints := privateendpoint.PrivateEndpoints{}
for _, newPlanPE := range newPlan.PrivateEndpoints {
matchedPE := matchPlanPE(newPlanPE, oldPlan)
if matchedPE == nil {
matchedPE = newPlanPE
}

logger.Debugw("Appending Private Endpoint to the merged", "PE", matchedPE)
newListOfEndpoints = append(newListOfEndpoints, newPlanPE)
}

return newListOfEndpoints
}

func matchPlanPE(pe *privateendpoint.PrivateEndpoint, plan *dynamicplans.Plan) *privateendpoint.PrivateEndpoint {
for _, pe2 := range plan.PrivateEndpoints {
if pe.Provider == pe2.Provider && pe.Region == pe2.Region && pe.EndpointName == pe2.EndpointName {
return pe2
}
}

return nil
}

// Deprovision will destroy an Atlas cluster asynchronously.
Expand All @@ -585,25 +607,14 @@ func (b Broker) Deprovision(ctx context.Context, instanceID string, details doma
return
}

for _, peService := range p.PrivateEndpoints {
if _, err := privateendpoint.Delete(ctx, peService); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Azure", "error", err, "peService", peService)
}

if peService.ID == "" {
continue
}

conn, _, err := client.PrivateEndpoints.Get(ctx, p.Project.ID, peService.Provider, peService.ID)
if err != nil {
logger.Errorw("Failed to fetch Private Endpoint Service Connection from Atlas", "error", err, "peService", peService)
}
peProvider := "AZURE"
peEndpoints, _, err := client.PrivateEndpoints.List(ctx, p.Project.ID, peProvider, nil)
if err != nil {
logger.Errorw("cannot get Private Endpoints from Atlas", "err", err)
}

for _, peID := range conn.PrivateEndpoints {
if _, err := client.PrivateEndpoints.DeleteOnePrivateEndpoint(ctx, p.Project.ID, peService.Provider, peService.ID, peID); err != nil {
logger.Errorw("Failed to delete Private Endpoint from Atlas", "error", err, "pe", peID)
}
}
for _, peConnection := range peEndpoints {
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, p)
}

_, err = client.Clusters.Delete(ctx, p.Project.ID, p.Cluster.Name)
Expand Down Expand Up @@ -698,22 +709,15 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do

return
}
err = nil
}

logger.Infow("Found existing cluster", "cluster", cluster)

peEndpoints := []*mongodbatlas.PrivateEndpointConnection{}
for _, pe := range p.PrivateEndpoints {
if pe.ID == "" {
continue
}
peConnection, _, err := client.PrivateEndpoints.Get(ctx, p.Project.ID, pe.Provider, pe.ID)
if err != nil {
logger.Errorw("Failed to fetch Private Endpoint Service Connection from Atlas", "error", err)

continue
}
peEndpoints = append(peEndpoints, peConnection)
peProvider := "AZURE"
peEndpoints, _, err := client.PrivateEndpoints.List(ctx, p.Project.ID, peProvider, nil)
if err != nil {
logger.Errorw("cannot get Private Endpoints from Atlas", "err", err)
}

logger.Infow("Found existing Private Endpoints", "endpoints", peEndpoints)
Expand Down Expand Up @@ -749,6 +753,10 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do
resp.State = domain.InProgress
resp.Description = "resources are being created"

if err = b.updateState(ctx, instanceID, details.PlanID, details.ServiceID, p); err != nil {
logger.Errorw("Failed when updating the state", "err", err)
}

break
}

Expand All @@ -770,13 +778,12 @@ func (b Broker) LastOperation(ctx context.Context, instanceID string, details do
// scenarios indicate that a cluster has been successfully deleted.
case r.StatusCode == http.StatusNotFound, cluster.StateName == "DELETED":
if len(peEndpoints) != 0 {
resp.State = domain.InProgress
for _, peConnection := range peEndpoints {
if _, err := client.PrivateEndpoints.Delete(ctx, p.Project.ID, "AZURE", peConnection.ID); err != nil {
logger.Errorw("Failed to delete Private Endpoint Service from Atlas", "error", err, "connection", peConnection)
}
b.deletePrivateEndpoint(ctx, client, peProvider, peConnection, p)
}

resp.State = domain.InProgress

break
}

Expand Down