From ead4dce441c7ecbedda8b54cc15e03a87a37990a Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Wed, 23 Feb 2022 21:22:52 +0530 Subject: [PATCH 1/5] fix schema extraction issue for remote --- db/db_client/db_client.go | 28 +++++++++++++-------------- db/db_common/client.go | 2 +- db/db_local/local_db_client.go | 4 ++-- interactive/interactive_client.go | 32 ++----------------------------- 4 files changed, 19 insertions(+), 47 deletions(-) diff --git a/db/db_client/db_client.go b/db/db_client/db_client.go index b3562cc76e..3b97d88e97 100644 --- a/db/db_client/db_client.go +++ b/db/db_client/db_client.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "log" - "strings" "sync" "github.com/jackc/pgx/v4" @@ -184,13 +183,13 @@ func (c *DbClient) RefreshConnectionAndSearchPaths(ctx context.Context) *steampi return res } -func (c *DbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*schema.Metadata, error) { +func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) { utils.LogTime("db_client.GetSchemaFromDB start") defer utils.LogTime("db_client.GetSchemaFromDB end") connection, err := c.dbClient.Conn(ctx) utils.FailOnError(err) - query := c.buildSchemasQuery(schemas) + query := c.buildSchemasQuery() tablesResult, err := connection.QueryContext(ctx, query) if err != nil { @@ -212,14 +211,15 @@ func (c *DbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*sche return metadata, nil } -func (c *DbClient) buildSchemasQuery(schemas []string) string { - schemasClause := "" - if len(schemas) > 0 { - schemasClause = fmt.Sprintf(` - cols.table_schema in ('%s') -OR`, strings.Join(schemas, "','")) - } +func (c *DbClient) buildSchemasQuery() string { query := fmt.Sprintf(` +WITH distinct_schema AS ( + SELECT DISTINCT(foreign_table_schema) + FROM + information_schema.foreign_tables + WHERE + foreign_table_schema <> 'steampipe_command' +) SELECT table_name, column_name, @@ -235,12 +235,12 @@ LEFT JOIN pg_catalog.pg_namespace nsp ON nsp.nspname = cols.table_schema LEFT JOIN pg_catalog.pg_class c ON c.relname = cols.table_name AND c.relnamespace = nsp.oid -WHERE %s +WHERE + cols.table_schema in (select * from distinct_schema) + OR LEFT(cols.table_schema,8) = 'pg_temp_' -ORDER BY - cols.table_schema, cols.table_name, cols.column_name; -`, schemasClause) +`) return query } diff --git a/db/db_common/client.go b/db/db_common/client.go index fe07aeaebb..2b42f1c69a 100644 --- a/db/db_common/client.go +++ b/db/db_common/client.go @@ -34,7 +34,7 @@ type Client interface { SetEnsureSessionDataFunc(EnsureSessionStateCallback) RefreshSessions(ctx context.Context) *AcquireSessionResult - GetSchemaFromDB(context.Context, []string) (*schema.Metadata, error) + GetSchemaFromDB(context.Context) (*schema.Metadata, error) // remote client will have empty implementation RefreshConnectionAndSearchPaths(context.Context) *steampipeconfig.RefreshConnectionResult LoadForeignSchemaNames(context.Context) error diff --git a/db/db_local/local_db_client.go b/db/db_local/local_db_client.go index 7d6b8da6be..e4abca6191 100644 --- a/db/db_local/local_db_client.go +++ b/db/db_local/local_db_client.go @@ -154,8 +154,8 @@ func (c *LocalDbClient) ContructSearchPath(ctx context.Context, requiredSearchPa return c.client.ContructSearchPath(ctx, requiredSearchPath, searchPathPrefix, currentSearchPath) } -func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*schema.Metadata, error) { - return c.client.GetSchemaFromDB(ctx, schemas) +func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) { + return c.client.GetSchemaFromDB(ctx) } func (c *LocalDbClient) LoadForeignSchemaNames(ctx context.Context) error { diff --git a/interactive/interactive_client.go b/interactive/interactive_client.go index ac72a0b703..9334a7c30d 100644 --- a/interactive/interactive_client.go +++ b/interactive/interactive_client.go @@ -25,7 +25,6 @@ import ( "github.com/turbot/steampipe/query/queryresult" "github.com/turbot/steampipe/schema" "github.com/turbot/steampipe/statushooks" - "github.com/turbot/steampipe/steampipeconfig" "github.com/turbot/steampipe/steampipeconfig/modconfig" "github.com/turbot/steampipe/utils" "github.com/turbot/steampipe/version" @@ -155,23 +154,14 @@ func (c *InteractiveClient) LoadSchema() error { utils.LogTime("db_client.LoadSchema start") defer utils.LogTime("db_client.LoadSchema end") - // build a ConnectionSchemaMap object to identify the schemas to load - // (pass nil for connection state - this forces NewConnectionSchemaMap to load it) - connectionSchemaMap, err := steampipeconfig.NewConnectionSchemaMap() - if err != nil { - return err - } - // get the unique schema - we use this to limit the schemas we load from the database - schemas := connectionSchemaMap.UniqueSchemas() // load these schemas // in a background context, since we are not running in a context - but GetSchemaFromDB needs one - metadata, err := c.client().GetSchemaFromDB(context.Background(), schemas) + metadata, err := c.client().GetSchemaFromDB(context.Background()) if err != nil { return err } - c.populateSchemaMetadata(metadata, connectionSchemaMap) - + c.schemaMetadata = metadata return nil } @@ -589,21 +579,3 @@ func (c *InteractiveClient) addControlSuggestion(control *modconfig.Control, con } return prompt.Suggest{Text: controlName, Output: controlName, Description: description} } - -func (c *InteractiveClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) error { - // we now need to add in all other schemas which have the same schemas as those we have loaded - for loadedSchema, otherSchemas := range connectionSchemaMap { - // all 'otherSchema's have the same schema as loadedSchema - exemplarSchema, ok := schemaMetadata.Schemas[loadedSchema] - if !ok { - // should can happen in the case of a dynamic plugin with no tables - use empty schema - exemplarSchema = make(map[string]schema.TableSchema) - } - - for _, s := range otherSchemas { - schemaMetadata.Schemas[s] = exemplarSchema - } - } - c.schemaMetadata = schemaMetadata - return nil -} From ca7e2b99fd592314bceb41d22ad100fb5e9bc1c2 Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Mon, 28 Feb 2022 19:38:42 +0530 Subject: [PATCH 2/5] optimising local schema fetch --- db/db_client/db_client.go | 5 +- db/db_local/local_db_client.go | 84 +++++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 4 deletions(-) diff --git a/db/db_client/db_client.go b/db/db_client/db_client.go index 3b97d88e97..727235be2c 100644 --- a/db/db_client/db_client.go +++ b/db/db_client/db_client.go @@ -3,7 +3,6 @@ package db_client import ( "context" "database/sql" - "fmt" "log" "sync" @@ -212,7 +211,7 @@ func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error } func (c *DbClient) buildSchemasQuery() string { - query := fmt.Sprintf(` + query := ` WITH distinct_schema AS ( SELECT DISTINCT(foreign_table_schema) FROM @@ -240,7 +239,7 @@ WHERE OR LEFT(cols.table_schema,8) = 'pg_temp_' -`) +` return query } diff --git a/db/db_local/local_db_client.go b/db/db_local/local_db_client.go index e4abca6191..7f49486be6 100644 --- a/db/db_local/local_db_client.go +++ b/db/db_local/local_db_client.go @@ -155,7 +155,89 @@ func (c *LocalDbClient) ContructSearchPath(ctx context.Context, requiredSearchPa } func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) { - return c.client.GetSchemaFromDB(ctx) + // build a ConnectionSchemaMap object to identify the schemas to load + // (pass nil for connection state - this forces NewConnectionSchemaMap to load it) + connectionSchemaMap, err := steampipeconfig.NewConnectionSchemaMap() + if err != nil { + return nil, err + } + // get the unique schema - we use this to limit the schemas we load from the database + schemas := connectionSchemaMap.UniqueSchemas() + query := c.buildSchemasQuery(schemas) + + acquireSessionResult := c.AcquireSession(ctx) + if acquireSessionResult.Error != nil { + acquireSessionResult.Session.Close(false) + return nil, err + } + + tablesResult, err := acquireSessionResult.Session.Connection.QueryContext(ctx, query) + if err != nil { + return nil, err + } + + metadata, err := db_common.BuildSchemaMetadata(tablesResult) + if err != nil { + acquireSessionResult.Session.Close(false) + return nil, err + } + acquireSessionResult.Session.Close(false) + + c.populateSchemaMetadata(metadata, connectionSchemaMap) + + searchPath, err := c.GetCurrentSearchPath(ctx) + if err != nil { + return nil, err + } + metadata.SearchPath = searchPath + + return metadata, nil +} + +func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) { + // we now need to add in all other schemas which have the same schemas as those we have loaded + for loadedSchema, otherSchemas := range connectionSchemaMap { + // all 'otherSchema's have the same schema as loadedSchema + exemplarSchema, ok := schemaMetadata.Schemas[loadedSchema] + if !ok { + // should can happen in the case of a dynamic plugin with no tables - use empty schema + exemplarSchema = make(map[string]schema.TableSchema) + } + + for _, s := range otherSchemas { + schemaMetadata.Schemas[s] = exemplarSchema + } + } +} + +func (c *LocalDbClient) buildSchemasQuery(hintSchemas []string) string { + for idx, s := range hintSchemas { + hintSchemas[idx] = fmt.Sprintf("'%s'", s) + } + schemaClause := strings.Join(hintSchemas, ",") + query := fmt.Sprintf(` +SELECT + table_name, + column_name, + column_default, + is_nullable, + data_type, + table_schema, + (COALESCE(pg_catalog.col_description(c.oid, cols.ordinal_position :: int),'')) as column_comment, + (COALESCE(pg_catalog.obj_description(c.oid),'')) as table_comment +FROM + information_schema.columns cols +LEFT JOIN + pg_catalog.pg_namespace nsp ON nsp.nspname = cols.table_schema +LEFT JOIN + pg_catalog.pg_class c ON c.relname = cols.table_name AND c.relnamespace = nsp.oid +WHERE + cols.table_schema in (%s) + OR + LEFT(cols.table_schema,8) = 'pg_temp_' + +`, schemaClause) + return query } func (c *LocalDbClient) LoadForeignSchemaNames(ctx context.Context) error { From f3b95069a7417cbc198154849f79c5f2538c5631 Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Wed, 2 Mar 2022 17:36:16 +0530 Subject: [PATCH 3/5] review corrections --- db/db_client/db_client.go | 2 ++ db/db_local/local_db_client.go | 12 ++++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/db/db_client/db_client.go b/db/db_client/db_client.go index 727235be2c..41ee6dce11 100644 --- a/db/db_client/db_client.go +++ b/db/db_client/db_client.go @@ -182,6 +182,8 @@ func (c *DbClient) RefreshConnectionAndSearchPaths(ctx context.Context) *steampi return res } +// GetSchemaFromDB requests for all columns of tables backed by steampipe plugins +// and creates golang struct representations from the result func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) { utils.LogTime("db_client.GetSchemaFromDB start") defer utils.LogTime("db_client.GetSchemaFromDB end") diff --git a/db/db_local/local_db_client.go b/db/db_local/local_db_client.go index 7f49486be6..788b3cb04e 100644 --- a/db/db_local/local_db_client.go +++ b/db/db_local/local_db_client.go @@ -154,6 +154,8 @@ func (c *LocalDbClient) ContructSearchPath(ctx context.Context, requiredSearchPa return c.client.ContructSearchPath(ctx, requiredSearchPath, searchPathPrefix, currentSearchPath) } +// GetSchemaFromDB for LocalDBClient optimises the schema extraction by extracting schema +// information for connections backed by distinct plugins and then fanning back out. func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) { // build a ConnectionSchemaMap object to identify the schemas to load // (pass nil for connection state - this forces NewConnectionSchemaMap to load it) @@ -194,6 +196,8 @@ func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, return metadata, nil } +// update schemaMetadata to add in all other schemas which have the same schemas as those we have loaded +// NOTE: this mutates schemaMetadata func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) { // we now need to add in all other schemas which have the same schemas as those we have loaded for loadedSchema, otherSchemas := range connectionSchemaMap { @@ -210,11 +214,11 @@ func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, } } -func (c *LocalDbClient) buildSchemasQuery(hintSchemas []string) string { - for idx, s := range hintSchemas { - hintSchemas[idx] = fmt.Sprintf("'%s'", s) +func (c *LocalDbClient) buildSchemasQuery(schemas []string) string { + for idx, s := range schemas { + schemas[idx] = fmt.Sprintf("'%s'", s) } - schemaClause := strings.Join(hintSchemas, ",") + schemaClause := strings.Join(schemas, ",") query := fmt.Sprintf(` SELECT table_name, From 00e328c481f2045d9bb0b9a97e98c5fc74ea45d0 Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Wed, 2 Mar 2022 17:37:00 +0530 Subject: [PATCH 4/5] rename --- db/db_local/local_db_client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db/db_local/local_db_client.go b/db/db_local/local_db_client.go index 788b3cb04e..a96df346e7 100644 --- a/db/db_local/local_db_client.go +++ b/db/db_local/local_db_client.go @@ -214,11 +214,11 @@ func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, } } -func (c *LocalDbClient) buildSchemasQuery(schemas []string) string { - for idx, s := range schemas { - schemas[idx] = fmt.Sprintf("'%s'", s) +func (c *LocalDbClient) buildSchemasQuery(exemplarSchemas []string) string { + for idx, s := range exemplarSchemas { + exemplarSchemas[idx] = fmt.Sprintf("'%s'", s) } - schemaClause := strings.Join(schemas, ",") + schemaClause := strings.Join(exemplarSchemas, ",") query := fmt.Sprintf(` SELECT table_name, From 23cad58230719007066b58b76f798a55aad3cd8c Mon Sep 17 00:00:00 2001 From: Binaek Sarkar Date: Mon, 7 Mar 2022 20:00:36 +0530 Subject: [PATCH 5/5] review corrections --- db/db_local/local_db_client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/db/db_local/local_db_client.go b/db/db_local/local_db_client.go index a96df346e7..9a6cf654f7 100644 --- a/db/db_local/local_db_client.go +++ b/db/db_local/local_db_client.go @@ -197,7 +197,7 @@ func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, } // update schemaMetadata to add in all other schemas which have the same schemas as those we have loaded -// NOTE: this mutates schemaMetadata +// NOTE: this mutates schemaMetadata func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) { // we now need to add in all other schemas which have the same schemas as those we have loaded for loadedSchema, otherSchemas := range connectionSchemaMap { @@ -214,11 +214,11 @@ func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, } } -func (c *LocalDbClient) buildSchemasQuery(exemplarSchemas []string) string { - for idx, s := range exemplarSchemas { - exemplarSchemas[idx] = fmt.Sprintf("'%s'", s) +func (c *LocalDbClient) buildSchemasQuery(schemas []string) string { + for idx, s := range schemas { + schemas[idx] = fmt.Sprintf("'%s'", s) } - schemaClause := strings.Join(exemplarSchemas, ",") + schemaClause := strings.Join(schemas, ",") query := fmt.Sprintf(` SELECT table_name,