Skip to content

Commit

Permalink
Fix aggregator connections being dropped intermittently when refreshi…
Browse files Browse the repository at this point in the history
…ng connections. Closes #3664
  • Loading branch information
kaidaguerre committed Jul 11, 2023
1 parent 457da34 commit 6e6d12b
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 34 deletions.
5 changes: 0 additions & 5 deletions pkg/steampipeconfig/connection_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,6 @@ func (p ConnectionPlugin) addConnection(name string, config string, connectionOp
}
}

func (p ConnectionPlugin) IncludesConnection(name string) bool {
_, ok := p.ConnectionMap[name]
return ok
}

// GetSchema returns the cached schema if it is static, or if it is dynamic, refetch it
func (p ConnectionPlugin) GetSchema(connectionName string) (*sdkproto.Schema, error) {
connectionData, ok := p.ConnectionMap[connectionName]
Expand Down
12 changes: 6 additions & 6 deletions pkg/steampipeconfig/connection_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
}
defer conn.Release()

log.Printf("[TRACE] Loading connection state")
log.Printf("[INFO] Loading connection state")
// load the connection state file and filter out any connections which are not in the list of schemas
// this allows for the database being rebuilt,modified externally
currentConnectionStateMap, err := LoadConnectionState(ctx, conn.Conn())
Expand All @@ -77,7 +77,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
log.Printf("[WARN] failed to build required connection state: %s", err.Error())
return nil, NewErrorRefreshConnectionResult(err)
}
log.Printf("[TRACE] built required connection state")
log.Printf("[INFO] built required connection state")

// build lookup of disabled connections
disabled := make(map[string]struct{})
Expand All @@ -100,7 +100,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
log.Printf("[INFO] loaded connection state")
updates.CurrentConnectionState = currentConnectionStateMap

log.Printf("[INFO] Loading dynamic schema hashes")
log.Printf("[INFO] loading dynamic schema hashes")

// for any connections with dynamic schema, we need to reload their schema
// instantiate connection plugins for all connections with dynamic schema - this will retrieve their current schema
Expand All @@ -115,22 +115,22 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
for k, v := range dynamicSchemaHashMap {
log.Printf("[INFO] %s: %s", k, v)
}
log.Printf("[INFO] Identify connections to update")
log.Printf("[INFO] identify connections to update")

modTime := time.Now()
// connections to create/update
for name, requiredConnectionState := range requiredConnectionStateMap {
// if the connection requires update, add to list
if connectionRequiresUpdate(forceUpdateConnectionNames, name, currentConnectionStateMap, requiredConnectionState) {
log.Printf("[TRACE] connection %s is out of date or missing", name)
updates.Update[name] = requiredConnectionState
log.Printf("[INFO] connection %s is out of date or missing. updates: %v", name, maps.Keys(updates.Update))

// set the connection mod time of required connection data to now
requiredConnectionState.ConnectionModTime = modTime
}
}

log.Printf("[TRACE] Identify connections to delete")
log.Printf("[INFO] Identify connections to delete")
// connections to delete - any connection which is in connection state but NOT required connections
for name, currentState := range currentConnectionStateMap {
if _, connectionRequired := requiredConnectionStateMap[name]; !connectionRequired {
Expand Down
37 changes: 14 additions & 23 deletions pkg/steampipeconfig/connection_updates_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package steampipeconfig

import (
"fmt"
"log"
"strings"

sdkversion "github.com/turbot/steampipe-plugin-sdk/v5/version"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
)

Expand All @@ -17,6 +17,7 @@ func (u *ConnectionUpdates) validate() {
}

func (u *ConnectionUpdates) validatePluginsAndConnections() {
// TODO should plugin manager do this when starting the plugin???
var validatedPlugins = make(map[string]*ConnectionPlugin)

for connectionName, connectionPlugin := range u.ConnectionPlugins {
Expand All @@ -40,15 +41,22 @@ func (u *ConnectionUpdates) validateUpdates() {
// ConnectionPlugins has now been validated and only contains valid connection plugins
// for every update and comment update, confirm the connection plugin is valid
for connectionName, connectionState := range u.Update {
if connectionState.GetType() == modconfig.ConnectionTypeAggregator {
if u.validateAggregator(connectionState) {
validatedUpdates[connectionName] = connectionState
}
} else if _, ok := u.ConnectionPlugins[connectionName]; ok {
if _, ok := u.ConnectionPlugins[connectionName]; ok {
// if this connection has a validated connection plugin, add to valdiated updates
validatedUpdates[connectionName] = connectionState
} else {
// try to get the validation failure - should be in InvalidConnections
validationFailure, ok := u.InvalidConnections[connectionName]
if ok {
log.Printf("[WARN] validateUpdates - connection update %s failed validation as the connection failed validation: %s", connectionName, validationFailure.Message)
} else {
// not expected
// for some reason there was no validation failure in the map
log.Printf("[WARN] validateUpdates - connection update %s failed validation (connection not found in validated ConnectionPlugins but InvalidConnections does not contain the connection - this is unexpected)", connectionName)
}
}
}

for connectionName, connectionState := range u.MissingComments {
// if this connection has a validated connection plugin, add to validated comment updates
if _, ok := u.ConnectionPlugins[connectionName]; ok {
Expand All @@ -61,23 +69,6 @@ func (u *ConnectionUpdates) validateUpdates() {
u.MissingComments = validatedCommentUpdates
}

func (u *ConnectionUpdates) validateAggregator(connectionState *ConnectionState) bool {
connectionName := connectionState.ConnectionName
if connectionState.GetType() == modconfig.ConnectionTypeAggregator {
// get the connection object
connection := GlobalConfig.Connections[connectionName]
// get the first child connection
for _, childConnection := range connection.Connections {
// check whether the plugin for this connection is validated
for _, p := range u.ConnectionPlugins {
return p.IncludesConnection(childConnection.Name)
}
}
}
// treat empty aggregator as validated - we will create a schema for it but not allow querying
return true
}

func validateConnectionName(connectionName string, p *ConnectionPlugin) *ValidationFailure {
if err := ValidateConnectionName(connectionName); err != nil {
return &ValidationFailure{
Expand Down

0 comments on commit 6e6d12b

Please sign in to comment.