Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where schema extraction is broken for remote services. Closes #1497 #1502

Merged
merged 5 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions db/db_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package db_client
import (
"context"
"database/sql"
"fmt"
"log"
"strings"
"sync"

"github.com/jackc/pgx/v4"
Expand Down Expand Up @@ -184,13 +182,13 @@ func (c *DbClient) RefreshConnectionAndSearchPaths(ctx context.Context) *steampi
return res
}

func (c *DbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*schema.Metadata, error) {
func (c *DbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) {
utils.LogTime("db_client.GetSchemaFromDB start")
defer utils.LogTime("db_client.GetSchemaFromDB end")
connection, err := c.dbClient.Conn(ctx)
utils.FailOnError(err)

query := c.buildSchemasQuery(schemas)
query := c.buildSchemasQuery()

tablesResult, err := connection.QueryContext(ctx, query)
if err != nil {
Expand All @@ -212,14 +210,15 @@ func (c *DbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*sche
return metadata, nil
}

func (c *DbClient) buildSchemasQuery(schemas []string) string {
schemasClause := ""
if len(schemas) > 0 {
schemasClause = fmt.Sprintf(`
cols.table_schema in ('%s')
OR`, strings.Join(schemas, "','"))
}
query := fmt.Sprintf(`
func (c *DbClient) buildSchemasQuery() string {
query := `
WITH distinct_schema AS (
SELECT DISTINCT(foreign_table_schema)
FROM
information_schema.foreign_tables
WHERE
foreign_table_schema <> 'steampipe_command'
)
SELECT
table_name,
column_name,
Expand All @@ -235,12 +234,12 @@ LEFT JOIN
pg_catalog.pg_namespace nsp ON nsp.nspname = cols.table_schema
LEFT JOIN
pg_catalog.pg_class c ON c.relname = cols.table_name AND c.relnamespace = nsp.oid
WHERE %s
WHERE
cols.table_schema in (select * from distinct_schema)
OR
LEFT(cols.table_schema,8) = 'pg_temp_'
ORDER BY
cols.table_schema, cols.table_name, cols.column_name;

`, schemasClause)
`
return query
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_common/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Client interface {

SetEnsureSessionDataFunc(EnsureSessionStateCallback)
RefreshSessions(ctx context.Context) *AcquireSessionResult
GetSchemaFromDB(context.Context, []string) (*schema.Metadata, error)
GetSchemaFromDB(context.Context) (*schema.Metadata, error)
// remote client will have empty implementation
RefreshConnectionAndSearchPaths(context.Context) *steampipeconfig.RefreshConnectionResult
LoadForeignSchemaNames(context.Context) error
Expand Down
86 changes: 84 additions & 2 deletions db/db_local/local_db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,90 @@ func (c *LocalDbClient) ContructSearchPath(ctx context.Context, requiredSearchPa
return c.client.ContructSearchPath(ctx, requiredSearchPath, searchPathPrefix, currentSearchPath)
}

func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context, schemas []string) (*schema.Metadata, error) {
return c.client.GetSchemaFromDB(ctx, schemas)
func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, error) {
kaidaguerre marked this conversation as resolved.
Show resolved Hide resolved
// build a ConnectionSchemaMap object to identify the schemas to load
// (pass nil for connection state - this forces NewConnectionSchemaMap to load it)
connectionSchemaMap, err := steampipeconfig.NewConnectionSchemaMap()
if err != nil {
return nil, err
}
// get the unique schema - we use this to limit the schemas we load from the database
schemas := connectionSchemaMap.UniqueSchemas()
query := c.buildSchemasQuery(schemas)

acquireSessionResult := c.AcquireSession(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to acquire session here, but the base implementation just calls

connection, err := c.dbClient.Conn(ctx)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because dbClient is not a public property in the DbClient struct

if acquireSessionResult.Error != nil {
acquireSessionResult.Session.Close(false)
return nil, err
}

tablesResult, err := acquireSessionResult.Session.Connection.QueryContext(ctx, query)
if err != nil {
return nil, err
}

metadata, err := db_common.BuildSchemaMetadata(tablesResult)
if err != nil {
acquireSessionResult.Session.Close(false)
return nil, err
}
acquireSessionResult.Session.Close(false)

c.populateSchemaMetadata(metadata, connectionSchemaMap)

searchPath, err := c.GetCurrentSearchPath(ctx)
if err != nil {
return nil, err
}
metadata.SearchPath = searchPath

return metadata, nil
}

func (c *LocalDbClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) {
kaidaguerre marked this conversation as resolved.
Show resolved Hide resolved
// we now need to add in all other schemas which have the same schemas as those we have loaded
for loadedSchema, otherSchemas := range connectionSchemaMap {
// all 'otherSchema's have the same schema as loadedSchema
exemplarSchema, ok := schemaMetadata.Schemas[loadedSchema]
if !ok {
// should can happen in the case of a dynamic plugin with no tables - use empty schema
exemplarSchema = make(map[string]schema.TableSchema)
}

for _, s := range otherSchemas {
schemaMetadata.Schemas[s] = exemplarSchema
}
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function comment to explain why this is different from the base version

func (c *LocalDbClient) buildSchemasQuery(hintSchemas []string) string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are hintSchemas?

just call it schemas

for idx, s := range hintSchemas {
hintSchemas[idx] = fmt.Sprintf("'%s'", s)
}
schemaClause := strings.Join(hintSchemas, ",")
query := fmt.Sprintf(`
SELECT
table_name,
column_name,
column_default,
is_nullable,
data_type,
table_schema,
(COALESCE(pg_catalog.col_description(c.oid, cols.ordinal_position :: int),'')) as column_comment,
(COALESCE(pg_catalog.obj_description(c.oid),'')) as table_comment
FROM
information_schema.columns cols
LEFT JOIN
pg_catalog.pg_namespace nsp ON nsp.nspname = cols.table_schema
LEFT JOIN
pg_catalog.pg_class c ON c.relname = cols.table_name AND c.relnamespace = nsp.oid
WHERE
cols.table_schema in (%s)
OR
LEFT(cols.table_schema,8) = 'pg_temp_'

`, schemaClause)
return query
}

func (c *LocalDbClient) LoadForeignSchemaNames(ctx context.Context) error {
Expand Down
32 changes: 2 additions & 30 deletions interactive/interactive_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/turbot/steampipe/query/queryresult"
"github.com/turbot/steampipe/schema"
"github.com/turbot/steampipe/statushooks"
"github.com/turbot/steampipe/steampipeconfig"
"github.com/turbot/steampipe/steampipeconfig/modconfig"
"github.com/turbot/steampipe/utils"
"github.com/turbot/steampipe/version"
Expand Down Expand Up @@ -155,23 +154,14 @@ func (c *InteractiveClient) LoadSchema() error {
utils.LogTime("db_client.LoadSchema start")
defer utils.LogTime("db_client.LoadSchema end")

// build a ConnectionSchemaMap object to identify the schemas to load
// (pass nil for connection state - this forces NewConnectionSchemaMap to load it)
connectionSchemaMap, err := steampipeconfig.NewConnectionSchemaMap()
if err != nil {
return err
}
// get the unique schema - we use this to limit the schemas we load from the database
schemas := connectionSchemaMap.UniqueSchemas()
// load these schemas
// in a background context, since we are not running in a context - but GetSchemaFromDB needs one
metadata, err := c.client().GetSchemaFromDB(context.Background(), schemas)
metadata, err := c.client().GetSchemaFromDB(context.Background())
if err != nil {
return err
}

c.populateSchemaMetadata(metadata, connectionSchemaMap)

c.schemaMetadata = metadata
return nil
}

Expand Down Expand Up @@ -589,21 +579,3 @@ func (c *InteractiveClient) addControlSuggestion(control *modconfig.Control, con
}
return prompt.Suggest{Text: controlName, Output: controlName, Description: description}
}

func (c *InteractiveClient) populateSchemaMetadata(schemaMetadata *schema.Metadata, connectionSchemaMap steampipeconfig.ConnectionSchemaMap) error {
// we now need to add in all other schemas which have the same schemas as those we have loaded
for loadedSchema, otherSchemas := range connectionSchemaMap {
// all 'otherSchema's have the same schema as loadedSchema
exemplarSchema, ok := schemaMetadata.Schemas[loadedSchema]
if !ok {
// should can happen in the case of a dynamic plugin with no tables - use empty schema
exemplarSchema = make(map[string]schema.TableSchema)
}

for _, s := range otherSchemas {
schemaMetadata.Schemas[s] = exemplarSchema
}
}
c.schemaMetadata = schemaMetadata
return nil
}