
Commit

[Tour of Beam] Learning content for "IO Connectors" module (apache#25301)

* new format textIO

* fixes

* fixes

* add new units

* correct py example

* fixing structure and examples

* nonexistent category

* divide for write read

* add myfile.txt

* add licence to myfile.txt

* remove whitespace

* check multifile

* correct myfile

* comment go examples

* remove package

* comment kafka examples

* minor fix

* remove line and correct one line

* change names

* add Playground exercise

* correct kafka theory

* change folder name

* change golang examples

* correct formatter

* correct whitespace

* remove flags

* add txt files

* add excluded extensions

* change to trigger CI

* revert dummy commit

* correct kafka examples

* remove task tag

* correct import

* change kafka example

* fixing task names

---------

Co-authored-by: oborysevych <oleg.borisevich@akvelon.com>
Co-authored-by: mende1esmende1es <mende1esmende1es@gmail.cp>
3 people authored and cushon committed May 24, 2024
1 parent 825e15d commit b83a3e3
Showing 64 changed files with 3,579 additions and 1 deletion.
6 changes: 6 additions & 0 deletions build.gradle.kts
@@ -121,6 +121,12 @@ tasks.rat {
// Tour Of Beam example logs
"learning/tour-of-beam/learning-content/**/*.log",

// Tour Of Beam example txt files
"learning/tour-of-beam/learning-content/**/*.txt",

// Tour Of Beam example csv files
"learning/tour-of-beam/learning-content/**/*.csv",

// Tour Of Beam backend autogenerated Datastore indexes
"learning/tour-of-beam/backend/internal/storage/index.yaml",

3 changes: 2 additions & 1 deletion learning/tour-of-beam/learning-content/content-info.yaml
@@ -27,4 +27,5 @@ content:
- core-transforms
- schema-based-transforms
- windowing
- triggers
- io
@@ -0,0 +1,31 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### BigQuery with beam-schema

The `useBeamSchema` method, provided by the `BigQueryIO` class in Apache Beam, specifies whether to use Beam's internal schema representation or BigQuery's native table schema when reading or writing data to BigQuery.

When you set `useBeamSchema` to true, Beam will use its internal schema representation when reading or writing data to BigQuery. This allows for more flexibility when working with the data, as Beam's schema representation supports more data types and allows for more advanced schema manipulation.

When you set `useBeamSchema` to false, Beam will use the native table schema of the BigQuery table when reading or writing data. This can be useful when you want to ensure that the data is written to BigQuery in a format that is compatible with other tools that read from the same table.
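When `useBeamSchema` is off, you typically describe the destination table yourself with `withSchema`. A minimal sketch of that explicit alternative (the field names are illustrative; `TableSchema` and `TableFieldSchema` come from `com.google.api.services.bigquery.model`):

```
BigQueryIO.writeTableRows()
    .to("mydataset.outputtable")
    .withSchema(new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("id").setType("INTEGER"),
        new TableFieldSchema().setName("name").setType("STRING"))))
```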

Here is an example of how you might use the `useBeamSchema` method when writing a schema-aware `PCollection` (called `rows` below) to a BigQuery table:

```
pipeline.apply("ReadFromBigQuery",
BigQueryIO.write().to("mydataset.outputtable").useBeamSchema())
```

The `BigQueryIO.write()` method creates a `Write` transform that writes the data to a BigQuery table. The `to()` method specifies the destination table, which in this case is "**mydataset.outputtable**".

The `useBeamSchema()` method is called on the `Write` transform to use the schema of the `PCollection` elements as the schema of the output table.
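Putting it together, here is a minimal sketch (dataset and table names are placeholders) that builds a schema-aware `PCollection` and writes it with `useBeamSchema()`, so the `Row` schema becomes the table schema:

```
Schema schema = Schema.builder()
    .addInt32Field("id")
    .addStringField("name")
    .build();

PCollection<Row> rows = pipeline
    .apply(Create.of(
        Row.withSchema(schema).addValues(1, "Alice").build(),
        Row.withSchema(schema).addValues(2, "Bob").build()))
    // Attach the schema so Beam can infer the BigQuery table schema from it.
    .setRowSchema(schema);

rows.apply("WriteToBigQuery",
    BigQueryIO.<Row>write()
        .to("mydataset.outputtable")
        .useBeamSchema());
```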
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// beam-playground:
// name: write-beam-schema
// description: BigQueryIO beam-schema example.
// multifile: false
// context_line: 56
// categories:
// - Quickstart
// complexity: ADVANCED
// tags:
// - hellobeam

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;

public class Task {

private static final Logger LOG = LoggerFactory.getLogger(Task.class);

public static void main(String[] args) {
LOG.info("Running Task");
System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json");
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
options.setTempLocation("gs://bucket");
options.as(BigQueryOptions.class).setProject("project-id");

Pipeline pipeline = Pipeline.create(options);

Schema inputSchema = Schema.builder()
.addField("id", Schema.FieldType.INT32)
.addField("name", Schema.FieldType.STRING)
.addField("age", Schema.FieldType.INT32)
.build();

/*
PCollection<Object> pCollection = pipeline
// The pipeline is reading data from a BigQuery table called "table" that's in the dataset "dataset" from the project with the ID "project-id". The data read is a collection of table rows.
.apply(BigQueryIO.readTableRows()
.from("project-id.dataset.table"))
.apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it))
.setCoder(CustomCoder.of())
// The setRowSchema(inputSchema) is used to provide the schema of the rows in the PCollection. This is necessary for some Beam operations, including writing to BigQuery using Beam schema.
.setRowSchema(inputSchema);
// The useBeamSchema() method indicates that the schema for the table is to be inferred from the types of the elements in the PCollection.
pCollection
.apply("WriteToBigQuery", BigQueryIO.write()
.to("mydataset.outputtable")
.useBeamSchema());
*/
pipeline.run();
}

static class CustomCoder extends Coder<Object> {
final ObjectMapper objectMapper = new ObjectMapper();
private static final CustomCoder INSTANCE = new CustomCoder();

public static CustomCoder of() {
return INSTANCE;
}

@Override
public void encode(Object user, OutputStream outStream) throws IOException {
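// Write the element's string representation as raw bytes to the output stream.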
String line = user.toString();
outStream.write(line.getBytes());
}

@Override
public Object decode(InputStream inStream) throws IOException {
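// Read all bytes without closing the stream, then parse them as JSON with Jackson.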
final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream));
return objectMapper.readValue(serializedDTOs, Object.class);
}

@Override
public List<? extends Coder<?>> getCoderArguments() {
return Collections.emptyList();
}

@Override
public void verifyDeterministic() {
}
}

static class LogOutput<T> extends DoFn<T, T> {
private final String prefix;

LogOutput() {
this.prefix = "Processing element";
}

LogOutput(String prefix) {
this.prefix = prefix;
}

@ProcessElement
public void processElement(ProcessContext c) throws Exception {
LOG.info(prefix + ": {}", c.element());
}
}
}
@@ -0,0 +1,25 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sdk:
- Java
complexity: ADVANCED
id: write-beam-schema
name: BigQueryIO write beam-schema
taskName: write-beam-schema
@@ -0,0 +1,31 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

sdk:
- Java
- Python
- Go
complexity: ADVANCED
id: big-queryIO
name: BigQueryIO
content:
- read-table
- read-query
- table-schema
- beam-schema
@@ -0,0 +1,42 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
### Reading BigQuery query results

Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.read()` method, combined with `fromQuery()`, reads the result of a **SQL query** from `BigQuery`.
When reading query results, `BigQueryIO` runs the query as a `BigQuery` job, stages the result, and then reads it in parallel. This performs well for large result sets, but the query job itself can add to the cost of running your pipeline.

{{if (eq .Sdk "go")}}
```
// Query runs the SQL query and returns rows typed by the given struct
// (WeatherRow is an illustrative type with a `bigquery:"max_temperature"` field).
rows := bigqueryio.Query(s, project,
    "SELECT max_temperature FROM `tess-372508.fir.xasw`",
    reflect.TypeOf(WeatherRow{}), bigqueryio.UseStandardSQL())
```
{{end}}
{{if (eq .Sdk "java")}}
```
PCollection<Double> maxTemperatures =
p.apply(
BigQueryIO.read(
(SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature"))
.fromQuery(
"SELECT max_temperature FROM `tess-372508.fir.xasw`")
.usingStandardSql()
.withCoder(DoubleCoder.of()));
```
{{end}}
{{if (eq .Sdk "python")}}
```
lines = p | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
    query='SELECT max_temperature FROM `tess-372508.fir.xasw`', use_standard_sql=True)
```
{{end}}
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// beam-playground:
// name: read-query
// description: BigQuery read query example.
// multifile: false
// context_line: 40
// categories:
// - Quickstart
// complexity: ADVANCED
// tags:
// - hellobeam
package main

import (
_ "context"
_ "flag"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
_ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
internal_log "log"
_ "reflect"
)

// Define the data model: the CommentRow struct models one row of HackerNews comments.
// The bigquery tag on the struct field maps it to the corresponding BigQuery column.
type CommentRow struct {
Text string `bigquery:"text"`
}

// Construct the BigQuery query: A constant query is defined that selects the text column
// from the bigquery-public-data.hacker_news.comments table for a certain time range.
const query = `SELECT text
FROM ` + "`bigquery-public-data.hacker_news.comments`" + `
WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01'
LIMIT 1000
`

func main() {
internal_log.Println("Running Task")
/*
ctx := context.Background()
p := beam.NewPipeline()
s := p.Root()
project := "tess-372508"
// Build a PCollection<CommentRow> by querying BigQuery.
rows := bigqueryio.Query(s, project, query,
reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL())
debug.Print(s, rows)
// Now that the pipeline is fully constructed, we execute it.
if err := beamx.Run(ctx, p); err != nil {
log.Exitf(ctx, "Failed to execute job: %v", err)
}*/
}
