Skip to content

Commit

Permalink
Fix excessive memory usage when starting with a high number of connec…
Browse files Browse the repository at this point in the history
…tions. Closes #1656
  • Loading branch information
kaidaguerre authored Mar 15, 2022
1 parent 7a8b3ca commit b6b17a0
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 81 deletions.
160 changes: 87 additions & 73 deletions db/db_local/local_db_client_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package db_local

import (
"context"
"database/sql"
"fmt"
"log"
"strings"

"github.com/spf13/viper"
"github.com/turbot/steampipe/constants"

"github.com/turbot/steampipe/db/db_common"
"github.com/turbot/steampipe/pluginmanager"
"github.com/turbot/steampipe/steampipeconfig"
Expand Down Expand Up @@ -42,49 +42,45 @@ func (c *LocalDbClient) refreshConnections(ctx context.Context) *steampipeconfig
return res
}

if !connectionUpdates.HasUpdates() {
log.Println("[TRACE] RefreshConnections: no updates required")
return res
}

// now build list of necessary queries to perform the update
connectionQueries, otherRes := c.buildConnectionUpdateQueries(connectionUpdates)
otherRes := c.executeConnectionUpdateQueries(ctx, connectionUpdates)
// merge results into local results
res.Merge(otherRes)
if res.Error != nil {
return res
}

log.Printf("[TRACE] refreshConnections, %d connection update %s\n", len(connectionQueries), utils.Pluralize("query", len(connectionQueries)))

// if there are no connection queries, we are done
if len(connectionQueries) == 0 {
return res
}

// so there ARE connections to update
// execute the connection queries
if err := executeConnectionQueries(ctx, connectionQueries); err != nil {
res.Error = err
return res
}

// now serialise the connection state
// update required connections with the schema mode from the connection state and schema hash from the hash map
if err := steampipeconfig.SaveConnectionState(connectionUpdates.RequiredConnectionState); err != nil {
res.Error = err
return res
}
// reload the database foreign schema names, since they have changed
// this is to ensuire search paths are correctly updated
// this is to ensure search paths are correctly updated
log.Println("[TRACE] RefreshConnections: reloading foreign schema names")
c.LoadForeignSchemaNames(ctx)

res.UpdatedConnections = true
return res
}

