Add IcebergIO integration tests (apache#31220)
* read integration test

* write integration test

* test with Managed interface
ahmedabu98 authored May 9, 2024
1 parent 13708ea commit e0bc8e7
Showing 7 changed files with 388 additions and 18 deletions.
@@ -0,0 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
@@ -21,13 +21,19 @@ on:
branches: ['master', 'release-*']
paths:
- "sdks/java/io/iceberg/**"
- ".github/workflows/IO_Iceberg.yml"
- "sdks/java/io/managed/**"
- "sdks/java/expansion-service/**"
- "sdks/java/io/expansion-service/**"
- ".github/workflows/IO_Iceberg_Unit_Tests.yml"
pull_request_target:
branches: ['master', 'release-*']
paths:
- "sdks/java/io/iceberg/**"
- "sdks/java/io/managed/**"
- "sdks/java/expansion-service/**"
- "sdks/java/io/expansion-service/**"
- 'release/trigger_all_tests.json'
- '.github/trigger_files/IO_Iceberg.json'
- '.github/trigger_files/IO_Iceberg_Unit_Tests.json'
issue_comment:
types: [created]
schedule:
@@ -61,11 +67,11 @@ env:
GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}

jobs:
IO_Iceberg:
IO_Iceberg_Unit_Tests:
name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
strategy:
matrix:
job_name: ["IO_Iceberg"]
job_name: ["IO_Iceberg_Unit_Tests"]
job_phrase: ["Run IcebergIO Unit Tests"]
timeout-minutes: 60
if: |
@@ -88,7 +94,7 @@ jobs:
- name: run IcebergIO build script
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :sdks:java:io:iceberg:build
gradle-command: :sdks:java:io:iceberg:build :sdks:java:io:iceberg:integrationTest
arguments: |
-PdisableSpotlessCheck=true \
-PdisableCheckStyle=true \
1 change: 1 addition & 0 deletions build.gradle.kts
@@ -403,6 +403,7 @@ tasks.register("javaHadoopVersionsTest") {
dependsOn(":sdks:java:io:hadoop-file-system:hadoopVersionsTest")
dependsOn(":sdks:java:io:hadoop-format:hadoopVersionsTest")
dependsOn(":sdks:java:io:hcatalog:hadoopVersionsTest")
dependsOn(":sdks:java:io:iceberg:hadoopVersionsTest")
dependsOn(":sdks:java:io:parquet:hadoopVersionsTest")
dependsOn(":sdks:java:extensions:sorter:hadoopVersionsTest")
dependsOn(":runners:spark:3:hadoopVersionsTest")
26 changes: 25 additions & 1 deletion sdks/java/io/iceberg/build.gradle
@@ -1,3 +1,5 @@
import groovy.json.JsonOutput

import java.util.stream.Collectors

/*
@@ -38,7 +40,6 @@ hadoopVersions.each {kv -> configurations.create("hadoopVersion$kv.key")}
def iceberg_version = "1.4.2"
def parquet_version = "1.12.0"
def orc_version = "1.9.2"
def hive_version = "3.1.3"

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
@@ -55,9 +56,13 @@ dependencies {
implementation library.java.hadoop_common

testImplementation library.java.hadoop_client
testImplementation library.java.bigdataoss_gcsio
testImplementation "com.google.cloud.bigdataoss:gcs-connector:hadoop2-2.2.16"
testImplementation "com.google.cloud.bigdataoss:util-hadoop:hadoop2-2.2.16"
testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation project(":sdks:java:extensions:google-cloud-platform-core")
testImplementation library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
@@ -89,3 +94,22 @@ hadoopVersions.each { kv ->
include '**/*Test.class'
}
}

task integrationTest(type: Test, dependsOn: processTestResources) {
group = "Verification"
def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
def gcpTempLocation = project.findProperty('gcpTempLocation') ?: 'gs://temp-storage-for-end-to-end-tests'
systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
"--project=${gcpProject}",
"--tempLocation=${gcpTempLocation}",
])

// Disable Gradle cache: these ITs interact with live service that should always be considered "out of date"
outputs.upToDateWhen { false }

include '**/*IT.class'

maxParallelForks 4
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}
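
As context for the task above: the "beamTestPipelineOptions" system property it sets is the property Beam's TestPipeline reads its pipeline options from, so tests matched by '**/*IT.class' see the --project and --tempLocation values. A minimal sketch, assuming that mechanism (the class name below is hypothetical and is not one of the files in this commit):

import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;

// Hypothetical IT skeleton: reads the options serialized by the Gradle task above.
public class ExampleIcebergIT {
  public static void main(String[] args) {
    // Parses the "beamTestPipelineOptions" system property.
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    String project = options.as(GcpOptions.class).getProject();
    String tempLocation = options.getTempLocation();
    System.out.println("project=" + project + " tempLocation=" + tempLocation);
  }
}

Locally, the task should be invocable with something like ./gradlew :sdks:java:io:iceberg:integrationTest -PgcpProject=<project> -PgcpTempLocation=<gs://bucket/path>, falling back to the apache-beam-testing defaults when those properties are omitted.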
@@ -20,22 +20,34 @@
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

class SchemaAndRowConversions {

private SchemaAndRowConversions() {}

public static final String ICEBERG_TYPE_OPTION_NAME = "icebergTypeID";
static final Map<Schema.FieldType, Type> BEAM_TYPES_TO_ICEBERG_TYPES =
ImmutableMap.<Schema.FieldType, Type>builder()
.put(Schema.FieldType.BOOLEAN, Types.BooleanType.get())
.put(Schema.FieldType.INT32, Types.IntegerType.get())
.put(Schema.FieldType.INT64, Types.LongType.get())
.put(Schema.FieldType.FLOAT, Types.FloatType.get())
.put(Schema.FieldType.DOUBLE, Types.DoubleType.get())
.put(Schema.FieldType.STRING, Types.StringType.get())
.put(Schema.FieldType.BYTES, Types.BinaryType.get())
.build();

public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {
switch (type.typeId()) {
@@ -76,11 +88,6 @@ public static Schema.FieldType icebergTypeToBeamFieldType(final Type type) {

public static Schema.Field icebergFieldToBeamField(final Types.NestedField field) {
return Schema.Field.of(field.name(), icebergTypeToBeamFieldType(field.type()))
.withOptions(
Schema.Options.builder()
.setOption(
ICEBERG_TYPE_OPTION_NAME, Schema.FieldType.STRING, field.type().typeId().name())
.build())
.withNullable(field.isOptional());
}

@@ -101,13 +108,11 @@ public static Schema icebergStructTypeToBeamSchema(final Types.StructType struct
}

public static Types.NestedField beamFieldToIcebergField(int fieldId, final Schema.Field field) {
String typeId = field.getOptions().getValue(ICEBERG_TYPE_OPTION_NAME, String.class);
if (typeId != null) {
@Nullable Type icebergType = BEAM_TYPES_TO_ICEBERG_TYPES.get(field.getType());

if (icebergType != null) {
return Types.NestedField.of(
fieldId,
field.getType().getNullable(),
field.getName(),
Types.fromPrimitiveString(typeId));
fieldId, field.getType().getNullable(), field.getName(), icebergType);
} else {
return Types.NestedField.of(
fieldId, field.getType().getNullable(), field.getName(), Types.StringType.get());
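
Net effect of this change: beamFieldToIcebergField no longer reads a per-field "icebergTypeID" schema option; it derives the Iceberg type directly from the Beam field type via the new BEAM_TYPES_TO_ICEBERG_TYPES map, falling back to an Iceberg string type for unmapped types. A minimal standalone sketch of that lookup, re-declaring the map only because SchemaAndRowConversions is package-private (the field name and id below are made up):

import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

public class BeamToIcebergFieldSketch {
  // Same primitive mapping the diff introduces as BEAM_TYPES_TO_ICEBERG_TYPES.
  private static final Map<Schema.FieldType, Type> BEAM_TO_ICEBERG =
      Map.of(
          Schema.FieldType.BOOLEAN, Types.BooleanType.get(),
          Schema.FieldType.INT32, Types.IntegerType.get(),
          Schema.FieldType.INT64, Types.LongType.get(),
          Schema.FieldType.FLOAT, Types.FloatType.get(),
          Schema.FieldType.DOUBLE, Types.DoubleType.get(),
          Schema.FieldType.STRING, Types.StringType.get(),
          Schema.FieldType.BYTES, Types.BinaryType.get());

  public static void main(String[] args) {
    Schema.Field beamField = Schema.Field.of("price", Schema.FieldType.DOUBLE).withNullable(true);

    // Unmapped Beam types fall back to an Iceberg string, mirroring the else branch above.
    Type icebergType = BEAM_TO_ICEBERG.getOrDefault(beamField.getType(), Types.StringType.get());

    Types.NestedField icebergField =
        Types.NestedField.of(1, beamField.getType().getNullable(), beamField.getName(), icebergType);

    System.out.println(icebergField);  // prints something like "1: price: optional double"
  }
}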
(Diffs for the remaining two changed files did not load.)
