From 37607b4afbc4c42baa4a931a9a86cddcc6d885ca Mon Sep 17 00:00:00 2001 From: shollyman Date: Tue, 22 Jun 2021 10:23:31 -0700 Subject: [PATCH] feat(bigquery): add support for user defined TVF (#4043) * feat(bigquery): add support for user defined TVF Allows user to define table-valued functions using the BigQuery API. --- bigquery/integration_test.go | 38 ++++++++++++++++++++++++++++++++++++ bigquery/routine.go | 28 ++++++++++++++++++++++++-- bigquery/routine_test.go | 10 ++++++++++ bigquery/standardsql.go | 37 +++++++++++++++++++++++++++++++++++ 4 files changed, 111 insertions(+), 2 deletions(-) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index c9756b194cd3..9cca940d876d 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -1291,6 +1291,44 @@ func TestIntegration_RoutineStoredProcedure(t *testing.T) { it, [][]Value{{int64(10)}}) } +func TestIntegration_RoutineUserTVF(t *testing.T) { + if client == nil { + t.Skip("Integration tests skipped") + } + ctx := context.Background() + + routineID := routineIDs.New() + routine := dataset.Routine(routineID) + inMeta := &RoutineMetadata{ + Type: "TABLE_VALUED_FUNCTION", + Language: "SQL", + Arguments: []*RoutineArgument{ + {Name: "filter", + DataType: &StandardSQLDataType{TypeKind: "INT64"}, + }}, + ReturnTableType: &StandardSQLTableType{ + Columns: []*StandardSQLField{ + {Name: "x", Type: &StandardSQLDataType{TypeKind: "INT64"}}, + }, + }, + Body: "SELECT x FROM UNNEST([1,2,3]) x WHERE x = filter", + } + if err := routine.Create(ctx, inMeta); err != nil { + t.Fatalf("routine create: %v", err) + } + defer routine.Delete(ctx) + + meta, err := routine.Metadata(ctx) + if err != nil { + t.Fatal(err) + } + + // Now, compare the input meta to the output meta + if diff := testutil.Diff(inMeta, meta, cmpopts.IgnoreFields(RoutineMetadata{}, "CreationTime", "LastModifiedTime", "ETag")); diff != "" { + t.Errorf("routine metadata differs, got=-, want=+\n%s", diff) + } +} + func TestIntegration_InsertErrors(t *testing.T) { // This test serves to verify streaming behavior in the face of oversized data. // BigQuery will reject insertAll payloads that exceed a defined limit (10MB). diff --git a/bigquery/routine.go b/bigquery/routine.go index 6f6c3a177f4c..46c8ca398f9d 100644 --- a/bigquery/routine.go +++ b/bigquery/routine.go @@ -144,7 +144,8 @@ const ( // RoutineMetadata represents details of a given BigQuery Routine. type RoutineMetadata struct { ETag string - // Type indicates the type of routine, such as SCALAR_FUNCTION or PROCEDURE. + // Type indicates the type of routine, such as SCALAR_FUNCTION, PROCEDURE, + // or TABLE_VALUED_FUNCTION. Type string CreationTime time.Time Description string @@ -156,6 +157,9 @@ type RoutineMetadata struct { // The list of arguments for the the routine. Arguments []*RoutineArgument ReturnType *StandardSQLDataType + + // Set only if the routine type is TABLE_VALUED_FUNCTION. + ReturnTableType *StandardSQLTableType // For javascript routines, this indicates the paths for imported libraries. ImportedLibraries []string // Body contains the routine's body. @@ -184,7 +188,13 @@ func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) { return nil, err } r.ReturnType = rt - + if rm.ReturnTableType != nil { + tt, err := rm.ReturnTableType.toBQ() + if err != nil { + return nil, fmt.Errorf("couldn't convert return table type: %v", err) + } + r.ReturnTableType = tt + } var args []*bq.Argument for _, v := range rm.Arguments { bqa, err := v.toBQ() @@ -301,6 +311,7 @@ type RoutineMetadataToUpdate struct { Body optional.String ImportedLibraries []string ReturnType *StandardSQLDataType + ReturnTableType *StandardSQLTableType } func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) { @@ -370,6 +381,14 @@ func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) { r.ReturnType = dt forceSend("ReturnType") } + if rm.ReturnTableType != nil { + tt, err := rm.ReturnTableType.toBQ() + if err != nil { + return nil, err + } + r.ReturnTableType = tt + forceSend("ReturnTableType") + } return r, nil } @@ -395,5 +414,10 @@ func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) { return nil, err } meta.ReturnType = ret + tt, err := bqToStandardSQLTableType(r.ReturnTableType) + if err != nil { + return nil, err + } + meta.ReturnTableType = tt return meta, nil } diff --git a/bigquery/routine_test.go b/bigquery/routine_test.go index f0c122ea1134..697713c86dac 100644 --- a/bigquery/routine_test.go +++ b/bigquery/routine_test.go @@ -84,6 +84,11 @@ func TestRoutineTypeConversions(t *testing.T) { RoutineType: "type", Language: "lang", ReturnType: &bq.StandardSqlDataType{TypeKind: "INT64"}, + ReturnTableType: &bq.StandardSqlTableType{ + Columns: []*bq.StandardSqlField{ + {Name: "field", Type: &bq.StandardSqlDataType{TypeKind: "FLOAT64"}}, + }, + }, }, &RoutineMetadata{ CreationTime: aTime, @@ -95,6 +100,11 @@ func TestRoutineTypeConversions(t *testing.T) { Type: "type", Language: "lang", ReturnType: &StandardSQLDataType{TypeKind: "INT64"}, + ReturnTableType: &StandardSQLTableType{ + Columns: []*StandardSQLField{ + {Name: "field", Type: &StandardSQLDataType{TypeKind: "FLOAT64"}}, + }, + }, }}, {"body_and_libs", "FromRoutineMetadataToUpdate", &RoutineMetadataToUpdate{ diff --git a/bigquery/standardsql.go b/bigquery/standardsql.go index 0219057dfd6a..a208d06e796b 100644 --- a/bigquery/standardsql.go +++ b/bigquery/standardsql.go @@ -175,3 +175,40 @@ func standardSQLStructFieldsToBQ(fields []*StandardSQLField) ([]*bq.StandardSqlF } return bqFields, nil } + +// StandardSQLTableType models a table-like resource, which has a set of columns. +type StandardSQLTableType struct { + + // The columns of the table. + Columns []*StandardSQLField +} + +func (sstt *StandardSQLTableType) toBQ() (*bq.StandardSqlTableType, error) { + if sstt == nil { + return nil, nil + } + out := &bq.StandardSqlTableType{} + for k, v := range sstt.Columns { + bq, err := v.toBQ() + if err != nil { + return nil, fmt.Errorf("error converting column %d: %v", k, err) + } + out.Columns = append(out.Columns, bq) + } + return out, nil +} + +func bqToStandardSQLTableType(in *bq.StandardSqlTableType) (*StandardSQLTableType, error) { + if in == nil { + return nil, nil + } + out := &StandardSQLTableType{} + for k, v := range in.Columns { + f, err := bqToStandardSQLField(v) + if err != nil { + return nil, fmt.Errorf("error converting column %d: %v", k, err) + } + out.Columns = append(out.Columns, f) + } + return out, nil +}