From b83a3e3892ead32ab099d94374ebcc80d0c98e8b Mon Sep 17 00:00:00 2001 From: Abzal Tuganbay Date: Thu, 1 Jun 2023 01:53:01 +0600 Subject: [PATCH] [Tour of Beam] Learning content for "IO Connectors" module (#25301) * new format textIO * fixes * fixes * add new units * correct py example * fixing structure and examples * nonexistant category * divide for write read * add myfile.txt * add licence to myfile.txt * remove whitespace * check multifile * correct myfile * comment go examples * remove package * comment kafka examples * minor fix * remove line and correct one line * change names * add Playground exercise * correct kafka theory * change folder name * change golang examples * correct formatter * correct whitespace * remove flags * add txt files * add excluded extensions * change to trigger CI * revert dummy commit * correct kafka examples * remove task tag * correcr import * change kafka example * fixing task names --------- Co-authored-by: oborysevych Co-authored-by: mende1esmende1es --- build.gradle.kts | 6 + .../learning-content/content-info.yaml | 3 +- .../big-query-io/beam-schema/description.md | 31 +++ .../beam-schema/java-example/Task.java | 136 ++++++++++ .../big-query-io/beam-schema/unit-info.yaml | 25 ++ .../io/big-query-io/group-info.yaml | 31 +++ .../io/big-query-io/read-query/description.md | 42 ++++ .../read-query/go-example/main.go | 75 ++++++ .../read-query/java-example/Task.java | 91 +++++++ .../read-query/python-example/task.py | 64 +++++ .../io/big-query-io/read-query/unit-info.yaml | 27 ++ .../io/big-query-io/read-table/description.md | 59 +++++ .../read-table/go-example/main.go | 81 ++++++ .../read-table/java-example/Task.java | 89 +++++++ .../read-table/python-example/task.py | 60 +++++ .../io/big-query-io/read-table/unit-info.yaml | 27 ++ .../big-query-io/table-schema/description.md | 121 +++++++++ .../table-schema/go-example/main.go | 82 +++++++ .../table-schema/java-example/Task.java | 179 ++++++++++++++ 
.../table-schema/python-example/task.py | 69 ++++++ .../big-query-io/table-schema/unit-info.yaml | 27 ++ .../io/kafka-io/group-info.yaml | 29 +++ .../io/kafka-io/kafka-read/description.md | 73 ++++++ .../io/kafka-io/kafka-read/go-example/main.go | 90 +++++++ .../kafka-read/java-example/Task.java | 120 +++++++++ .../kafka-read/python-example/task.py | 59 +++++ .../io/kafka-io/kafka-read/unit-info.yaml | 27 ++ .../io/kafka-io/kafka-write/description.md | 64 +++++ .../kafka-io/kafka-write/go-example/main.go | 88 +++++++ .../kafka-write/java-example/Task.java | 68 +++++ .../kafka-write/python-example/task.py | 55 +++++ .../io/kafka-io/kafka-write/unit-info.yaml | 27 ++ .../learning-content/io/module-info.yaml | 31 +++ .../io/rest-api/description.md | 110 +++++++++ .../io/rest-api/java-example/Task.java | 232 ++++++++++++++++++ .../io/rest-api/python-example/task.py | 92 +++++++ .../io/rest-api/unit-info.yaml | 26 ++ .../io/text-io/group-info.yaml | 31 +++ .../text-io/text-io-gcs-read/description.md | 55 +++++ .../text-io-gcs-read/go-example/main.go | 64 +++++ .../text-io-gcs-read/java-example/Task.java | 54 ++++ .../text-io-gcs-read/python-example/task.py | 39 +++ .../text-io/text-io-gcs-read/unit-info.yaml | 27 ++ .../text-io/text-io-gcs-write/description.md | 87 +++++++ .../text-io-gcs-write/go-example/main.go | 62 +++++ .../text-io-gcs-write/java-example/Task.java | 54 ++++ .../text-io-gcs-write/python-example/task.py | 42 ++++ .../text-io/text-io-gcs-write/unit-info.yaml | 27 ++ .../text-io/text-io-local-read/description.md | 88 +++++++ .../text-io-local-read/go-example/main.go | 58 +++++ .../text-io-local-read/go-example/myfile.txt | 1 + .../text-io-local-read/java-example/Task.java | 53 ++++ .../java-example/myfile.txt | 1 + .../python-example/myfile.txt | 1 + .../text-io-local-read/python-example/task.py | 38 +++ .../text-io/text-io-local-read/unit-info.yaml | 27 ++ .../text-io-local-write/description.md | 88 +++++++ .../text-io-local-write/go-example/main.go | 
53 ++++ .../text-io-local-write/go-example/myfile.txt | 0 .../java-example/Task.java | 46 ++++ .../java-example/myfile.txt | 0 .../python-example/myfile.txt | 0 .../python-example/task.py | 41 ++++ .../text-io-local-write/unit-info.yaml | 27 ++ 64 files changed, 3579 insertions(+), 1 deletion(-) create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/description.md create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/group-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-query/description.md create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-query/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/read-table/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/table-schema/description.md create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/table-schema/go-example/main.go create mode 100644 
learning/tour-of-beam/learning-content/io/big-query-io/table-schema/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/table-schema/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/big-query-io/table-schema/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/group-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/description.md create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/description.md create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/module-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/rest-api/description.md create mode 100644 learning/tour-of-beam/learning-content/io/rest-api/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/rest-api/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/rest-api/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/text-io/group-info.yaml create mode 100644 
learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/description.md create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/description.md create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/unit-info.yaml create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/description.md create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/unit-info.yaml create mode 100644 
learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/description.md create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/main.go create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/Task.java create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/myfile.txt create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/task.py create mode 100644 learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/unit-info.yaml diff --git a/build.gradle.kts b/build.gradle.kts index 4aab4e4c6b2f4..b89270a9537fe 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -121,6 +121,12 @@ tasks.rat { // Tour Of Beam example logs "learning/tour-of-beam/learning-content/**/*.log", + // Tour Of Beam example txt files + "learning/tour-of-beam/learning-content/**/*.txt", + + // Tour Of Beam example csv files + "learning/tour-of-beam/learning-content/**/*.csv", + // Tour Of Beam backend autogenerated Datastore indexes "learning/tour-of-beam/backend/internal/storage/index.yaml", diff --git a/learning/tour-of-beam/learning-content/content-info.yaml b/learning/tour-of-beam/learning-content/content-info.yaml index 8f1adc4703c41..c85d2456c3995 100644 --- a/learning/tour-of-beam/learning-content/content-info.yaml +++ b/learning/tour-of-beam/learning-content/content-info.yaml @@ -27,4 +27,5 @@ content: - core-transforms - schema-based-transforms - windowing - - triggers \ No newline at end of file + - triggers + - io \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/description.md 
b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/description.md new file mode 100644 index 0000000000000..5f2ea4b98f255 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/description.md @@ -0,0 +1,31 @@ + +### BigQuery with beam-schema + +The `useBeamSchema` method is provided by the `BigQueryIO` class in Apache Beam to specify whether to use Beam's internal schema representation or BigQuery's native table schema when reading or writing data to BigQuery. + +When you set `useBeamSchema` to true, Beam will use its internal schema representation when reading or writing data to BigQuery. This allows for more flexibility when working with the data, as Beam's schema representation supports more data types and allows for more advanced schema manipulation. + +When you set `useBeamSchema` to false, Beam will use the native table schema of the BigQuery table when reading or writing data. This can be useful when you want to ensure that the data is written to BigQuery in a format that is compatible with other tools that read from the same table. + +Here is an example of how you might use the `useBeamSchema` method when writing data to a BigQuery table: + +``` +pCollection.apply("WriteToBigQuery", + BigQueryIO.write().to("mydataset.outputtable").useBeamSchema()) +``` + +The `BigQueryIO.write()` method creates a `Write` transform that will write the data to a new BigQuery table. The `to()` method specifies the name of the output table, which in this case is "**mydataset.outputtable**". + +The `useBeamSchema()` method is called on the `Write` transform to use the schema of the `PCollection` elements as the schema of the output table.
\ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/java-example/Task.java new file mode 100644 index 0000000000000..6a1b8fac4782e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/java-example/Task.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: write-beam-schema +// description: BigQueryIO beam-schema example.
+// multifile: false +// context_line: 56 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.StreamUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + public static void main(String[] args) { + LOG.info("Running Task"); + System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setTempLocation("gs://bucket"); + options.as(BigQueryOptions.class).setProject("project-id"); + + Pipeline pipeline = Pipeline.create(options); + + Schema inputSchema = Schema.builder() + .addField("id", Schema.FieldType.INT32) + .addField("name", Schema.FieldType.STRING) + .addField("age", Schema.FieldType.INT32) + .build(); + + /* + PCollection pCollection = pipeline + // The pipeline is reading data from a BigQuery table called "table" that's in the dataset "dataset" from the project with the ID "project-id". The data read is a collection of table rows. 
+ .apply(BigQueryIO.readTableRows() + .from("project-id.dataset.table")) + .apply(MapElements.into(TypeDescriptor.of(Object.class)).via(it -> it)) + .setCoder(CustomCoder.of()) + // The setRowSchema(inputSchema) is used to provide the schema of the rows in the PCollection. This is necessary for some Beam operations, including writing to BigQuery using Beam schema. + .setRowSchema(inputSchema); + + // The useBeamSchema() method indicates that the schema for the table is to be inferred from the types of the elements in the PCollection. + pCollection + .apply("WriteToBigQuery", BigQueryIO.write() + .to("mydataset.outputtable") + .useBeamSchema()); + */ + pipeline.run(); + } + + static class CustomCoder extends Coder { + final ObjectMapper objectMapper = new ObjectMapper(); + private static final CustomCoder INSTANCE = new CustomCoder(); + + public static CustomCoder of() { + return INSTANCE; + } + + @Override + public void encode(Object user, OutputStream outStream) throws IOException { + String line = user.toString(); + outStream.write(line.getBytes()); + } + + @Override + public Object decode(InputStream inStream) throws IOException { + final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream)); + return objectMapper.readValue(serializedDTOs, Object.class); + } + + @Override + public List> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() { + } + } + + static class LogOutput extends DoFn { + private final String prefix; + + LogOutput() { + this.prefix = "Processing element"; + } + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info(prefix + ": {}", c.element()); + } + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/unit-info.yaml b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/unit-info.yaml new 
file mode 100644 index 0000000000000..aee2d3de6797f --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/beam-schema/unit-info.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java +complexity: ADVANCED +id: write-beam-schema +name: BigQueryIO write beam-schema +taskName: write-beam-schema \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/group-info.yaml b/learning/tour-of-beam/learning-content/io/big-query-io/group-info.yaml new file mode 100644 index 0000000000000..2a3622932d3f9 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/group-info.yaml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: big-queryIO +name: BigQueryIO +content: + - read-table + - read-query + - table-schema + - beam-schema \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/description.md b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/description.md new file mode 100644 index 0000000000000..eed116f2e8c23 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/description.md @@ -0,0 +1,42 @@ + +### Reading BigQuery query results + +Apache Beam's `BigQueryIO` connector allows you to read data from `BigQuery` tables and use it as a source for your data pipeline. The `BigQueryIO.Read()` method is used to read data from a `BigQuery` table based on a **SQL query**. +The `BigQueryIO.Read()` method reads data from a `BigQuery` table in parallel by automatically splitting the query into smaller pieces and running each piece in a separate `BigQuery` job. This can improve performance for large tables, but can also increase the cost of running your pipeline. 
+ +{{if (eq .Sdk "go")}} +``` +bigquery.NewClient(context.Background(), options).Read(p, + bigquery.Query("SELECT max_temperature FROM `tess-372508.fir.xasw`"), + bigquery.WithCoder(bigquery.Float64())) +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +PCollection maxTemperatures = + p.apply( + BigQueryIO.read( + (SchemaAndRecord elem) -> (Double) elem.getRecord().get("max_temperature")) + .fromQuery( + "SELECT max_temperature FROM `tess-372508.fir.xasw`") + .usingStandardSql() + .withCoder(DoubleCoder.of())); +``` +{{end}} +{{if (eq .Sdk "python")}} +``` +lines = p | 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT max_temperature FROM `tess-372508.fir.xasw`')) +``` +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go new file mode 100644 index 0000000000000..fec979ad7eda0 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/go-example/main.go @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +// beam-playground: +// name: read-query +// description: BigQuery read query example. 
+// multifile: false +// context_line: 40 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam +package main + +import ( + _ "context" + _ "flag" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + internal_log "log" + _ "reflect" +) + +// Define the data model: The CommentRow struct is defined, which models one row of HackerNews comments. +//The bigquery tag in the struct field is used to map the struct field to the BigQuery column. +type CommentRow struct { + Text string `bigquery:"text"` +} + +// Construct the BigQuery query: A constant query is defined that selects the text column +// from the bigquery-public-data.hacker_news.comments table for a certain time range. +const query = `SELECT text +FROM ` + "`bigquery-public-data.hacker_news.comments`" + ` +WHERE time_ts BETWEEN '2013-01-01' AND '2014-01-01' +LIMIT 1000 +` + +func main() { + internal_log.Println("Running Task") + /* + ctx := context.Background() + p := beam.NewPipeline() + s := p.Root() + project := "tess-372508" + + // Build a PCollection by querying BigQuery. + rows := bigqueryio.Query(s, project, query, + reflect.TypeOf(CommentRow{}), bigqueryio.UseStandardSQL()) + + debug.Print(s, rows) + + // Now that the pipeline is fully constructed, we execute it. 
+ if err := beamx.Run(ctx, p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + }*/ +} diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java new file mode 100644 index 0000000000000..256c70919ce77 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/java-example/Task.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: read-query +// description: BigQuery read query example. 
+// multifile: false +// context_line: 56 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.DoubleCoder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + public static void main(String[] args) { + LOG.info("Running Task"); + System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setTempLocation("gs://bucket"); + options.as(BigQueryOptions.class).setProject("project-id"); + + Pipeline pipeline = Pipeline.create(options); + + // pCollection.apply(BigQueryIO.read(... - This part of the pipeline reads from a BigQuery table using a SQL query and stores the result in a PCollection. + // The BigQueryIO.read() function is used to read from BigQuery. It is configured with a lambda function to extract a field from each record. + // The .fromQuery("SELECT field FROM project-id.dataset.table") + // specifies the SQL query used to read from BigQuery. You should replace "field", "project-id", "dataset", and "table" with your specific field name, project id, dataset name, and table name, respectively. 
+/* + PCollection pCollection = pipeline + .apply(BigQueryIO.read( + (SchemaAndRecord elem) -> (Double) elem.getRecord().get("field")) + .fromQuery( + "SELECT field FROM `project-id.dataset.table`") + .usingStandardSql() + .withCoder(DoubleCoder.of())); + pCollection + .apply("Log words", ParDo.of(new LogOutput<>())); +*/ + + pipeline.run(); + } + + static class LogOutput extends DoFn { + private final String prefix; + + LogOutput() { + this.prefix = "Processing element"; + } + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info(prefix + ": {}", c.element()); + } + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py new file mode 100644 index 0000000000000..1fab9964d6706 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/python-example/task.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground-broken: +# name: read-query +# description: BigQuery read query example.
+# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam + +import argparse +import apache_beam as beam +from apache_beam.io import ReadFromText +from apache_beam.io import WriteToBigQuery +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + + +def run(argv=None): + parser = argparse.ArgumentParser() + parser.add_argument('--input', + dest='input', + default='gs://bucket', + help='Input file to process.') + + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = True + + """ + (p | 'ReadTable' >> ReadFromBigQuery(query='SELECT * FROM project-id.dataset.table') - This part of the + pipeline reads from a BigQuery table using a SQL query and processes the result. The ReadFromBigQuery( + query='SELECT * FROM project-id.dataset.table') function is used to read from BigQuery. 'LogOutput' >> + beam.Map(lambda elem: print(f"Processing element: {elem['field']}"))) - This part of the pipeline processes the + PCollection and logs the output to the console. It prints the 'field' column from each row in the table. 
+ """ + + with beam.Pipeline(options=pipeline_options) as p: + (p #| 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(query='SELECT * FROM `project-id.dataset.table`'))) + # Each row is a dictionary where the keys are the BigQuery columns + #| beam.Map(lambda elem: elem['field']) + ) + + +if __name__ == '__main__': + run() diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-query/unit-info.yaml b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/unit-info.yaml new file mode 100644 index 0000000000000..3a5e4c5242216 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-query/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: read-query +name: BigQueryIO read query +taskName: read-query \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md new file mode 100644 index 0000000000000..ef0231fe90d53 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/description.md @@ -0,0 +1,59 @@ + +### Reading BigQuery table + +`BigQueryIO` allows you to read from a `BigQuery` table and read the results. By default, Beam invokes a `BigQuery` export request when you apply a `BigQueryIO` read transform. In Java Beam SDK, readTableRows returns a `PCollection` of `BigQuery` `TableRow` objects. Each element in the `PCollection` represents a single row in the table. + +> `Integer` values in the `TableRow` objects are encoded as strings to match `BigQuery`’s exported JSON format. This method is convenient but has a performance impact. Alternatively, you can use `read(SerializableFunction)` method to avoid this. + +{{if (eq .Sdk "go")}} + +``` +rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID}) +beam.ParDo0(s, &logOutput{}, rows) +``` + +The `bigqueryio.Read()` method is called with a `bigquery.TableReference` object that specifies the project, dataset, and table IDs for the `BigQuery` table to read from. + +The `Read()` method returns a PCollection of `TableRow` objects, which represent the rows of data in the BigQuery table. + +The `ParDo()` method is called on the `PCollection` to apply a custom `DoFn` to each element in the collection. The `&logOutput{}` parameter specifies an instance of the `logOutput` struct to use as the `DoFn`. + +The `logOutput` struct is defined as a custom `DoFn` that implements the ProcessElement method. 
This method takes a single `TableRow` object as input and logs its contents using the `log.Printf()` function. + +{{end}} +{{if (eq .Sdk "java")}} +``` +PCollection rows = + pipeline + .apply( + "Read from BigQuery query", + BigQueryIO.readTableRows().from("tess-372508.fir.xasw") +``` + +The `BigQueryIO.readTableRows()` method is called to create a `BigQueryIO.Read` transform that will read data from a `BigQuery` table. + +The `.from()` method is called on the `Read` transform to specify the name of the `BigQuery` table to read from. In this example, the table is named **tess-372508.fir.xasw**. + +The `Read` transform returns a `PCollection` of `TableRow` objects, which represent the rows of data in the `BigQuery` table +{{end}} +{{if (eq .Sdk "python")}} +``` +p | 'ReadTable' >> beam.io.ReadFromBigQuery(table=table_spec) | beam.Map(lambda elem: elem['max_temperature']) +``` + +The `beam.io.ReadFromBigQuery()` method is called to create a `Read` transform that will read data from a `BigQuery` table. The `table_spec` parameter specifies the name of the `BigQuery` table to read from, along with any other configuration options such as **project ID**, **dataset ID**, or **query**. + +The Read transform returns a `PCollection` of dict objects, where each dictionary represents a single row of data in the `BigQuery` table. +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go new file mode 100644 index 0000000000000..f6966264284da --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/go-example/main.go @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +// beam-playground: +// name: read-table +// description: BigQueryIO read table example. +// multifile: false +// context_line: 42 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +package main + +import ( + _ "context" + _ "flag" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + internal_log "log" + _ "reflect" + "time" +) + +type Comment struct { + ID int `bigquery:"id"` + By string `bigquery:"by"` + Author string `bigquery:"author"` + Time int `bigquery:"time"` + TimeTS time.Time `bigquery:"time_ts"` + Text string `bigquery:"text"` + Parent int `bigquery:"parent"` + Deleted bool `bigquery:"deleted"` + Dead bool `bigquery:"dead"` + Ranking float64 `bigquery:"ranking"` +} + +// rows := bigqueryio.Read(s, project, "bigquery-public-data:hacker_news.comments", reflect.TypeOf(Comment{})) +// reads data from the specified BigQuery table and produces a PCollection where each element is a Comment. +// The reflect.TypeOf(Comment{}) is used to tell BigQuery the schema of the data. + +// debug.Print(s, rows) prints the elements of the PCollection to stdout for debugging purposes. 
+ +func main() { + internal_log.Println("Running Task") + /* + ctx := context.Background() + p := beam.NewPipeline() + s := p.Root() + project := "tess-372508" + + // Build a PCollection by querying BigQuery. + rows := bigqueryio.Read(s, project, "bigquery-public-data:hacker_news.comments", reflect.TypeOf(Comment{})) + + debug.Print(s, rows) + + // Now that the pipeline is fully constructed, we execute it. + if err := beamx.Run(ctx, p); err != nil { + log.Exitf(ctx, "Failed to execute job: %v", err) + }*/ +} diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java new file mode 100644 index 0000000000000..0ec28d151f5d2 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/java-example/Task.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: read-table +// description: BigQueryIO read table example. 
+// multifile: false +// context_line: 56 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import com.google.api.services.bigquery.model.TableRow; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + public static void main(String[] args) { + LOG.info("Running Task"); + System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setTempLocation("gs://bucket"); + options.as(BigQueryOptions.class).setProject("project-id"); + + Pipeline pipeline = Pipeline.create(options); + + /* + * BigQueryIO.readTableRows().from("bucket.project-id.table") reads from the specified BigQuery table, and outputs a + * PCollection of TableRow objects. Each TableRow represents a row in the BigQuery table. + * The .apply("Log words", ParDo.of(new LogOutput<>())) line applies a ParDo transform that logs each row. This is done using the LogOutput class, a custom DoFn (element-wise function). + * LogOutput class: This is a custom DoFn that logs each element in the input PCollection. This is used to inspect the data in the pipeline for debugging or monitoring purposes. 
+ */ +/* + PCollection pCollection = pipeline + .apply("ReadFromBigQuery", BigQueryIO.readTableRows().from("bucket.project-id.table")); + + pCollection + .apply("Log words", ParDo.of(new LogOutput<>())); +*/ + + + pipeline.run(); + } + + static class LogOutput extends DoFn { + private final String prefix; + + LogOutput() { + this.prefix = "Processing element"; + } + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info(prefix + ": {}", c.element()); + } + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py new file mode 100644 index 0000000000000..c8fcccbe9e9b5 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/python-example/task.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground-broken: +# name: read-table +# description: TextIO read table example. 
+# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam + +import argparse +import apache_beam as beam +from apache_beam.io import ReadFromText +from apache_beam.io import WriteToBigQuery +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + + +def run(argv=None): + parser = argparse.ArgumentParser() + parser.add_argument('--input', + dest='input', + default='gs://bucket', + help='Input file to process.') + + known_args, pipeline_args = parser.parse_known_args(argv) + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = True + + # ReadFromBigQuery: This operation reads from a BigQuery table and outputs a PCollection of dictionaries. Each + # dictionary represents a row in the BigQuery table, where the keys are the BigQuery column names. beam.Map: This + # operation applies a function to each element in the PCollection, here, it selects a specific field from each row. + + with beam.Pipeline(options=pipeline_options) as p: + (p #| 'ReadTable' >> beam.io.ReadFromBigQuery(table='project-id.dataset.table') + # Each row is a dictionary where the keys are the BigQuery columns + #| beam.Map(lambda elem: elem['field']) + ) + + +if __name__ == '__main__': + run() diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/read-table/unit-info.yaml b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/unit-info.yaml new file mode 100644 index 0000000000000..181193543e741 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/read-table/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: read-table +name: BigQueryIO read table +taskName: read-table \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/description.md b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/description.md new file mode 100644 index 0000000000000..7c4df2d3a44bf --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/description.md @@ -0,0 +1,121 @@ + +### BigQuery with table-schema + +{{if (eq .Sdk "go")}} +In Apache Beam, the `BigQueryIO` package provides the ability to read from and write to Google `BigQuery`. To use this package, you need to define a table schema for your BigQuery table, which specifies the names, data types, and modes of the columns in the table. +``` +type User struct { + ID int32 `bigquery:"id"` + Name string `bigquery:"name"` + Age int32 `bigquery:"age"` +} + +rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID}, + beam.WithSchema(User{})) +``` +{{end}} + +{{if (eq .Sdk "java")}} +`DynamicDestinations` is a feature provided by the `BigQueryIO` class in Apache Beam that allows you to write data to different BigQuery tables based on the input elements.
The feature allows you to specify a function that takes an input element and returns the destination table information (table name, schema, etc) for that element. + +`DynamicDestinations` interface provided by the `BigQueryIO` class in Apache Beam has three methods: + +* `getDestination`: takes an input element and returns a destination key for that element (in the example below, the year as a `Long`). +* `getTable`: takes a destination key and returns a `TableDestination` object, which contains the information about the destination table. +* `getSchema`: takes a destination key and returns the schema of the destination table as a `TableSchema` object. + +Here is an example of how you might use the `BigQueryIO.write()` method with DynamicDestinations to write data to different BigQuery tables based on the input elements: + +``` +weatherData.apply( + BigQueryIO.write() + .to( + new DynamicDestinations() { + @Override + public Long getDestination(ValueInSingleWindow elem) { + return elem.getValue().year; + } + + @Override + public TableDestination getTable(Long destination) { + return new TableDestination( + new TableReference() + .setProjectId(writeProject) + .setDatasetId(writeDataset) + .setTableId(writeTable + "_" + destination), + "Table for year " + destination); + } + + @Override + public TableSchema getSchema(Long destination) { + return new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("year") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("month") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("day") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("maxTemp") + .setType("FLOAT") + .setMode("NULLABLE"))); + } + }) + .withFormatFunction( + (WeatherData elem) -> + new TableRow() + .set("year", elem.year) + .set("month", elem.month) + .set("day", elem.day) + .set("maxTemp", elem.maxTemp)) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + 
.withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); +``` +{{end}} + +{{if (eq .Sdk "python")}} +You can use the dynamic destinations feature to write elements in a PCollection to different BigQuery tables, possibly with different schemas. + +The dynamic destinations feature groups your user type by a user-defined destination key, uses the key to compute a destination table and/or schema, and writes each group’s elements to the computed destination. + +In addition, you can also write your own types that have a mapping function to TableRow, and you can use side inputs in all DynamicDestinations methods. + +``` +fictional_characters_view = beam.pvalue.AsDict( + pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True), + ('Obi Wan Kenobi', True)])) + +def table_fn(element, fictional_characters): + if element in fictional_characters: + return 'my_dataset.fictional_quotes' + else: + return 'my_dataset.real_quotes' + +quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery( + table_fn, + schema=table_schema, + table_side_inputs=(fictional_characters_view, ), + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) +``` +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/go-example/main.go b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/go-example/main.go new file mode 100644 index 0000000000000..b81c99ed3ac21 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/go-example/main.go @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: write-table-schema +// description: BigQueryIO table-schema example. +// multifile: false +// context_line: 40 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +package main + +import ( + "log" + /* + "context" + beam_log "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "cloud.google.com/go/bigquery" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio" + */) + +type User struct { + ID int32 `bigquery:"id"` + Name string `bigquery:"name"` + Age int32 `bigquery:"age"` +} + +var(projectID = "project-id" + datasetID = "dataset" + tableID = "table") + +func main() { + log.Println("Running Task") + +/* + ctx := context.Background() + + + // set up pipeline + p := beam.NewPipeline() + s := p.Root() + s = s.Scope("ReadFromBigQuery") + + // Reads from the BigQuery table specified by the projectID, datasetID, and tableID, with the schema defined by the User struct, and stores the result in rows. 
+ rows := bigqueryio.Read(s, bigquery.TableReference{ProjectID: projectID, DatasetID: datasetID, TableID: tableID}, + beam.WithSchema(User{})) + + beam.ParDo0(s, &logOutput{}, rows) + + if err := beam.Run(ctx, p); err != nil { + log.Fatalf("Failed to execute job: %v", err) + } + */ +} + +type logOutput struct{} + +func (l *logOutput) ProcessElement(row User, emit func(User)) { + log.Printf("Processing element: %v", row) + emit(row) +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/java-example/Task.java b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/java-example/Task.java new file mode 100644 index 0000000000000..593ac0c19e20a --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/java-example/Task.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: write-table-schema +// description: BigQueryIO table-schema example. 
+// multifile: false +// context_line: 56 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import avro.shaded.com.google.common.collect.ImmutableList; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions; +import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.util.StreamUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private static final String projectId = "project-id"; + private static final String dataset = "dataset"; + private static final String table = "table"; + + public static void main(String[] args) { + LOG.info("Running Task"); + System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "to\\path\\credential.json"); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setTempLocation("gs://bucket"); 
+ options.as(BigQueryOptions.class).setProject(projectId); + + Pipeline pipeline = Pipeline.create(options); + + Schema inputSchema = Schema.builder() + .addField("id", Schema.FieldType.INT32) + .addField("name", Schema.FieldType.STRING) + .addField("age", Schema.FieldType.INT32) + .build(); + + /*PCollection pCollection = pipeline + // Reads from the specified BigQuery table and maps the data to User objects. + .apply(BigQueryIO.readTableRows() + .from(String.format("%s.%s.%s", projectId, dataset, table))) + .apply(MapElements.into(TypeDescriptor.of(User.class)).via(it -> new User((String) it.get("id"), (String) it.get("name"), (Integer) it.get("age")))) + .setCoder(CustomCoder.of()) + .setRowSchema(inputSchema); + + pCollection + .apply("User", ParDo.of(new LogOutput<>())); + */ + pipeline.run(); + } + + static class User { + private String id; + private String name; + private Integer age; + + public User(String id, String name, Integer age) { + this.id = id; + this.name = name; + this.age = age; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } + } + + static class CustomCoder extends Coder { + final ObjectMapper objectMapper = new ObjectMapper(); + private static final CustomCoder INSTANCE = new CustomCoder(); + + public static CustomCoder of() { + return INSTANCE; + } + + @Override + public void encode(User user, OutputStream outStream) throws IOException { + String line = user.toString(); + outStream.write(line.getBytes()); + } + + @Override + public User decode(InputStream inStream) throws IOException { + final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream)); + return objectMapper.readValue(serializedDTOs, User.class); + } + + @Override + public List> 
getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() { + } + } + + static class LogOutput extends DoFn { + private final String prefix; + + LogOutput() { + this.prefix = "Processing element"; + } + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info(prefix + ": {}", c.element()); + } + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/python-example/task.py b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/python-example/task.py new file mode 100644 index 0000000000000..2e06a1367766d --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/python-example/task.py @@ -0,0 +1,69 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: write-table-schema +# description: TextIO table schema example. 
+# multifile: false +# context_line: 30 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam +import apache_beam as beam +from apache_beam.io import WriteToBigQuery +from apache_beam.io.gcp.internal.clients import bigquery + +p = beam.Pipeline() + +table_spec = bigquery.TableReference( + projectId='project-id', + datasetId='dataset', + tableId='table') + +table_schema = { + 'fields': [{ + 'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE' + }, { + 'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED' + }] +} + +input = p | beam.Create([ + { + 'source': 'Mahatma Gandhi', 'quote': 'My life is my message.' + }, + { + 'source': 'Yoda', 'quote': "Do, or do not. There is no 'try'." + }, +]) + +# It defines the schema (table_schema) of the table. The table has two fields: source and quote, both of type STRING. +# The source field is nullable, while the quote field is required. + +# It creates the input data which is a collection of dictionaries. Each dictionary represents a row in the BigQuery table. + +# Finally, it writes the data to the BigQuery table using the beam.io.WriteToBigQuery function. The write_disposition +# parameter is set to WRITE_TRUNCATE which means that if the table already exists, it will be replaced with the new +# data. The create_disposition parameter is set to CREATE_IF_NEEDED which means the table will be created if it does +# not exist. 
+ +input | beam.io.WriteToBigQuery( + table_spec, + schema=table_schema, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) diff --git a/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/unit-info.yaml b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/unit-info.yaml new file mode 100644 index 0000000000000..eae84ae553a97 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/big-query-io/table-schema/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: write-table-schema +name: BigQueryIO write table-schema +taskName: write-table-schema \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/group-info.yaml b/learning/tour-of-beam/learning-content/io/kafka-io/group-info.yaml new file mode 100644 index 0000000000000..3428597e449aa --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/group-info.yaml @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: kafkaIO +name: KafkaIO +content: +- kafka-read +- kafka-write \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/description.md b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/description.md new file mode 100644 index 0000000000000..328bf3c1293ba --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/description.md @@ -0,0 +1,73 @@ + +### Reading from Kafka using KafkaIO + +`KafkaIO` is a part of the Apache Beam SDK that provides a way to read data from Apache Kafka and write data to it. It allows for the creation of Beam pipelines that can consume data from a Kafka topic, process the data and write the processed data back to another Kafka topic. This makes it possible to build data processing pipelines using Apache Beam that can easily integrate with a Kafka-based data architecture. + +`ReadFromKafka` transform returns unbounded `PCollection` of Kafka messages, where each element contains the key, value, and basic metadata such as topic-partition and offset. + +Developers can then use other Apache Beam transforms to process and analyze the Kafka messages, such as filtering, aggregating, and joining them with other data sources. 
Once the data processing pipeline is defined, it can be executed on a distributed processing engine, such as **Apache Flink**, **Apache Spark**, or **Google Cloud Dataflow**, to process the Kafka messages in parallel and at scale. + +When reading data from Kafka topics using Apache Beam, developers can use the `ReadFromKafka` transform to create a `PCollection` of Kafka messages. This transform takes the following parameters: + +* **consumer_config**: a dictionary that contains the Kafka consumer configuration properties, such as the Kafka broker addresses, the group ID of the consumer group, and the deserializer classes for the key and value of the Kafka messages. +* **bootstrap.servers**: a Kafka configuration property, set inside `consumer_config`, that specifies the list of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster. +* **topics**: the list of Kafka topics to read the data from. +* **with_metadata**: a boolean flag that specifies whether to include the Kafka metadata for each message, such as the topic, partition, and offset. + +For more details, see the [KafkaIO documentation](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/kafka/KafkaIO.html). +{{if (eq .Sdk "go")}} +In the Go SDK, KafkaIO is a cross-language transform, so it needs an expansion service: pass its address, or leave the address empty to have one started automatically. +``` +var ( + expansionAddr = flag.String("expansion_addr", "", + "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.") + bootstrapServers = flag.String("bootstrap_servers", "", + "(Required) URL of the bootstrap servers for the Kafka cluster. 
Should be accessible by the runner.") + topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.") +) + +read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic}) +``` +{{end}} + +{{if (eq .Sdk "java")}} +``` +p.apply("ReadFromKafka", + KafkaIO.<String, String>read() + .withBootstrapServers("localhost:29092") + .withTopicPartitions( + Collections.singletonList( + new TopicPartition( + "NYCTaxi1000_simple", 0))) + .withKeyDeserializer(StringDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withConsumerConfigUpdates(consumerConfig) + .withMaxNumRecords(998) + .withoutMetadata()) +``` +{{end}} + + +{{if (eq .Sdk "python")}} +``` +input_topic = 'input-topic' +output_topic = 'output-topic' + +(p | "Read from Kafka" >> ReadFromKafka( + topics=[input_topic], + consumer_config={'bootstrap.servers': 'localhost:9092'}) + | "Process data" >> beam.Map(process_data)) +``` +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/go-example/main.go b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/go-example/main.go new file mode 100644 index 0000000000000..fcb882fdf9948 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/go-example/main.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: kafka-read +// description: KafkaIO read example +// multifile: false +// default_example: false +// context_line: 72 +// categories: +// - IO +// complexity: ADVANCED + +package main + +import ( + "context" + "flag" + "time" + + "github.com/apache/beam/sdks/v2/go/pkg/beam" + // "github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + // "github.com/apache/beam/sdks/v2/go/pkg/beam/register" + // "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" +) + +var ( + expansionAddr = flag.String("expansion_addr", "kafka_server:9092", + "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.") + bootstrapServers = flag.String("bootstrap_servers", "kafka_server:9092", + "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.") + topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.") +) + +// func init() { +// register.DoFn2x0[context.Context, []byte](&LogFn{}) +// } + +// LogFn is a DoFn to log rides. +type LogFn struct{} + +// ProcessElement logs each element it receives. +func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) { + log.Infof(ctx, "Ride info: %v", string(elm)) +} + +// FinishBundle waits a bit so the job server finishes receiving logs. +func (fn *LogFn) FinishBundle() { + time.Sleep(2 * time.Second) +} + +// Pipeline creation: A Beam pipeline is created with beam.NewPipeline(). 
+// Kafka reading: The script sets up to read from a Kafka topic specified by the command-line argument. The setup includes establishing a connection to Kafka via a local host and a specified port. + +func main() { + flag.Parse() + beam.Init() + /* + ctx := context.Background() + + p := beam.NewPipeline() + s := p.Root() + + // Simultaneously read from Kafka and log any element received. + read := kafkaio.Read(s, *expansionAddr, *bootstrapServers, []string{*topic}) + vals := beam.DropKey(s, read) + beam.ParDo0(s, &LogFn{}, vals) + + + if err := beamx.Run(ctx, p); err != nil { + log.Fatalf(ctx, "Failed to execute job: %v", err) + } + */ +} diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/java-example/Task.java b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/java-example/Task.java new file mode 100644 index 0000000000000..ccbb4fe0a5546 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/java-example/Task.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples; + +// beam-playground: +// name: kafka-read +// description: Kafka read example +// multifile: false +// context_line: 66 +// categories: +// - Filtering +// - Options +// - Quickstart +// complexity: ADVANCED +// tags: +// - filter +// - strings +// - emulator +// emulators: +// - type: kafka +// topic: +// id: dataset +// source_dataset: CountWordsJson +// datasets: +// CountWordsJson: +// location: local +// format: json + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SimpleFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.values.KV; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +public class KafkaWordCountJson { + static final String TOKENIZER_PATTERN = "[^\\p{L}]+"; // Java pattern for letters + + public static void main(String[] args) { + final PipelineOptions options = PipelineOptionsFactory.create(); + final Pipeline p = Pipeline.create(options); + + final Map<String, Object> consumerConfig = new HashMap<>(); + consumerConfig.put("auto.offset.reset", "earliest"); + + p.apply( + KafkaIO.<Long, String>read() + .withBootstrapServers( + "kafka_server:9092") // The argument is hardcoded to a predefined value. Do not + // change it manually. It's replaced to the correct Kafka cluster address when code + // starts in backend. 
+ .withTopicPartitions( + Collections.singletonList( + new TopicPartition( + "dataset", + 0))) // The argument is hardcoded to a predefined value. Do not + // change it manually. It's replaced to the correct topic name when code starts in + // backend. + .withKeyDeserializer(LongDeserializer.class) + .withValueDeserializer(StringDeserializer.class) + .withConsumerConfigUpdates(consumerConfig) + .withMaxNumRecords(5) + .withoutMetadata()) + .apply(Values.create()) + .apply( + "ExtractWords", + ParDo.of( + new DoFn<String, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + for (String word : c.element().split(TOKENIZER_PATTERN, 0)) { + if (!word.isEmpty()) { + c.output(word); + } + } + } + })) + .apply(Count.perElement()) + .apply( + "FormatResults", + MapElements.via( + new SimpleFunction<KV<String, Long>, String>() { + @Override + public String apply(KV<String, Long> input) { + System.out.printf("key: %s, value: %d%n", input.getKey(), input.getValue()); + return input.getKey() + ": " + input.getValue(); + } + })) + .apply(TextIO.write().to("word-counts")); + + p.run().waitUntilFinish(); + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/python-example/task.py b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/python-example/task.py new file mode 100644 index 0000000000000..1385c485a7947 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/python-example/task.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: kafka-read +# description: KafkaIO read example. +# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam + + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka, WriteToKafka + +def process_data(element): + # Do some processing on the data + return element + +options = beam.options.pipeline_options.PipelineOptions() +p = beam.Pipeline(options=options) + +input_topic = 'input-topic' +output_topic = 'output-topic' +bootstrap_servers = {"bootstrap.servers": "kafka_server:9092"} + +# Set Kafka parameters: The Kafka topic to read from (input_topic), the Kafka topic to write to (output_topic), +# and the Kafka brokers to connect to (bootstrap_servers) are specified. + +# Read from Kafka topic: A KafkaIO ReadFromKafka transform is created, where the topics argument specifies the +# Kafka topics to read from and the consumer_config argument specifies the Kafka brokers to connect to. + +# Process the data: The data read from Kafka is processed with the beam.Map(process_data) transform. In this case, +# each element is simply passed to the process_data function defined earlier. 
+ + + +# (p | "Read from Kafka" >> ReadFromKafka( +# topics=[input_topic], +# consumer_config=bootstrap_servers) +# | "Process data" >> beam.Map(process_data)) + +p.run().wait_until_finish() diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/unit-info.yaml b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/unit-info.yaml new file mode 100644 index 0000000000000..5144dfe6ceb1c --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-read/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: kafka-read +name: KafkaIO read +taskName: kafka-read \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/description.md b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/description.md new file mode 100644 index 0000000000000..7adc8aaedc7c5 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/description.md @@ -0,0 +1,64 @@ + +### Writing to Kafka using KafkaIO + +When writing data processing pipelines using Apache Beam, developers can use the `WriteToKafka` transform to write data to Kafka topics. 
This transform takes a `PCollection` of data as input and writes the data to a specified Kafka topic using a Kafka producer. + +To use the `WriteToKafka` transform, developers need to provide the following parameters: + +* **producer_config**: a dictionary that contains the Kafka producer configuration properties, such as the Kafka broker addresses and the number of acknowledgments to wait for before considering a message as sent. +* **bootstrap.servers**: a Kafka configuration property, set inside `producer_config`, that specifies the list of bootstrap servers that the Kafka clients should use to connect to the Kafka cluster. +* **topic**: the name of the Kafka topic to write the data to. +* **key_serializer**: the fully-qualified Java class name of the Kafka serializer used for the message key. It is optional and defaults to `org.apache.kafka.common.serialization.ByteArraySerializer`. +* **value_serializer**: the fully-qualified Java class name of the Kafka serializer used for the message value. It is optional and has the same default. + +When writing data to Kafka using Apache Beam, it is important to ensure that the pipeline is fault-tolerant and can handle failures, such as network errors, broker failures, or message serialization errors. Apache Beam provides features such as checkpointing, retries, and dead-letter queues to help developers build robust and reliable data processing pipelines that can handle these types of failures. 
+ +For more details, see the [KafkaIO documentation](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/io/kafka/KafkaIO.html). +{{if (eq .Sdk "go")}} +``` +data := pubsubio.Read(s, "pubsub-public-data", "taxirides-realtime", nil) +kvData := beam.ParDo(s, func(elm []byte) ([]byte, []byte) { return []byte(""), elm }, data) +windowed := beam.WindowInto(s, window.NewFixedWindows(15*time.Second), kvData) +kafkaio.Write(s, *expansionAddr, *bootstrapServers, *topic, windowed) +``` +{{end}} + +{{if (eq .Sdk "java")}} +``` +Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", "localhost:9092"); + producerProps.setProperty("key.serializer", StringSerializer.class.getName()); + producerProps.setProperty("value.serializer", ByteArraySerializer.class.getName()); + + + input.apply(KafkaIO.<String, String>write() + .withBootstrapServers("localhost:9092") + .withTopic("my-topic") + .withKeySerializer(StringSerializer.class) + .withValueSerializer(StringSerializer.class) + .withProducerConfigUpdates(new HashMap<>()).values()); +``` +{{end}} + + +{{if (eq .Sdk "python")}} +``` +(input | "Write to Kafka" >> WriteToKafka( + topic=output_topic, + producer_config = bootstrap_servers, + key_serializer='org.apache.kafka.common.serialization.ByteArraySerializer', + value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer')) +``` +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/go-example/main.go b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/go-example/main.go new file mode 100644 index 0000000000000..05d75a755dc4e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/go-example/main.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: kafka-write +// description: KafkaIO write example +// multifile: false +// default_example: false +// context_line: 45 +// categories: +// - IO +// complexity: ADVANCED + +package main + +import ( + "context" + "flag" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "time" + //"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + //"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window" + //"github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio" + //"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio" +) + +var ( + expansionAddr = flag.String("expansion_addr", "localhost:1234", + "Address of Expansion Service. If not specified, attempts to automatically start an appropriate expansion service.") + bootstrapServers = flag.String("bootstrap_servers", "kafka_server:9092", + "(Required) URL of the bootstrap servers for the Kafka cluster. Should be accessible by the runner.") + topic = flag.String("topic", "kafka_taxirides_realtime", "Kafka topic to write to and read from.") +) + +// func init() { +// register.DoFn2x0[context.Context, []byte](&LogFn{}) +// } + +// LogFn is a DoFn to log rides. +type LogFn struct{} + +// ProcessElement logs each element it receives. 
+func (fn *LogFn) ProcessElement(ctx context.Context, elm []byte) { + log.Infof(ctx, "Ride info: %v", string(elm)) +} + +// FinishBundle waits a bit so the job server finishes receiving logs. +func (fn *LogFn) FinishBundle() { + time.Sleep(2 * time.Second) +} + +func main() { + flag.Parse() + beam.Init() + /* + ctx := context.Background() + + p := beam.NewPipeline() + s := p.Root() + + // In the main function, the code creates a Beam pipeline, reads from the Pub/Sub source, transforms the data into a key-value pair, applies a windowing function to the data, and writes the windowed data to a Kafka topic. + + data := pubsubio.Read(s, "pubsub-public-data", "taxirides-realtime", nil) + kvData := beam.ParDo(s, func(elm []byte) ([]byte, []byte) { return []byte(""), elm }, data) + windowed := beam.WindowInto(s, window.NewFixedWindows(15*time.Second), kvData) + kafkaio.Write(s, *expansionAddr, *bootstrapServers, *topic, windowed) + + if err := beamx.Run(ctx, p); err != nil { + log.Fatalf(ctx, "Failed to execute job: %v", err) + } + */ +} diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/java-example/Task.java b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/java-example/Task.java new file mode 100644 index 0000000000000..3c13d0995660c --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/java-example/Task.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// beam-playground: +// name: kafka-write +// description: KafkaIO write example +// multifile: false +// default_example: false +// context_line: 45 +// categories: +// - IO +// complexity: ADVANCED + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.kafka.KafkaIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class Task { + public static void main(String[] args) { + + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + Pipeline pipeline = Pipeline.create(options); + + PCollection<String> input = pipeline.apply(Create.of("Hello", "KafkaIO")); + + Properties producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", "kafka_server:9092"); + producerProps.setProperty("key.serializer", StringSerializer.class.getName()); + producerProps.setProperty("value.serializer", ByteArraySerializer.class.getName()); + + // This pipeline is an example of how you can use Apache Beam's KafkaIO to write data to a Kafka topic. + // Make sure your Kafka server is accessible and running, and the topic exists. 
+ + input.apply(KafkaIO.<String, String>write() + .withBootstrapServers("kafka_server:9092") + .withTopic("my-topic") + .withKeySerializer(StringSerializer.class) + .withValueSerializer(StringSerializer.class) + .withProducerConfigUpdates(new HashMap<>()).values()); + + pipeline.run().waitUntilFinish(); + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/python-example/task.py b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/python-example/task.py new file mode 100644 index 0000000000000..1620cd21c0fe4 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/python-example/task.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: kafka-write +# description: KafkaIO write example. 
+# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam + + +import apache_beam as beam +from apache_beam.io.kafka import ReadFromKafka, WriteToKafka + +def process_data(element): + # Do some processing on the data + return element + +options = beam.options.pipeline_options.PipelineOptions() +p = beam.Pipeline(options=options) + +input_topic = 'input-topic' +output_topic = 'output-topic' +bootstrap_servers = {"bootstrap.servers": "kafka_server:9092"} + +input = p | beam.Create([{"key": "foo", "value": "bar"}]) + +""" +This pipeline is an example of how you can use Apache Beam's KafkaIO (in Python SDK) to write data to a Kafka +topic.Make sure your Kafka server is accessible and running, and the topic exists. +""" + +# (input | "Write to Kafka" >> WriteToKafka( +# topic=output_topic, +# producer_config = bootstrap_servers) +# ) + +p.run().wait_until_finish() diff --git a/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/unit-info.yaml b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/unit-info.yaml new file mode 100644 index 0000000000000..c58b184a74b1a --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/kafka-io/kafka-write/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: ADVANCED +id: kafka-write +name: KafkaIO write +taskName: kafka-write \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/module-info.yaml b/learning/tour-of-beam/learning-content/io/module-info.yaml new file mode 100644 index 0000000000000..49b51a0bff754 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/module-info.yaml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +id: io +name: IO Connectors +complexity: ADVANCED +content: +- text-io +- big-query-io +- kafka-io +- rest-api \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/rest-api/description.md b/learning/tour-of-beam/learning-content/io/rest-api/description.md new file mode 100644 index 0000000000000..3ebdd2d0afe3c --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/rest-api/description.md @@ -0,0 +1,110 @@ + + +### BigQuery API +The BigQuery Storage Write API is a unified data-ingestion API for BigQuery. 
It combines streaming ingestion and batch loading into a single high-performance API. You can use the Storage Write API to stream records into BigQuery in real time or to batch process an arbitrarily large number of records and commit them in a single atomic operation. + + +{{if (eq .Sdk "java")}} +``` +PCollection<WeatherData> weatherData = + p.apply( + BigQueryIO.read( + (SchemaAndRecord elem) -> { + GenericRecord record = elem.getRecord(); + return new WeatherData( + (Long) record.get("year"), + (Long) record.get("month"), + (Long) record.get("day"), + (Double) record.get("max_temperature")); + }) + .fromQuery( + "SELECT year, month, day, max_temperature " + + "FROM [clouddataflow-readonly:samples.weather_stations] " + + "WHERE year BETWEEN 2007 AND 2009") + .withCoder(AvroCoder.of(WeatherData.class))); + +// We will send the weather data into different tables for every year. +weatherData.apply( + BigQueryIO.<WeatherData>write() + .to( + new DynamicDestinations<WeatherData, Long>() { + @Override + public Long getDestination(ValueInSingleWindow<WeatherData> elem) { + return elem.getValue().year; + } + + @Override + public TableDestination getTable(Long destination) { + return new TableDestination( + new TableReference() + .setProjectId(writeProject) + .setDatasetId(writeDataset) + .setTableId(writeTable + "_" + destination), + "Table for year " + destination); + } + + @Override + public TableSchema getSchema(Long destination) { + return new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("year") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("month") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("day") + .setType("INTEGER") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("maxTemp") + .setType("FLOAT") + .setMode("NULLABLE"))); + } + }) + .withFormatFunction( + (WeatherData elem) -> + new TableRow() + .set("year", elem.year) + .set("month", elem.month) + .set("day", elem.day) + .set("maxTemp", 
elem.maxTemp)) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE)); +``` +{{end}} +{{if (eq .Sdk "python")}} +``` +fictional_characters_view = beam.pvalue.AsDict( + pipeline | 'CreateCharacters' >> beam.Create([('Yoda', True),('Obi Wan Kenobi', True)])) + +def table_fn(element, fictional_characters): + if element in fictional_characters: + return 'my_dataset.fictional_quotes' + else: + return 'my_dataset.real_quotes' + +quotes | 'WriteWithDynamicDestination' >> beam.io.WriteToBigQuery( + table_fn, + schema=table_schema, + table_side_inputs=(fictional_characters_view, ), + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) +``` +{{end}} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/rest-api/java-example/Task.java b/learning/tour-of-beam/learning-content/io/rest-api/java-example/Task.java new file mode 100644 index 0000000000000..1431146f3e923 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/rest-api/java-example/Task.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +// beam-playground: +// name: rest-api-io +// description: REST-API BigQueryIO example. +// multifile: false +// context_line: 56 +// categories: +// - Quickstart +// complexity: ADVANCED +// tags: +// - hellobeam + +import avro.shaded.com.google.common.collect.ImmutableList; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.services.bigquery.model.TableFieldSchema; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import com.google.api.services.bigquery.model.TableSchema; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.gcp.bigquery.*; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.StreamUtils; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; + +public class Task { + + private static final Logger LOG = LoggerFactory.getLogger(Task.class); + + private static final String projectId = "tess-372508"; + private static final String dataset = "fir"; + private static final String table = "xasw"; + + public static void main(String[] args) { + LOG.info("Running Task"); + System.setProperty("GOOGLE_APPLICATION_CREDENTIALS", "C:\\Users\\menderes\\Downloads\\c.json"); + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); + options.setTempLocation("gs://btestq"); + 
options.as(BigQueryOptions.class).setProject(projectId); + + Pipeline pipeline = Pipeline.create(options); + + Schema inputSchema = Schema.builder() + .addField("id", Schema.FieldType.INT32) + .addField("name", Schema.FieldType.STRING) + .addField("age", Schema.FieldType.INT32) + .build(); + + /* + * The idea behind this code is to read data from a BigQuery table, + * process it in some way (although the example provided doesn't perform any significant transformations beyond type conversion), + * and then write the data back into BigQuery, but with a unique table for each user based on the user's "id". + * */ + + /*PCollection pCollection = pipeline + .apply(BigQueryIO.readTableRows() + .from(String.format("%s.%s.%s", projectId, dataset, table))) + .apply(MapElements.into(TypeDescriptor.of(User.class)).via(it -> new User((String) it.get("id"), (String) it.get("name"), (Integer) it.get("age")))) + .setCoder(CustomCoder.of()) + .setRowSchema(inputSchema); + + pCollection.apply( + // Reading from BigQuery: The pipeline is configured to read data from a table in BigQuery, with the table's name, dataset, and project ID specified as variables. The data read is a collection of table rows. + BigQueryIO.write() + .to( + new DynamicDestinations() { + @Override + public String getDestination(ValueInSingleWindow elem) { + return elem.getValue().id; + } + + // The destination table schema is a list of three fields ("id", "name", and "age"), matching the fields of the User objects (as implemented in getSchema). 
+ @Override + public TableDestination getTable(String destination) { + return new TableDestination( + new TableReference() + .setProjectId(projectId) + .setDatasetId(dataset) + .setTableId(table + "_" + destination), + "Table for year " + destination); + } + + @Override + public TableSchema getSchema(String destination) { + return new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("id") + .setType("STRING") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("name") + .setType("STRING") + .setMode("REQUIRED"), + new TableFieldSchema() + .setName("age") + .setType("INTEGER") + .setMode("REQUIRED"))); + } + }) + .withFormatFunction( + (User elem) -> + new TableRow() + .set("id", elem.id) + .set("name", elem.name) + .set("age", elem.age)) + +// The write operation is configured to create the destination table if it does not already exist (CREATE_IF_NEEDED) +// and to replace any existing data in the destination table (WRITE_TRUNCATE). + + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));*/ + + pipeline.run(); + } + + static class User { + private String id; + private String name; + private Integer age; + + public User(String id, String name, Integer age) { + this.id = id; + this.name = name; + this.age = age; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getAge() { + return age; + } + + public void setAge(Integer age) { + this.age = age; + } + } + + static class CustomCoder extends Coder { + final ObjectMapper objectMapper = new ObjectMapper(); + private static final CustomCoder INSTANCE = new CustomCoder(); + + public static CustomCoder of() { + return INSTANCE; + } + + @Override + public void encode(User user, OutputStream outStream) 
throws IOException { + String line = user.toString(); + outStream.write(line.getBytes()); + } + + @Override + public User decode(InputStream inStream) throws IOException { + final String serializedDTOs = new String(StreamUtils.getBytesWithoutClosing(inStream)); + return objectMapper.readValue(serializedDTOs, User.class); + } + + @Override + public List> getCoderArguments() { + return Collections.emptyList(); + } + + @Override + public void verifyDeterministic() { + } + } + + static class LogOutput extends DoFn { + private final String prefix; + + LogOutput() { + this.prefix = "Processing element"; + } + + LogOutput(String prefix) { + this.prefix = prefix; + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + LOG.info(prefix + ": {}", c.element()); + } + } +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/rest-api/python-example/task.py b/learning/tour-of-beam/learning-content/io/rest-api/python-example/task.py new file mode 100644 index 0000000000000..03d732f82a7bf --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/rest-api/python-example/task.py @@ -0,0 +1,92 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# beam-playground: +# name: rest-api-io +# description: REST-API BigQueryIO example. +# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: ADVANCED +# tags: +# - hellobeam + +""" +The idea behind this code is to generate a few sample records in memory, +describe the destination BigQuery table with a schema (no significant transformations are performed on the records), +and then write the records into a BigQuery table (the write step itself is disabled in this example). +""" + +import argparse +import logging + +from apache_beam.io.gcp.internal.clients import bigquery +import apache_beam as beam + + +def create_random_record(record_id): + return { + 'kind': 'kind' + record_id, + 'fullName': 'fullName' + record_id, + 'age': int(record_id) * 10, + 'gender': 'male', + 'phoneNumber': { + 'areaCode': int(record_id) * 100, + 'number': int(record_id) * 100000 + }, + 'children': [ + 'child' + record_id + '1', + 'child' + record_id + '2', + 'child' + record_id + '3' + ] + } + + +def run(argv=None): + with beam.Pipeline() as p: + table_schema = bigquery.TableSchema() + + # Fields that use standard types. + # The destination table schema declares two fields: "kind" (nullable string) and "fullName" (required string). + kind_schema = bigquery.TableFieldSchema() + kind_schema.name = 'kind' + kind_schema.type = 'string' + kind_schema.mode = 'nullable' + table_schema.fields.append(kind_schema) + + full_name_schema = bigquery.TableFieldSchema() + full_name_schema.name = 'fullName' + full_name_schema.type = 'string' + full_name_schema.mode = 'required' + table_schema.fields.append(full_name_schema) + + # The write operation is configured to create the destination table if it does not already exist (CREATE_IF_NEEDED) and to replace any existing data in the destination table (WRITE_TRUNCATE).
+ # pylint: disable=expression-not-assigned + record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) + records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) + """ + records | 'write' >> beam.io.WriteToBigQuery( + 'output.txt', + schema=table_schema, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE) + """ + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/learning/tour-of-beam/learning-content/io/rest-api/unit-info.yaml b/learning/tour-of-beam/learning-content/io/rest-api/unit-info.yaml new file mode 100644 index 0000000000000..b25e0549b8fcd --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/rest-api/unit-info.yaml @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +sdk: + - Java + - Python +complexity: ADVANCED +id: rest-api-io +name: REST API +taskName: rest-api-io \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/group-info.yaml b/learning/tour-of-beam/learning-content/io/text-io/group-info.yaml new file mode 100644 index 0000000000000..7c19f085324d1 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/group-info.yaml @@ -0,0 +1,31 @@ + # +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: MEDIUM +id: textIO +name: TextIO +content: +- text-io-local-read +- text-io-local-write +- text-io-gcs-read +- text-io-gcs-write \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/description.md b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/description.md new file mode 100644 index 0000000000000..3c9a817bebafb --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/description.md @@ -0,0 +1,55 @@ + +### Reading google cloud storage file using TextIO + +The `TextIO` class in the Apache Beam provides a way to read and write text files from **Google Cloud Storage** **(GCS)** in a pipeline. 
To read a text file from GCS using TextIO, you can use the Read method and pass in the GCS file path as a string, which starts with the "**gs://**" prefix. Here is an example of reading a text file named "**myfile.txt**" from a GCS bucket named "**mybucket**" and printing its contents: + +{{if (eq .Sdk "go")}} +``` +p, s := beam.NewPipelineWithRoot() +lines := textio.Read(s, "gs://mybucket/myfile.txt") +beam.ParDo0(s, func(line string) { + fmt.Println(line) +}, lines) +if err := beamx.Run(context.Background(), p); err != nil { + fmt.Printf("Failed to execute job: %v", err) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +Pipeline pipeline = Pipeline.create(); +pipeline.apply(TextIO.read().from("gs://mybucket/myfile.txt")) + .apply(ParDo.of(new DoFn<String, Void>() { + @ProcessElement + public void processElement(ProcessContext c) { + System.out.println(c.element()); + } + })); +pipeline.run(); +``` +{{end}} +{{if (eq .Sdk "python")}} +``` +import apache_beam as beam + +p = beam.Pipeline() +p | beam.io.ReadFromText('gs://mybucket/myfile.txt') | beam.Map(print) +p.run() +``` +{{end}} + +### Playground exercise + +In the playground window, you can find an example that reads from a text file and outputs individual words found in the text. Can you modify this example to output found words to another file in reverse form? \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/go-example/main.go b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/go-example/main.go new file mode 100644 index 0000000000000..c9edb6108d3cc --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/go-example/main.go @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: text-io-gcs-read +// description: TextIO GCS read example. +// multifile: false +// context_line: 29 +// categories: +// - Quickstart +// complexity: MEDIUM +// tags: +// - hellobeam + +package main + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "regexp" + "fmt" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" +) +var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + +func main() { + beam.Init() + + p := beam.NewPipeline() + s := p.Root() + + input := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt") + + applyTransform(s, input) + + err := beamx.Run(context.Background(), p) + if err != nil { + log.Exitf(context.Background(), "Failed to execute job: %v", err) + } +} + +func applyTransform(s beam.Scope, input beam.PCollection) { + beam.ParDo0(s, func(line string) { + for _, word := range wordRE.FindAllString(line, -1) { + fmt.Println(word) + } + }, input) +} \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/java-example/Task.java 
b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/java-example/Task.java new file mode 100644 index 0000000000000..079a36943fa8e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/java-example/Task.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: text-io-gcs-read +// description: TextIO GCS read example. 
+// multifile: false +// context_line: 34 +// categories: +// - Quickstart +// complexity: MEDIUM +// tags: +// - hellobeam + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; + +public class Task { + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.create(); + + Pipeline pipeline = Pipeline.create(options); + + pipeline.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) + .apply(ParDo.of(new DoFn() { + @ProcessElement + public void processElement(ProcessContext c) { + String[] line = c.element().split(" "); + for (String word : line) { + System.out.println(word); + } + } + })); + + pipeline.run(); + } +} diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/python-example/task.py b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/python-example/task.py new file mode 100644 index 0000000000000..79fcc09e9d361 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/python-example/task.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: text-io-gcs-read +# description: TextIO GCS read example. +# multifile: false +# context_line: 34 +# categories: +# - Quickstart +# complexity: MEDIUM +# tags: +# - hellobeam + +import apache_beam as beam + +def print_words(line): + for word in line.split(): + print(word) + +p = beam.Pipeline() +input = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://apache-beam-samples/shakespeare/kinglear.txt') + +input | 'Print words' >> beam.Map(print_words) + +p.run() \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/unit-info.yaml b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/unit-info.yaml new file mode 100644 index 0000000000000..3e0c1311ee74e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-read/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +sdk: + - Java + - Python + - Go +complexity: MEDIUM +id: text-io-gcs-read +name: TextIO GCS read +taskName: text-io-gcs-read diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/description.md b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/description.md new file mode 100644 index 0000000000000..6d7e9c6602133 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/description.md @@ -0,0 +1,87 @@ + +### Writing a Google Cloud Storage file using TextIO + +The `TextIO` class in Apache Beam provides a way to read and write text files in **Google Cloud Storage** **(GCS)** in a pipeline. +To write data to a file on **GCS**, you can use the Write method and pass in the **GCS** file path as a string. Here is an example of writing a string to a text file named "**myfile.txt**" in a **GCS** bucket named "**mybucket**": + +{{if (eq .Sdk "go")}} +``` +p, s := beam.NewPipelineWithRoot() +input := beam.Create(s, "Hello, World!") +textio.Write(s, "gs://mybucket/myfile.txt", input) +if err := beamx.Run(context.Background(), p); err != nil { + fmt.Printf("Failed to execute job: %v", err) +} +``` +{{end}} +{{if (eq .Sdk "java")}} +``` +Pipeline pipeline = Pipeline.create(); +pipeline.apply(Create.of("Hello, World!")) + .apply(TextIO.write().to("gs://mybucket/myfile.txt")); +pipeline.run(); +``` +{{end}} + +{{if (eq .Sdk "python")}} +``` +import apache_beam as beam + +p = beam.Pipeline() +data = ['Hello, World!', 'Apache Beam'] +p | beam.Create(data) | beam.io.WriteToText('gs://mybucket/myfile.txt') +p.run() +``` +{{end}} + +{{if (eq .Sdk "go")}} +It is important to note that in order to interact with **GCS** you will need to set up authentication. You can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or using the `options.WithCredentials` method during pipeline creation. + +``` +options := []beam.PipelineOption{ + beam.WithCredentials(creds), +} +p, err := beam.NewPipeline(options...)
+``` +Where `creds` is an instance of `google.Credentials`. +{{end}} + +{{if (eq .Sdk "python")}} +It is important to note that in order to interact with **GCS** you will need to set up authentication. You can do that by setting the appropriate **GOOGLE_APPLICATION_CREDENTIALS** environment variable or by configuring `GoogleCloudOptions` on the `PipelineOptions` used to create the pipeline, as shown below. + +``` +options = PipelineOptions() +google_cloud_options = options.view_as(GoogleCloudOptions) +google_cloud_options.project = 'my-project-id' +google_cloud_options.job_name = 'myjob' +google_cloud_options.staging_location = 'gs://my-bucket/staging' +google_cloud_options.temp_location = 'gs://my-bucket/temp' +google_cloud_options.region = 'us-central1' + +# set credentials +credentials = GoogleCredentials.get_application_default() +``` +{{end}} + +{{if (eq .Sdk "java")}} +It is important to note that in order to interact with **GCS** you will need to set up authentication. You also need to specify a temporary location on the command line as an additional parameter: +``` +--tempLocation=gs://my-bucket/temp +``` +{{end}} + +### Playground exercise + +In the playground window, you can find an example that writes data to **GCS**. Can you modify this example to generate numbers and write them to **GCS** sorted? \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/go-example/main.go b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/go-example/main.go new file mode 100644 index 0000000000000..10fe912e5554d --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/go-example/main.go @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: text-io-gcs-write +// description: TextIO GCS write example. +// multifile: false +// context_line: 30 +// categories: +// - Quickstart +// complexity: MEDIUM +// tags: +// - hellobeam + + +package main + +import ( + "context" + "github.com/apache/beam/sdks/v2/go/pkg/beam" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" + "github.com/apache/beam/sdks/v2/go/pkg/beam/log" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" + "regexp" + "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs" + _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" +) +var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) + +var locationGCS = "gs://mybucket/myfile.txt" + +func main() { + beam.Init() + + p := beam.NewPipeline() + s := p.Root() + + input := beam.Create(s, "Hello write from playground","First example") + + // It may be unsupported. 
Since gcs requires credentials + // textio.Write(s, locationGCS, input) + + debug.Printf(s,"Recorded data:", input) + + err := beamx.Run(context.Background(), p) + if err != nil { + log.Exitf(context.Background(), "Failed to execute job: %v", err) + } +} diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/java-example/Task.java b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/java-example/Task.java new file mode 100644 index 0000000000000..744ff63e4478a --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/java-example/Task.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: text-io-gcs-write +// description: TextIO GCS write example. 
+// multifile: false +// context_line: 35 +// categories: +// - Quickstart +// complexity: MEDIUM +// tags: +// - hellobeam + + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.transforms.Create; + +public class Task { + + public String locationGCS = "gs://mybucket/myfile.txt"; + + public static void main(String[] args) { + PipelineOptions options = PipelineOptionsFactory.create(); + + Pipeline pipeline = Pipeline.create(options); + + PCollection input = pipeline.apply(Create.of("Hello, World!")); + + // It may be unsupported. Since gcs requires credentials +/* + input.apply(TextIO.write().to(locationGCS)); +*/ + pipeline.run(); + } +} diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/python-example/task.py b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/python-example/task.py new file mode 100644 index 0000000000000..2087d9d9c1fb4 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/python-example/task.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: text-io-gcs-write +# description: TextIO GCS write example. +# multifile: false +# context_line: 29 +# categories: +# - Quickstart +# complexity: MEDIUM +# tags: +# - hellobeam + + +import apache_beam as beam + +def print_lines(line): + print(line) + +p = beam.Pipeline() + +data = ['Hello, World!', 'Apache Beam'] + +# It may be unsupported. Since gcs requires credentials +""" +p | 'CreateMyData' >> beam.Create(data) | 'WriteMyFile' >> beam.io.WriteToText('gs://mybucket/myfile.txt') +""" +p.run() \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/unit-info.yaml b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/unit-info.yaml new file mode 100644 index 0000000000000..e3a48ecaa2c8e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-gcs-write/unit-info.yaml @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +sdk: + - Java + - Python + - Go +complexity: MEDIUM +id: text-io-gcs-write +name: TextIO GCS write +taskName: text-io-gcs-write diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/description.md b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/description.md new file mode 100644 index 0000000000000..3aa4d214cca64 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/description.md @@ -0,0 +1,88 @@ + +### Reading from local text files using TextIO + +The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using `TextIO`, you can use the `Read` method and pass in the file path as a string. 
Here is an example of reading a local text file named **"myfile.txt"** and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+     .apply(ParDo.of(new DoFn<String, Void>() {
+         @ProcessElement
+         public void processElement(ProcessContext c) {
+             System.out.println(c.element());
+         }
+     }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the `Write` method and pass in the file path as a string. Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+input := beam.Create(s, "Hello, World!")
+textio.Write(s, "myfile.txt", input)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+     .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the paths in these examples refer to the local file system. `Read` and `Write` can also work with distributed file systems such as **HDFS**, **GCS**, and **S3**: use a path with the matching scheme (for example, `gs://`) and make sure the required file system dependencies and credentials are available.
+
+### Playground exercise
+
+In the playground window, you can find an example that reads from a text file and outputs individual words found in the text.
Can you modify this example to output found words to another file in reverse form? \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/main.go b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/main.go new file mode 100644 index 0000000000000..ed040ac97158e --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/main.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// beam-playground: +// name: text-io-local-read +// description: TextIO read local file example. 
+//   multifile: true
+//   files:
+//     - name: myfile.txt
+//   context_line: 30
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+
+package main
+
+import (
+	"context"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+	"regexp"
+)
+
+var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
+
+func main() {
+	p, s := beam.NewPipelineWithRoot()
+	input := textio.Read(s, "myfile.txt")
+
+	words := beam.ParDo(s, func(line string, emit func(string)) {
+		for _, word := range wordRE.FindAllString(line, -1) {
+			emit(word)
+		}
+	}, input)
+	debug.Print(s, words) // log every word so the example produces visible output
+
+	if err := beamx.Run(context.Background(), p); err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/myfile.txt b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/myfile.txt
new file mode 100644
index 0000000000000..543f539d33753
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/go-example/myfile.txt
@@ -0,0 +1 @@
+Hi, this is a file to read
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/Task.java b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/Task.java
new file mode 100644
index 0000000000000..3f975868ff13d
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/Task.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beam-playground:
+//   name: text-io-local-read
+//   description: TextIO read local file example.
+//   multifile: true
+//   files:
+//     - name: myfile.txt
+//   context_line: 34
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+
+public class Task {
+    public static void main(String[] args) {
+        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
+        // Read the file line by line, then print every space-separated word of each line.
+        pipeline.apply(TextIO.read().from("myfile.txt"))
+                .apply(ParDo.of(new DoFn<String, Void>() {
+                    @ProcessElement
+                    public void processElement(ProcessContext c) {
+                        String[] words = c.element().split(" ");
+                        for (String word : words) {
+                            System.out.println(word);
+                        }
+                    }
+                }));
+
+        pipeline.run().waitUntilFinish();
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/myfile.txt b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/myfile.txt
new file mode 100644
index 0000000000000..543f539d33753
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/java-example/myfile.txt
@@ -0,0 +1 @@
+Hi, this is a file to read
\ No newline at end of file
diff --git
a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/myfile.txt b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/myfile.txt new file mode 100644 index 0000000000000..543f539d33753 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/myfile.txt @@ -0,0 +1 @@ +Hi, this is a file to read \ No newline at end of file diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/task.py b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/task.py new file mode 100644 index 0000000000000..309fb6d1f7e44 --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/python-example/task.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: text-io-local-read +# description: TextIO read local file example. 
+#   multifile: true
+#   files:
+#     - name: myfile.txt
+#   context_line: 30
+#   categories:
+#     - Quickstart
+#   complexity: MEDIUM
+#   tags:
+#     - hellobeam
+
+
+import apache_beam as beam
+
+p = beam.Pipeline()
+# Read the file line by line and print every line.
+lines = p | 'ReadMyFile' >> beam.io.ReadFromText('myfile.txt')
+lines | 'PrintMyFile' >> beam.Map(print)
+
+p.run().wait_until_finish()
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/unit-info.yaml b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/unit-info.yaml
new file mode 100644
index 0000000000000..fd090c6c17284
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-read/unit-info.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sdk:
+  - Java
+  - Python
+  - Go
+complexity: MEDIUM
+id: text-io-local-read
+name: TextIO read local file
+taskName: text-io-local-read
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/description.md b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/description.md
new file mode 100644
index 0000000000000..28eed25c5d068
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/description.md
@@ -0,0 +1,88 @@
+
+### Writing local text files using TextIO
+
+Apache Beam is a programming model for data processing pipelines that can be executed on a variety of runtime environments, including **Apache Flink**, **Apache Spark**, and **Google Cloud Dataflow**. The `TextIO` class in Apache Beam provides a way to read and write text files in a pipeline. To read a local file using `TextIO`, you can use the `Read` method and pass in the file path as a string. Here is an example of reading a local text file named "**myfile.txt**" and printing its contents:
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+lines := textio.Read(s, "myfile.txt")
+beam.ParDo0(s, func(line string) {
+    fmt.Println(line)
+}, lines)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(TextIO.read().from("myfile.txt"))
+     .apply(ParDo.of(new DoFn<String, Void>() {
+         @ProcessElement
+         public void processElement(ProcessContext c) {
+             System.out.println(c.element());
+         }
+     }));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+lines = p | beam.io.ReadFromText('myfile.txt')
+lines | beam.Map(print)
+p.run()
+```
+{{end}}
+
+To write data to a local file, you can use the `Write` method and pass in the file path as a string.
Here is an example of writing a string to a local text file named "**myfile.txt**":
+
+{{if (eq .Sdk "go")}}
+```
+p, s := beam.NewPipelineWithRoot()
+input := beam.Create(s, "Hello, World!")
+textio.Write(s, "myfile.txt", input)
+if err := beamx.Run(context.Background(), p); err != nil {
+    fmt.Printf("Failed to execute job: %v", err)
+}
+```
+{{end}}
+
+{{if (eq .Sdk "java")}}
+```
+Pipeline p = Pipeline.create();
+p.apply(Create.of("Hello, World!"))
+     .apply(TextIO.write().to("myfile.txt"));
+p.run();
+```
+{{end}}
+{{if (eq .Sdk "python")}}
+```
+import apache_beam as beam
+
+p = beam.Pipeline()
+data = ['Hello, World!', 'Apache Beam']
+p | beam.Create(data) | beam.io.WriteToText('myfile.txt')
+p.run()
+```
+{{end}}
+It is important to note that the paths in these examples refer to the local file system. `Read` and `Write` can also work with distributed file systems such as **HDFS**, **GCS**, and **S3**: use a path with the matching scheme (for example, `gs://`) and make sure the required file system dependencies and credentials are available.
+
+### Playground exercise
+
+In the playground window, you can find an example that writes data to a local file. Can you modify this example to generate numbers and write them to a local file in sorted order?
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/main.go b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/main.go
new file mode 100644
index 0000000000000..7917e1fc37086
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/main.go
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beam-playground:
+//   name: text-io-local-write
+//   description: TextIO write local file example.
+//   multifile: true
+//   files:
+//     - name: myfile.txt
+//   context_line: 30
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+package main
+
+import (
+	"context"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+)
+
+func main() {
+	p, s := beam.NewPipelineWithRoot()
+
+	// Build a small in-memory PCollection of strings to write.
+	input := beam.Create(s, "Hello write from playground", "First example")
+
+	// Write the elements of the collection to "myfile.txt".
+	textio.Write(s, "myfile.txt", input)
+
+	// Run the pipeline; log the error and exit if the job fails.
+	err := beamx.Run(context.Background(), p)
+	if err != nil {
+		log.Exitf(context.Background(), "Failed to execute job: %v", err)
+	}
+}
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/myfile.txt b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/go-example/myfile.txt
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/Task.java b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/Task.java
new file mode 100644
index 0000000000000..d22df75731e65
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/Task.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software
Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// beam-playground:
+//   name: text-io-local-write
+//   description: TextIO write local file example.
+//   multifile: true
+//   files:
+//     - name: myfile.txt
+//   context_line: 34
+//   categories:
+//     - Quickstart
+//   complexity: MEDIUM
+//   tags:
+//     - hellobeam
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Create;
+
+
+public class Task {
+    public static void main(String[] args) {
+        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
+        // withoutSharding() makes the output a single file named exactly "myfile.txt".
+        pipeline.apply(Create.of("Hello, World!"))
+                .apply(TextIO.write().to("myfile.txt").withoutSharding());
+
+        pipeline.run().waitUntilFinish();
+    }
+}
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/myfile.txt b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/java-example/myfile.txt
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/myfile.txt
b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/myfile.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/task.py b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/task.py new file mode 100644 index 0000000000000..2ebe0d56fc12d --- /dev/null +++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/python-example/task.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# beam-playground: +# name: text-io-local-write +# description: TextIO write local file example. 
+#   multifile: true
+#   files:
+#     - name: myfile.txt
+#   context_line: 30
+#   categories:
+#     - Quickstart
+#   complexity: MEDIUM
+#   tags:
+#     - hellobeam
+
+import apache_beam as beam
+
+def print_lines(line):
+    print(line)
+
+p = beam.Pipeline()
+
+data = ['Hello, World!', 'Apache Beam']
+# An empty shard_name_template makes Beam write one file named exactly 'myfile.txt'.
+p | 'CreateMyData' >> beam.Create(data) | 'WriteMyFile' >> beam.io.WriteToText(file_path_prefix='myfile.txt', shard_name_template='')
+
+p.run().wait_until_finish()
\ No newline at end of file
diff --git a/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/unit-info.yaml b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/unit-info.yaml
new file mode 100644
index 0000000000000..dae2bf5f3386b
--- /dev/null
+++ b/learning/tour-of-beam/learning-content/io/text-io/text-io-local-write/unit-info.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+sdk:
+  - Java
+  - Python
+  - Go
+complexity: MEDIUM
+id: text-io-local-write
+name: TextIO write local file
+taskName: text-io-local-write