
Export Exasol Table into AWS S3 as Parquet format #16

Merged
21 commits merged into master from feature/export-parquet-s3 on Feb 12, 2019

Commits (21)
a369225
[skip ci] Add link to latest Github release in readme
morazow Jan 22, 2019
1d777fe
Start Exasol table to S3 export as parquet format
morazow Jan 23, 2019
7fab7da
[skip ci] Update readme with export api description
morazow Jan 23, 2019
c6b2907
Add test to check the parquet schema
morazow Jan 25, 2019
6faa6c2
Minor refactoring and typo fix in tests
morazow Jan 25, 2019
c268f63
Fix typo: `import` -> `export`
morazow Jan 29, 2019
a81db06
Add tests for export path object
morazow Jan 29, 2019
53dff6b
Add exasol columns to parquet message type conversion functions
morazow Jan 30, 2019
a5ded42
Add exaColumnToValue function that returns value from ExaIterator index
morazow Jan 30, 2019
81b119b
Restructure resource test data files
morazow Feb 1, 2019
661e6f6
Add tests for schema conversion for unsupported type
morazow Feb 1, 2019
3932d77
Add initial export functionality
morazow Feb 1, 2019
e9e1dd8
Converts decimal values into int32 or int64 if precision is within bo…
morazow Feb 4, 2019
b32e100
Use type method to obtain primitive type
morazow Feb 4, 2019
9f1d1f5
Add date and timestamp reader functionality
morazow Feb 5, 2019
e572bad
Merge branch 'master' into feature/export-parquet-s3
morazow Feb 8, 2019
c6013c2
[skip ci] Minor reformatting notice in readme.md
morazow Feb 8, 2019
a8daf06
Do not separate decimal values into int32 or int64 values
morazow Feb 11, 2019
9540696
ExaIterator cannot emit Scala `math.BigDecimal`. Change it into `java…
morazow Feb 11, 2019
7466de1
Add fs.LocalFileSystem into configuration
morazow Feb 11, 2019
5023000
Fix bug when converting date into days (since epoch) and back
morazow Feb 11, 2019
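
The last commit above (5023000) addresses the date-to-days round trip. A minimal sketch of such a conversion, assuming `java.time` is used; this is an illustration, not the code from the diff:

```scala
import java.sql.Date
import java.time.LocalDate

// Convert a java.sql.Date to days since the Unix epoch and back.
// Going through LocalDate.toEpochDay avoids timezone-dependent millisecond
// arithmetic, a common source of off-by-one-day errors.
def dateToDays(date: Date): Long =
  date.toLocalDate.toEpochDay

def daysToDate(days: Long): Date =
  Date.valueOf(LocalDate.ofEpochDay(days))

// Round trip check:
// daysToDate(dateToDays(Date.valueOf("2019-02-11"))) == Date.valueOf("2019-02-11")
```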
README.md (38 additions, 3 deletions)
@@ -2,8 +2,13 @@

[![Build Status][travis-badge]][travis-link]
[![Codecov][codecov-badge]][codecov-link]
[![GitHub Latest Release][gh-release-badge]][gh-release-link]

-<p style="border: 1px solid black;padding: 10px; background-color: #FFFFCC;"><span style="font-size:200%">&#128712;</span> Please note that this is an open source project which is officially supported by Exasol. For any question, you can contact our support team.</p>
+<p style="border: 1px solid black;padding: 10px; background-color: #FFFFCC;">
+<span style="font-size:200%">&#128712;</span> Please note that this is an open
+source project which is officially supported by Exasol. For any question, you
+can contact our support team.
+</p>

## Table of Contents

@@ -52,6 +57,8 @@ Please change required parameters.
CREATE SCHEMA ETL;
OPEN SCHEMA ETL;

-- Import related scripts

CREATE OR REPLACE JAVA SET SCRIPT IMPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportPath;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
@@ -68,6 +75,18 @@ EMITS (filename VARCHAR(200), partition_index VARCHAR(100)) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ImportMetadata;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/

-- Export related scripts

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_PATH(...) EMITS (...) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ExportPath;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/

CREATE OR REPLACE JAVA SET SCRIPT EXPORT_TABLE(...) EMITS (ROWS_AFFECTED INT) AS
%scriptclass com.exasol.cloudetl.scriptclasses.ExportTable;
%jar /buckets/bfsdefault/bucket1/cloud-storage-etl-udfs-{VERSION}.jar;
/
```

### Import data from cloud storages
@@ -77,8 +96,8 @@ Please follow the steps below to import from cloud storages.
#### Create an Exasol schema and table

```sql
-CREATE SCHEMA TEST;
-OPEN SCHEMA TEST;
+CREATE SCHEMA RETAIL;
+OPEN SCHEMA RETAIL;

DROP TABLE IF EXISTS SALES_POSITIONS;

@@ -151,6 +170,20 @@ FROM SCRIPT ETL.IMPORT_PATH WITH
SELECT * FROM SALES_POSITIONS LIMIT 10;
```

#### Export to AWS S3

```sql
EXPORT SALES_POSITIONS
INTO SCRIPT ETL.EXPORT_PATH WITH
BUCKET_PATH = 's3a://exa-mo-frankfurt/export/retail/sales_positions/'
S3_ACCESS_KEY = 'MY_AWS_ACCESS_KEY'
S3_SECRET_KEY = 'MY_AWS_SECRET_KEY'
S3_ENDPOINT = 's3.MY_REGION.amazonaws.com'
PARALLELISM = 'nproc()';

-- MY_REGION is one of the AWS regions, for example, eu-central-1
```

## Building from Source

Clone the repository,
@@ -174,6 +207,8 @@ The packaged jar should be located at
[travis-link]: https://travis-ci.org/exasol/cloud-storage-etl-udfs
[codecov-badge]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs/branch/master/graph/badge.svg
[codecov-link]: https://codecov.io/gh/exasol/cloud-storage-etl-udfs
[gh-release-badge]: https://img.shields.io/github/release/exasol/cloud-storage-etl-udfs.svg
[gh-release-link]: https://github.com/exasol/cloud-storage-etl-udfs/releases/latest
[exasol]: https://www.exasol.com/en/
[s3]: https://aws.amazon.com/s3/
[gcs]: https://cloud.google.com/storage/
project/Compilation.scala (1 addition, 0 deletions)
@@ -124,6 +124,7 @@ object Compilation {
)

val WartremoverTestFlags: Seq[Wart] = ExtraWartremoverFlags ++ Warts.allBut(
Wart.Any,
Wart.NonUnitStatements,
Wart.Null
)
src/main/scala/com/exasol/cloudetl/bucket/Bucket.scala (1 addition, 0 deletions)
@@ -36,6 +36,7 @@ final case class S3Bucket(path: String, params: Map[String, String]) extends Buc
validate()

val conf = new Configuration()
conf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)
conf.set("fs.s3a.impl", classOf[org.apache.hadoop.fs.s3a.S3AFileSystem].getName)
conf.set("fs.s3a.endpoint", Bucket.requiredParam(params, "S3_ENDPOINT"))
conf.set("fs.s3a.access.key", Bucket.requiredParam(params, "S3_ACCESS_KEY"))
src/main/scala/com/exasol/cloudetl/data/ExaColumnInfo.scala (12 additions, 0 deletions)
@@ -0,0 +1,12 @@
package com.exasol.cloudetl.data

/** Exasol table column information. */
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
final case class ExaColumnInfo(
name: String,
`type`: Class[_],
precision: Int = 0,
scale: Int = 0,
length: Int = 0,
isNullable: Boolean = true
)
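
As a usage illustration (the column names and types below are hypothetical, not taken from this PR), instances might be constructed like this before being mapped to a Parquet message type:

```scala
import com.exasol.cloudetl.data.ExaColumnInfo

// Hypothetical column metadata for a sales table.
val columns: Seq[ExaColumnInfo] = Seq(
  ExaColumnInfo("SALES_ID", classOf[java.lang.Long], precision = 18),
  ExaColumnInfo("PRICE", classOf[java.math.BigDecimal], precision = 9, scale = 2),
  ExaColumnInfo("MARKET_NAME", classOf[String], length = 100),
  ExaColumnInfo("SALES_DATE", classOf[java.sql.Date], isNullable = false)
)
```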
src/main/scala/com/exasol/cloudetl/data/Row.scala (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
package com.exasol.cloudetl.data

final case class Row(val values: Seq[Any])
src/main/scala/com/exasol/cloudetl/parquet/ParquetRowWriter.scala (41 additions, 0 deletions)
@@ -0,0 +1,41 @@
package com.exasol.cloudetl.parquet

import com.exasol.cloudetl.data.Row

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetFileWriter
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.schema.MessageType

object ParquetRowWriter {

private[this] class Builder(path: Path, messageType: MessageType)
extends ParquetWriter.Builder[Row, Builder](path) {

override def getWriteSupport(conf: Configuration): WriteSupport[Row] =
new RowWriteSupport(messageType)

override def self(): Builder = this
}

def apply(
path: Path,
conf: Configuration,
messageType: MessageType,
options: ParquetWriteOptions
): ParquetWriter[Row] =
new Builder(path, messageType)
.withRowGroupSize(options.blockSize)
.withPageSize(options.pageSize)
.withCompressionCodec(options.compressionCodec)
.withDictionaryEncoding(options.enableDictionaryEncoding)
.withValidation(options.enableValidation)
.withWriteMode(ParquetFileWriter.Mode.CREATE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withConf(conf)
.build()

}
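
A minimal usage sketch of the builder above; the message type, file path, and row data are illustrative assumptions, using a default Hadoop configuration and default write options:

```scala
import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.parquet.{ParquetRowWriter, ParquetWriteOptions}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.schema.MessageTypeParser

// A hand-written two-column schema; in the UDF this would be derived
// from the Exasol column metadata.
val schema = MessageTypeParser.parseMessageType(
  """message sales {
    |  required int64 sales_id;
    |  required binary market_name (UTF8);
    |}
    |""".stripMargin
)

val conf = new Configuration()
val options = ParquetWriteOptions(Map.empty[String, String])

val writer = ParquetRowWriter(new Path("/tmp/sales_part_0.parquet"), conf, schema, options)
writer.write(Row(Seq(1L, "Frankfurt")))
writer.close()
```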
@@ -1,10 +1,9 @@
-package com.exasol.cloudetl.source
+package com.exasol.cloudetl.parquet

import scala.collection.JavaConverters._
import scala.language.reflectiveCalls

-import com.exasol.cloudetl.row.Row
-import com.exasol.cloudetl.row.RowReadSupport
+import com.exasol.cloudetl.data.Row
import com.exasol.cloudetl.util.FsUtil

import org.apache.hadoop.conf.Configuration
@@ -0,0 +1,35 @@
package com.exasol.cloudetl.parquet

import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

final case class ParquetWriteOptions(
blockSize: Int,
pageSize: Int,
compressionCodec: CompressionCodecName,
enableDictionaryEncoding: Boolean,
enableValidation: Boolean
)

object ParquetWriteOptions {

@SuppressWarnings(
Array("org.wartremover.warts.Overloading", "org.danielnixon.extrawarts.StringOpsPartial")
)
def apply(params: Map[String, String]): ParquetWriteOptions = {
val compressionCodec = params.getOrElse("PARQUET_COMPRESSION_CODEC", "").toUpperCase() match {
case "SNAPPY" => CompressionCodecName.SNAPPY
case "GZIP" => CompressionCodecName.GZIP
case "LZO" => CompressionCodecName.LZO
case _ => CompressionCodecName.UNCOMPRESSED
}
val blockSize =
params.get("PARQUET_BLOCK_SIZE").fold(ParquetWriter.DEFAULT_BLOCK_SIZE)(_.toInt)
val pageSize = params.get("PARQUET_PAGE_SIZE").fold(ParquetWriter.DEFAULT_PAGE_SIZE)(_.toInt)
val dictionary = params.get("PARQUET_DICTIONARY_ENCODING").fold(true)(_.toBoolean)
    val validation = params.get("PARQUET_VALIDATION").fold(true)(_.toBoolean)

ParquetWriteOptions(blockSize, pageSize, compressionCodec, dictionary, validation)
}

}
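
A short usage example for the parameter-based factory above, with hypothetical UDF parameter values:

```scala
import com.exasol.cloudetl.parquet.ParquetWriteOptions

// Unspecified keys fall back to Parquet defaults; the codec name is
// upper-cased before matching, so "snappy" works as well.
val params = Map(
  "PARQUET_COMPRESSION_CODEC" -> "snappy",
  "PARQUET_BLOCK_SIZE" -> "67108864"
)
val options = ParquetWriteOptions(params)
// options.compressionCodec is CompressionCodecName.SNAPPY,
// options.pageSize is ParquetWriter.DEFAULT_PAGE_SIZE,
// dictionary encoding and validation default to true.
```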