From 6b8ef3f64e15f3079fcbb49130caa63a19444c3a Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Fri, 9 Jun 2023 11:23:23 +0600 Subject: [PATCH 1/9] add work example --- .../read-table/go-example/main.go | 74 +++++++++---------- .../read-table/java-example/Task.java | 32 ++++---- .../read-table/python-example/task.py | 43 ++++++----- 3 files changed, 78 insertions(+), 71 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index f6966264284d..3738c618317c 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -26,56 +26,52 @@ // complexity: ADVANCED // tags: // - hellobeam - package main import ( - _ "context" - _ "flag" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + + "cloud.google.com/go/bigquery" internal_log "log" - _ "reflect" - "time" + "reflect" ) -type Comment struct { - ID int `bigquery:"id"` - By string `bigquery:"by"` - Author string `bigquery:"author"` - Time int `bigquery:"time"` - TimeTS time.Time `bigquery:"time_ts"` - Text string `bigquery:"text"` - Parent int `bigquery:"parent"` - Deleted bool `bigquery:"deleted"` - Dead bool `bigquery:"dead"` - Ranking float64 `bigquery:"ranking"` +type Game struct { + GameID bigquery.NullString `bigquery:"gameId"` + GameNumber bigquery.NullInt64 `bigquery:"gameNumber"` + SeasonID bigquery.NullString `bigquery:"seasonId"` + Year bigquery.NullInt64 `bigquery:"year"` + Type bigquery.NullString `bigquery:"type"` + DayNight bigquery.NullString `bigquery:"dayNight"` + Duration bigquery.NullString `bigquery:"duration"` } -// rows := bigqueryio.Read(s, project, "bigquery-public-data:hacker_news.comments", reflect.TypeOf(Comment{})) -// reads data from the specified BigQuery table and produces a PCollection where each element is a Comment. -// The reflect.TypeOf(Comment{}) is used to tell BigQuery the schema of the data. - -// debug.Print(s, rows) prints the elements of the PCollection to stdout for debugging purposes. - func main() { internal_log.Println("Running Task") - /* - ctx := context.Background() - p := beam.NewPipeline() - s := p.Root() - project := "tess-372508" - // Build a PCollection by querying BigQuery. - rows := bigqueryio.Read(s, project, "bigquery-public-data:hacker_news.comments", reflect.TypeOf(Comment{})) + ctx := context.Background() + p := beam.NewPipeline() + s := p.Root() + project := "apache-beam-testing" + + // Build a PCollection by querying BigQuery. + rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) - debug.Print(s, rows) + _ = rows + fixedSizeLines := top.Largest(s, rows, 5, less) - // Now that the pipeline is fully constructed, we execute it. 
- if err := beamx.Run(ctx, p); err != nil { - log.Exitf(ctx, "Failed to execute job: %v", err) - }*/ + debug.Print(s, fixedSizeLines) + // Now that the pipeline is fully constructed, we execute it. + if err := beamx.Run(ctx, p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } +} +func less(a, b Game) bool { + return true } diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java index 0ec28d151f5d..02681ea77153 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -33,11 +33,12 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.*; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; + public class Task { @@ -45,26 +46,29 @@ public class Task { public static void main(String[] args) { LOG.info("Running Task"); - System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - options.setTempLocation("gs://bucket"); - options.as(BigQueryOptions.class).setProject("project-id"); + options.as(BigQueryOptions.class).setProject("apache-beam-testing"); Pipeline pipeline = Pipeline.create(options); /* - * BigQueryIO.readTableRows().from("bucket.project-id.table") reads from the specified BigQuery table, and outputs a - * PCollection of TableRow objects. Each TableRow represents a row in the BigQuery table. - * The .apply("Log words", ParDo.of(new LogOutput<>())) line applies a ParDo transform that logs each row. This is done using the LogOutput class, a custom DoFn (element-wise function). - * LogOutput class: This is a custom DoFn that logs each element in the input PCollection. This is used to inspect the data in the pipeline for debugging or monitoring purposes. - */ -/* + * BigQueryIO.readTableRows().from("bucket.project-id.table") reads from the specified BigQuery table, and outputs a + * PCollection of TableRow objects. Each TableRow represents a row in the BigQuery table. + * The .apply("Log words", ParDo.of(new LogOutput<>())) line applies a ParDo transform that logs each row. This is done using the LogOutput class, a custom DoFn (element-wise function). + * LogOutput class: This is a custom DoFn that logs each element in the input PCollection. This is used to inspect the data in the pipeline for debugging or monitoring purposes. 
+ */ + PCollection pCollection = pipeline - .apply("ReadFromBigQuery", BigQueryIO.readTableRows().from("bucket.project-id.table")); + .apply("ReadFromBigQuery", BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations").withMethod(TypedRead.Method.DIRECT_READ)); + + final PTransform, PCollection>> sample = Sample.fixedSizeGlobally(5); - pCollection + PCollection limitedPCollection = pCollection.apply(sample).apply(Flatten.iterables()); + + + limitedPCollection .apply("Log words", ParDo.of(new LogOutput<>())); -*/ + pipeline.run(); diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index c8fcccbe9e9b..cbb2f0a20ce0 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -26,35 +26,42 @@ # - hellobeam import argparse +import os +import warnings + import apache_beam as beam -from apache_beam.io import ReadFromText -from apache_beam.io import WriteToBigQuery -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions +#from google.cloud import bigquery +from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery + +class WeatherData: + def __init__(self, station_number, wban_number, year, month, day): + self.station_number = station_number + self.wban_number = wban_number + self.year = year + self.month = month + self.day = day + def __str__(self): + return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}" def run(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--input', - dest='input', - default='gs://bucket', - help='Input file to process.') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True + pipeline_options.view_as(PipelineOptions) - # ReadFromBigQuery: This operation reads from a BigQuery table and outputs a PCollection of dictionaries. Each - # dictionary represents a row in the BigQuery table, where the keys are the BigQuery column names. beam.Map: This - # operation applies a function to each element in the PCollection, here, it selects a specific field from each row. 
- with beam.Pipeline(options=pipeline_options) as p: - (p #| 'ReadTable' >> beam.io.ReadFromBigQuery(table='project-id.dataset.table') - # Each row is a dictionary where the keys are the BigQuery columns - #| beam.Map(lambda elem: elem['field']) - ) + with beam.Pipeline(options=pipeline_options, argv=argv) as p: + (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', + method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) + | beam.combiners.Sample.FixedSizeGlobally(5) + | beam.FlatMap(lambda line: line) + | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day'])) + | beam.Map(print)) if __name__ == '__main__': - run() + run() From 1ff1765974822a4ff11e964bb503a83b72b8a47c Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Fri, 9 Jun 2023 14:15:56 +0600 Subject: [PATCH 2/9] correct --- .../io/big-query-io/read-table/description.md | 15 ++++++--------- .../io/big-query-io/read-table/go-example/main.go | 1 - 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md index ef0231fe90d5..23344989d0aa 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md @@ -20,8 +20,7 @@ limitations under the License. {{if (eq .Sdk "go")}} ``` -rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID}) -beam.ParDo0(s, &logOutput{}, rows) +rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) ``` The `bigqueryio.Read()` method is called with a `bigquery.TableReference` object that specifies the project, dataset, and table IDs for the `BigQuery` table to read from. @@ -35,11 +34,8 @@ The `logOutput` struct is defined as a custom `DoFn` that implements the Process {{end}} {{if (eq .Sdk "java")}} ``` -PCollection rows = - pipeline - .apply( - "Read from BigQuery query", - BigQueryIO.readTableRows().from("tess-372508.fir.xasw") + PCollection pCollection = pipeline + .apply("ReadFromBigQuery", BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations").withMethod(TypedRead.Method.DIRECT_READ)) ``` The `BigQueryIO.readTableRows()` method is called to create a `BigQueryIO.Read` transform that will read data from a `BigQuery` table. @@ -50,10 +46,11 @@ The `Read` transform returns a `PCollection` of `TableRow` objects, which repres {{end}} {{if (eq .Sdk "python")}} ``` -p | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec) | beam.Map(lambda elem: elem['max_temperature']) +p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', + method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) ``` -The `beam.io.ReadFromBigQuery()` method is called to create a `Read` transform that will read data from a `BigQuery` table. The `table_spec` parameter specifies the name of the `BigQuery` table to read from, along with any other configuration options such as **project ID**, **dataset ID**, or **query**. +The `beam.io.ReadFromBigQuery()` method is called to create a `Read` transform that will read data from a `BigQuery` table. 
The `table` parameter specifies the name of the `BigQuery` table to read from, along with any other configuration options such as **project ID**, **dataset ID**, or **query**. The Read transform returns a `PCollection` of dict objects, where each dictionary represents a single row of data in the `BigQuery` table. {{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index 3738c618317c..d12caad78a5b 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -63,7 +63,6 @@ func main() { // Build a PCollection by querying BigQuery. rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) - _ = rows fixedSizeLines := top.Largest(s, rows, 5, less) debug.Print(s, fixedSizeLines) From ee7addfcd1a84ca55fa6850a5b5f26cc55b2405c Mon Sep 17 00:00:00 2001 From: Oleh Borysevych Date: Tue, 13 Jun 2023 16:54:24 +0300 Subject: [PATCH 3/9] correct tags for bigquery examples --- .../io/big-query-io/read-table/go-example/main.go | 2 ++ .../io/big-query-io/read-table/java-example/Task.java | 2 ++ .../io/big-query-io/read-table/python-example/task.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index d12caad78a5b..1751beb191e7 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -21,6 +21,8 @@ // description: BigQueryIO read table example. // multifile: false // context_line: 42 +// never_run: true +// always_run: true // categories: // - Quickstart // complexity: ADVANCED diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java index 02681ea77153..835954382a9d 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -20,6 +20,8 @@ // name: read-table // description: BigQueryIO read table example. // multifile: false +// never_run: true +// always_run: true // context_line: 56 // categories: // - Quickstart diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index cbb2f0a20ce0..30f50ec131cb 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -18,6 +18,8 @@ # name: read-table # description: TextIO read table example. 
# multifile: false +# never_run: true +# always_run: true # context_line: 34 # categories: # - Quickstart From 1a337c60e3935fa5744026d29150c2a2204466bb Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Thu, 15 Jun 2023 14:51:49 +0600 Subject: [PATCH 4/9] correct read-query --- .../read-query/go-example/main.go | 89 ++++++++++--------- .../read-query/java-example/Task.java | 64 +++++++------ .../read-query/python-example/task.py | 54 ++++++----- .../read-table/go-example/main.go | 2 +- .../read-table/python-example/task.py | 4 +- 5 files changed, 110 insertions(+), 103 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go index fec979ad7eda..ab9960892529 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go @@ -17,59 +17,64 @@ */ // beam-playground: -// name: read-query -// description: BigQuery read query example. -// multifile: false -// context_line: 40 -// categories: -// - Quickstart -// complexity: ADVANCED -// tags: -// - hellobeam +// +// name: read-table +// description: BigQueryIO read table example. +// multifile: false +// context_line: 42 +// never_run: true +// always_run: true +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam package main import ( - _ "context" - _ "flag" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" - _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/top" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + + "cloud.google.com/go/bigquery" internal_log "log" - _ "reflect" + "reflect" ) -// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments. -//The bigquery tag in the struct field is used to map the struct field to the BigQuery column. -type CommentRow struct { - Text string `bigquery:"text"` +type Game struct { + GameID bigquery.NullString `bigquery:"gameId"` + GameNumber bigquery.NullInt64 `bigquery:"gameNumber"` + SeasonID bigquery.NullString `bigquery:"seasonId"` + Year bigquery.NullInt64 `bigquery:"year"` + Type bigquery.NullString `bigquery:"type"` + DayNight bigquery.NullString `bigquery:"dayNight"` + Duration bigquery.NullString `bigquery:"duration"` } -// Construct the BigQuery query: A constant query is defined that selects the text column -// from the bigquery-public-data.hacker_news.comments table for a certain time range. -const query = `SELECT text -FROM ` + "`bigquery-public-data.hacker_news.comments`" + ` -WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01' -LIMIT 1000 -` - func main() { internal_log.Println("Running Task") - /* - ctx := context.Background() - p := beam.NewPipeline() - s := p.Root() - project := "tess-372508" - // Build a PCollection by querying BigQuery. 
- rows := bigqueryio.Query(s, project, query, - reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL()) + ctx := context.Background() + p := beam.NewPipeline() + s := p.Root() + project := "apache-beam-testing" - debug.Print(s, rows) + // Build a PCollection by querying BigQuery. + rows := bigqueryio.Query(s, project, "select * from `bigquery-public-data.baseball.schedules`", + reflect.TypeOf(Game{}), bigqueryio.UseStandardSQL()) - // Now that the pipeline is fully constructed, we execute it. - if err := beamx.Run(ctx, p); err != nil { - log.Exitf(ctx, "Failed to execute job: %v", err) - }*/ + fixedSizeLines := top.Largest(s, rows, 5, less) + + debug.Print(s, fixedSizeLines) + // Now that the pipeline is fully constructed, we execute it. + if err := beamx.Run(ctx, p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + } +} +func less(a, b Game) bool { + return true } diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java index 256c70919ce7..12c1fbcd9b48 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java @@ -27,11 +27,10 @@ // tags: // - hellobeam +import com.google.api.services.bigquery.model.TableRow; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.DoubleCoder; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; -import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -40,52 +39,49 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class Task { +public class Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - public static void main(String[] args) { - LOG.info("Running Task"); - System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); - options.setTempLocation("gs://bucket"); - options.as(BigQueryOptions.class).setProject("project-id"); + private static final String WEATHER_SAMPLES_QUERY = + "select * from `clouddataflow-readonly.samples.weather_stations`"; - Pipeline pipeline = Pipeline.create(options); + public static void applyBigQueryTornadoes(Pipeline p) { + /*TypedRead bigqueryIO = + BigQueryIO.readTableRows() + .fromQuery(WEATHER_SAMPLES_QUERY) + .usingStandardSql(); - // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection. - // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record. - // The .fromQuery("SELECT field FROM project-id.dataset.table") - // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively. 
-/* - PCollection pCollection = pipeline - .apply(BigQueryIO.read( - (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field")) - .fromQuery( - "SELECT field FROM `project-id.dataset.table`") - .usingStandardSql() - .withCoder(DoubleCoder.of())); - pCollection - .apply("Log words", ParDo.of(new LogOutput<>())); -*/ - pipeline.run(); + PCollection rowsFromBigQuery = p.apply(bigqueryIO); + + rowsFromBigQuery + .apply(ParDo.of(new LogOutput<>("Result: ")));*/ + } + + public static void runBigQueryTornadoes(PipelineOptions options) { + Pipeline p = Pipeline.create(options); + applyBigQueryTornadoes(p); + p.run().waitUntilFinish(); + } + + public static void main(String[] args) { + PipelineOptions options = + PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class); + runBigQueryTornadoes(options); } static class LogOutput extends DoFn { private final String prefix; - LogOutput() { - this.prefix = "Processing element"; - } - LogOutput(String prefix) { this.prefix = prefix; } @ProcessElement - public void processElement(ProcessContext c) throws Exception { - LOG.info(prefix + ": {}", c.element()); + public void processElement(ProcessContext c) { + LOG.info(prefix + c.element()); + c.output(c.element()); } } -} \ No newline at end of file +} diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py index 1fab9964d670..145ae00e6792 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py @@ -15,9 +15,11 @@ # limitations under the License. # beam-playground-broken: -# name: read-query -# description: TextIO read query example. +# name: read-table +# description: BigQueryIO read table example. # multifile: false +# never_run: true +# always_run: true # context_line: 34 # categories: # - Quickstart @@ -26,39 +28,43 @@ # - hellobeam import argparse +import os +import warnings + import apache_beam as beam -from apache_beam.io import ReadFromText -from apache_beam.io import WriteToBigQuery -from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions +from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery + +class WeatherData: + def __init__(self, station_number, wban_number, year, month, day): + self.station_number = station_number + self.wban_number = wban_number + self.year = year + self.month = month + self.day = day + def __str__(self): + return f"Weather Data: Station {self.station_number} (WBAN {self.wban_number}), Date: {self.year}-{self.month}-{self.day}" def run(argv=None): parser = argparse.ArgumentParser() - parser.add_argument('--input', - dest='input', - default='gs://bucket', - help='Input file to process.') known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) - pipeline_options.view_as(SetupOptions).save_main_session = True + pipeline_options.view_as(PipelineOptions) - """ - (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the - pipeline reads from a BigQuery table using a SQL query and processes the result. 
The ReadFromBigQuery( - query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >> - beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the - PCollection and logs the output to the console. It prints the 'field' column from each row in the table. - """ - with beam.Pipeline(options=pipeline_options) as p: - (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`'))) - # Each row is a dictionary where the keys are the BigQuery columns - #| beam.Map(lambda elem: elem['field']) - ) + with beam.Pipeline(options=pipeline_options, argv=argv) as p: + (p + # | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`',use_standard_sql=True, + # method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) + # | beam.combiners.Sample.FixedSizeGlobally(5) + # | beam.FlatMap(lambda line: line) + # | beam.Map(lambda element: WeatherData(element['station_number'],element['wban_number'],element['year'],element['month'],element['day'])) + # | beam.Map(print) + ) if __name__ == '__main__': - run() + run() diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index 1751beb191e7..662695f9cec1 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -62,7 +62,7 @@ func main() { s := p.Root() project := "apache-beam-testing" - // Build a PCollection by querying BigQuery. + // Build a PCollection by querying BigQuery. rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) fixedSizeLines := top.Largest(s, rows, 5, less) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index 30f50ec131cb..8dcace20de29 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -16,7 +16,7 @@ # beam-playground-broken: # name: read-table -# description: TextIO read table example. +# description: BigQueryIO read table example. 
# multifile: false # never_run: true # always_run: true @@ -57,7 +57,7 @@ def run(argv=None): with beam.Pipeline(options=pipeline_options, argv=argv) as p: - (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', + (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`', method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) | beam.combiners.Sample.FixedSizeGlobally(5) | beam.FlatMap(lambda line: line) From 65083828035eb3e2f47ef401bd16e1f06e6379e5 Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Thu, 15 Jun 2023 16:02:55 +0600 Subject: [PATCH 5/9] correct read-query tag --- .../read-query/go-example/main.go | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go index ab9960892529..b7697e4de3ec 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go @@ -17,18 +17,17 @@ */ // beam-playground: -// -// name: read-table -// description: BigQueryIO read table example. -// multifile: false -// context_line: 42 -// never_run: true -// always_run: true -// categories: -// - Quickstart -// complexity: ADVANCED -// tags: -// - hellobeam +// name: read-table +// description: BigQueryIO read table example. +// multifile: false +// context_line: 42 +// never_run: true +// always_run: true +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam package main import ( From 65d40480fb8eba779ebe0e01e35fb8a7d45c23e0 Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Mon, 19 Jun 2023 12:14:11 +0600 Subject: [PATCH 6/9] correct imports --- .../io/big-query-io/read-table/java-example/Task.java | 8 ++++++-- .../io/big-query-io/read-table/python-example/task.py | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java index 835954382a9d..abd5de363e62 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -1,4 +1,4 @@ -/* +package com.example.demo;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -35,7 +35,11 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.*; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sample; import org.apache.beam.sdk.values.PCollection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index 8dcace20de29..916bfdb0657d 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -33,7 +33,6 @@ import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions -#from google.cloud import bigquery from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery class WeatherData: From afe7af483ceeca05cd54a621b9383faf4a50180a Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Thu, 22 Jun 2023 11:43:59 +0600 Subject: [PATCH 7/9] remove package --- .../io/big-query-io/read-table/java-example/Task.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java index abd5de363e62..206a0c0b8ee0 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -1,4 +1,4 @@ -package com.example.demo;/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From e134a12ccef0e3d132bf087bf91d70b3ff17324a Mon Sep 17 00:00:00 2001 From: mende1esmende1es Date: Thu, 22 Jun 2023 20:45:06 +0600 Subject: [PATCH 8/9] correct --- .../io/big-query-io/read-table/go-example/main.go | 2 +- .../io/big-query-io/read-table/python-example/task.py | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go index 20ded70af4e6..ef2462f4de36 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -63,7 +63,7 @@ func main() { s := p.Root() project := "apache-beam-testing" - // Build a PCollection by querying BigQuery. + // Build a PCollection by querying BigQuery. 
rows := bigqueryio.Read(s, project, "bigquery-public-data:baseball.schedules", reflect.TypeOf(Game{})) fixedSizeLines := top.Largest(s, rows, 5, less) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py index 64c48cc23c91..e89779e5a26b 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -28,12 +28,8 @@ # - hellobeam import argparse -import os -import warnings - import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions -from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest, ReadAllFromBigQuery class WeatherData: def __init__(self, station_number, wban_number, year, month, day): @@ -56,7 +52,7 @@ def run(argv=None): with beam.Pipeline(options=pipeline_options, argv=argv) as p: - (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query='select * from `apache-beam-testing.clouddataflow_samples.weather_stations`', + (p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(table='apache-beam-testing:clouddataflow_samples.weather_stations', method=beam.io.ReadFromBigQuery.Method.DIRECT_READ) | beam.combiners.Sample.FixedSizeGlobally(5) | beam.FlatMap(lambda line: line) From e3077695d7ac24310de35b6fabec0df06f6674b4 Mon Sep 17 00:00:00 2001 From: Oleh Borysevych Date: Mon, 26 Jun 2023 17:06:59 +0300 Subject: [PATCH 9/9] fixed example name --- .../io/big-query-io/read-query/go-example/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go index b7697e4de3ec..49ab6057bac2 100644 --- a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go @@ -17,8 +17,8 @@ */ // beam-playground: -// name: read-table -// description: BigQueryIO read table example. +// name: read-query +// description: BigQueryIO read query example. // multifile: false // context_line: 42 // never_run: true
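
The read-table Python example that this series converges on condenses to the short standalone pipeline below. This is a sketch rather than a drop-in replacement for task.py: it assumes `apache-beam[gcp]` is installed and that the environment supplies Google Cloud credentials with read access to the `apache-beam-testing:clouddataflow_samples.weather_stations` table (the `never_run`/`always_run` playground tags suggest the hosted runners provide that access).

```python
# Minimal sketch of the final read-table pattern. Assumes apache-beam[gcp]
# is installed and default Google Cloud credentials are available.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    # Standard pipeline flags (e.g. --runner, --project, --temp_location)
    # are picked up from the command line.
    options = PipelineOptions()

    with beam.Pipeline(options=options) as p:
        (
            p
            # Each element read from BigQuery is a dict keyed by column name.
            | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
                table='apache-beam-testing:clouddataflow_samples.weather_stations',
                method=beam.io.ReadFromBigQuery.Method.DIRECT_READ)
            # Keep just five rows so the printed output stays small.
            | 'SampleFive' >> beam.combiners.Sample.FixedSizeGlobally(5)
            # Sample emits a single list element; flatten it back into rows.
            | 'Flatten' >> beam.FlatMap(lambda rows: rows)
            | 'Print' >> beam.Map(print)
        )


if __name__ == '__main__':
    run()
```

The `Sample.FixedSizeGlobally(5)` + `FlatMap` pair plays the same role as `top.Largest(s, rows, 5, less)` in the Go example: it trims the full table read down to a handful of rows before printing.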