func (c *LocalDbClient) buildConnectionUpdateQueries(connectionUpdates *steampipeconfig.ConnectionUpdates) ([]string, *steampipeconfig.RefreshConnectionResult) {
var connectionQueries []string
func (c *LocalDbClient) executeConnectionUpdateQueries(ctx context.Context, connectionUpdates *steampipeconfig.ConnectionUpdates) *steampipeconfig.RefreshConnectionResult {
res := &steampipeconfig.RefreshConnectionResult{}
numUpdates := len(connectionUpdates.Update)
rootClient, err := createLocalDbClient(ctx, &CreateDbOptions{Username: constants.DatabaseSuperUser})
if err != nil {
res.Error = err
return res
}
defer rootClient.Close()

log.Printf("[TRACE] buildConnectionUpdateQueries: num updates %d", numUpdates)
numUpdates := len(connectionUpdates.Update)
log.Printf("[TRACE] executeConnectionUpdateQueries: num updates %d", numUpdates)

if numUpdates > 0 {
// find any plugins which use a newer sdk version than steampipe.
Expand All @@ -94,50 +90,96 @@ func (c *LocalDbClient) buildConnectionUpdateQueries(connectionUpdates *steampip
}

// get schema queries - this updates schemas for validated plugins and drops schemas for unvalidated plugins
connectionQueries = getSchemaQueries(validatedUpdates, validationFailures)
if viper.GetBool(constants.ArgSchemaComments) {
// add comments queries for validated connections
connectionQueries = append(connectionQueries, getCommentQueries(validatedPlugins)...)
err := executeUpdateQueries(ctx, rootClient, validationFailures, validatedUpdates, validatedPlugins)
if err != nil {
log.Printf("[TRACE] executeUpdateQueries returned error: %v", err)
res.Error = err
return res
}

}

for c := range connectionUpdates.Delete {
log.Printf("[TRACE] delete connection %s\n ", c)
connectionQueries = append(connectionQueries, deleteConnectionQuery(c)...)
query := getDeleteConnectionQuery(c)
_, err := rootClient.ExecContext(ctx, query)
if err != nil {
res.Error = err
return res
}
}
return connectionQueries, res

return res
}

func getSchemaQueries(updates steampipeconfig.ConnectionDataMap, failures []*steampipeconfig.ValidationFailure) []string {
var schemaQueries []string
func executeUpdateQueries(ctx context.Context, rootClient *sql.DB, failures []*steampipeconfig.ValidationFailure, updates steampipeconfig.ConnectionDataMap, validatedPlugins []*steampipeconfig.ConnectionPlugin) error {
idx := 0
numUpdates := len(updates)
for connectionName, connectionData := range updates {
log.Printf("[TRACE] executing update query %d of %d for connection '%s'", idx, numUpdates, connectionName)
remoteSchema := pluginmanager.PluginFQNToSchemaName(connectionData.Plugin)
queries := updateConnectionQuery(connectionName, remoteSchema)
schemaQueries = append(schemaQueries, queries...)

query := getUpdateConnectionQuery(connectionName, remoteSchema)
_, err := rootClient.ExecContext(ctx, query)
if err != nil {
return err
}
idx++
}

for _, failure := range failures {
log.Printf("[TRACE] remove schema for conneciton failing validation connection %s, plugin Name %s\n ", failure.ConnectionName, failure.Plugin)
log.Printf("[TRACE] remove schema for connection failing validation connection %s, plugin Name %s\n ", failure.ConnectionName, failure.Plugin)
if failure.ShouldDropIfExists {
schemaQueries = append(schemaQueries, deleteConnectionQuery(failure.ConnectionName)...)
query := getDeleteConnectionQuery(failure.ConnectionName)
_, err := rootClient.ExecContext(ctx, query)
if err != nil {
return err
}
}
}

return schemaQueries
if viper.GetBool(constants.ArgSchemaComments) {
// add comments queries for validated connections
query := getCommentsQuery(validatedPlugins)
_, err := rootClient.ExecContext(ctx, query)
if err != nil {
return err
}
}
log.Printf("[TRACE] executeUpdateQueries complete")
return nil
}

func getCommentQueries(plugins []*steampipeconfig.ConnectionPlugin) []string {
func getCommentsQuery(plugins []*steampipeconfig.ConnectionPlugin) string {
var commentQueries []string
for _, plugin := range plugins {
commentQueries = append(commentQueries, commentsQuery(plugin)...)
commentQueries = append(commentQueries, getCommentsQueryForPlugin(plugin)...)
}
return strings.Join(commentQueries, ";\n")
}

func getCommentsQueryForPlugin(p *steampipeconfig.ConnectionPlugin) []string {
var statements []string
for t, schema := range p.Schema.Schema {
table := db_common.PgEscapeName(t)
schemaName := db_common.PgEscapeName(p.ConnectionName)
if schema.Description != "" {
tableDescription := db_common.PgEscapeString(schema.Description)
statements = append(statements, fmt.Sprintf("COMMENT ON FOREIGN TABLE %s.%s is %s;", schemaName, table, tableDescription))
}
for _, c := range schema.Columns {
if c.Description != "" {
column := db_common.PgEscapeName(c.Name)
columnDescription := db_common.PgEscapeString(c.Description)
statements = append(statements, fmt.Sprintf("COMMENT ON COLUMN %s.%s.%s is %s;", schemaName, table, column, columnDescription))
}
}
}
return commentQueries
return statements
}

func updateConnectionQuery(localSchema, remoteSchema string) []string {
func getUpdateConnectionQuery(localSchema, remoteSchema string) string {
// escape the name
localSchema = db_common.PgEscapeName(localSchema)
return []string{
queries := []string{

// Each connection has a unique schema. The schema, and all objects inside it,
// are owned by the root user.
Expand All @@ -162,40 +204,12 @@ func updateConnectionQuery(localSchema, remoteSchema string) []string {
// Import the foreign schema into this connection.
fmt.Sprintf(`import foreign schema "%s" from server steampipe into %s;`, remoteSchema, localSchema),
}
return strings.Join(queries, ";\n")
}

func commentsQuery(p *steampipeconfig.ConnectionPlugin) []string {
var statements []string
for t, schema := range p.Schema.Schema {
table := db_common.PgEscapeName(t)
schemaName := db_common.PgEscapeName(p.ConnectionName)
if schema.Description != "" {
tableDescription := db_common.PgEscapeString(schema.Description)
statements = append(statements, fmt.Sprintf("COMMENT ON FOREIGN TABLE %s.%s is %s;", schemaName, table, tableDescription))
}
for _, c := range schema.Columns {
if c.Description != "" {
column := db_common.PgEscapeName(c.Name)
columnDescription := db_common.PgEscapeString(c.Description)
statements = append(statements, fmt.Sprintf("COMMENT ON COLUMN %s.%s.%s is %s;", schemaName, table, column, columnDescription))
}
}
}
return statements
}

func deleteConnectionQuery(name string) []string {
return []string{
func getDeleteConnectionQuery(name string) string {
queries := []string{
fmt.Sprintf(`DROP SCHEMA IF EXISTS %s CASCADE;`, db_common.PgEscapeName(name)),
}
}

func executeConnectionQueries(ctx context.Context, schemaQueries []string) error {
log.Printf("[TRACE] there are connections to update\n")
_, err := executeSqlAsRoot(ctx, schemaQueries...)
if err != nil {
return err
}

return nil
return strings.Join(queries, ";\n")
}
8 changes: 4 additions & 4 deletions db/db_local/start_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,11 @@ func ensureSteampipeServer(ctx context.Context, rootClient *sql.DB) error {

// create the command schema and grant insert permission
func ensureCommandSchema(ctx context.Context, rootClient *sql.DB) error {
commandSchemaStatements := updateConnectionQuery(constants.CommandSchema, constants.CommandSchema)
commandSchemaStatements = append(
commandSchemaStatements,
commandSchemaStatements := []string{
getUpdateConnectionQuery(constants.CommandSchema, constants.CommandSchema),
fmt.Sprintf("grant insert on %s.%s to steampipe_users;", constants.CommandSchema, constants.CacheCommandTable),
)
}

for _, statement := range commandSchemaStatements {
if _, err := rootClient.ExecContext(ctx, statement); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions db/db_local/stop_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func ShutdownService(ctx context.Context, invoker constants.Invoker) {
if count > 0 {
// there are other clients connected to the database
// we can't stop the DB.
log.Printf("[WARN] ShutdownService not closing database service - %d %s connected", count, utils.Pluralize("client", count))
return
}

Expand Down
13 changes: 10 additions & 3 deletions interactive/interactive_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,9 +589,16 @@ func (c *InteractiveClient) startCancelHandler() chan bool {
select {
case <-sigIntChannel:
log.Println("[TRACE] got SIGINT")
// call context cancellation function
c.cancelActiveQueryIfAny()
// keep waiting for further cancellations
// if initialisation is not complete, just close the prompt
// this will cancel the context used for initialisation so cancel any initialisation queries
if !c.isInitialised() {
c.ClosePrompt(AfterPromptCloseExit)
return
} else {
// otherwise call cancelActiveQueryIfAny which the for the active query, if there is one
c.cancelActiveQueryIfAny()
// keep waiting for further cancellations
}
case <-quitChannel:
log.Println("[TRACE] cancel handler exiting")
c.cancelActiveQueryIfAny()
Expand Down
6 changes: 5 additions & 1 deletion steampipeconfig/connection_updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (u *ConnectionUpdates) updateRequiredStateWithSchemaProperties(schemaHashMa
func (u *ConnectionUpdates) populateConnectionPlugins(alreadyCreatedConnectionPlugins map[string]*ConnectionPlugin) *RefreshConnectionResult {
// get list of connections to update:
// - exclude connections already created
// - for any aggregator connections, instantiate the first child conneciotn instead
// - for any aggregator connections, instantiate the first child connection instead
connectionsToCreate := u.getConnectionsToCreate(alreadyCreatedConnectionPlugins)
// now create them
connectionPlugins, res := CreateConnectionPlugins(connectionsToCreate, &CreateConnectionPluginOptions{SetConnectionConfig: true})
Expand Down Expand Up @@ -183,6 +183,10 @@ func (u *ConnectionUpdates) getConnectionsToCreate(alreadyCreatedConnectionPlugi
return res
}

func (u *ConnectionUpdates) HasUpdates() bool {
return len(u.Update)+len(u.Delete) > 0
}

func getSchemaHashesForDynamicSchemas(requiredConnectionData ConnectionDataMap, connectionState ConnectionDataMap) (map[string]string, map[string]*ConnectionPlugin, error) {
log.Printf("[TRACE] getSchemaHashesForDynamicSchemas")
// for every required connection, check the connection state to determine whether the schema mode is 'dynamic'
Expand Down

0 comments on commit b6b17a0

Please sign in to comment.