Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Destination S3: support anyOf, allOf, and oneOf #4613

Merged
merged 8 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362",
"name": "S3",
"dockerRepository": "airbyte/destination-s3",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3"
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
- destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
name: S3
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;

/**
* Mapping of JsonSchema types to Avro types.
Expand All @@ -37,7 +38,8 @@ public enum JsonSchemaType {
BOOLEAN("boolean", true, Schema.Type.BOOLEAN),
NULL("null", true, Schema.Type.NULL),
OBJECT("object", false, Schema.Type.RECORD),
ARRAY("array", false, Schema.Type.ARRAY);
ARRAY("array", false, Schema.Type.ARRAY),
COMBINED("combined", false, Schema.Type.UNION);

private final String jsonSchemaType;
private final boolean isPrimitive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.integrations.base.JavaBaseConstants;
Expand All @@ -34,6 +35,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,23 +66,46 @@ public class JsonToAvroSchemaConverter {

private final Map<String, String> standardizedNames = new HashMap<>();

static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode typeProperty) {
return getTypes(fieldName, typeProperty).stream()
static List<JsonSchemaType> getNonNullTypes(String fieldName, JsonNode fieldDefinition) {
return getTypes(fieldName, fieldDefinition).stream()
.filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList());
}

static List<JsonSchemaType> getTypes(String fieldName, JsonNode typeProperty) {
if (typeProperty == null) {
static List<JsonSchemaType> getTypes(String fieldName, JsonNode fieldDefinition) {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
if (combinedRestriction.isPresent()) {
return Collections.singletonList(JsonSchemaType.COMBINED);
}

JsonNode typeProperty = fieldDefinition.get("type");
if (typeProperty == null || typeProperty.isNull()) {
throw new IllegalStateException(String.format("Field %s has no type", fieldName));
} else if (typeProperty.isArray()) {
}

if (typeProperty.isArray()) {
return MoreIterators.toList(typeProperty.elements()).stream()
.map(s -> JsonSchemaType.fromJsonSchemaType(s.asText()))
.collect(Collectors.toList());
} else if (typeProperty.isTextual()) {
}

if (typeProperty.isTextual()) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText()));
} else {
throw new IllegalStateException("Unexpected type: " + typeProperty);
}

throw new IllegalStateException("Unexpected type: " + typeProperty);
}

/**
 * Looks up a combined restriction ({@code anyOf}, {@code allOf} or {@code oneOf}) on a Json schema
 * field definition.
 *
 * @param fieldDefinition - Json schema field definition. E.g. { anyOf: [{ type: "string" }] }.
 * @return the value stored under the first keyword found, probing in the order {@code anyOf},
 *         {@code allOf}, {@code oneOf}; an empty optional when none of them is present.
 */
static Optional<JsonNode> getCombinedRestriction(JsonNode fieldDefinition) {
  // The array order is the lookup priority; only the first match is returned.
  final String[] combinedKeywords = {"anyOf", "allOf", "oneOf"};
  for (String keyword : combinedKeywords) {
    if (fieldDefinition.has(keyword)) {
      return Optional.of(fieldDefinition.get(keyword));
    }
  }
  return Optional.empty();
}

public Map<String, String> getStandardizedNames() {
Expand Down Expand Up @@ -153,21 +178,20 @@ Schema getSingleFieldType(String fieldName,
Schema fieldSchema;
switch (fieldType) {
case STRING, NUMBER, INTEGER, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case COMBINED -> {
Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
List<Schema> unionTypes = getSchemaFromTypes(fieldName, (ArrayNode) combinedRestriction.get());
fieldSchema = Schema.createUnion(unionTypes);
}
case ARRAY -> {
JsonNode items = fieldDefinition.get("items");
Preconditions.checkNotNull(items, "Array field %s misses the items property.", fieldName);

if (items.isObject()) {
fieldSchema = Schema
.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
fieldSchema = Schema.createArray(getNullableFieldTypes(String.format("%s.items", fieldName), items));
} else if (items.isArray()) {
List<Schema> arrayElementTypes = MoreIterators.toList(items.elements())
.stream()
.flatMap(itemDefinition -> getNonNullTypes(fieldName, itemDefinition.get("type")).stream()
.map(type -> getSingleFieldType(fieldName, type, itemDefinition, false)))
.distinct()
.collect(Collectors.toList());
arrayElementTypes.add(0, Schema.create(Schema.Type.NULL));
List<Schema> arrayElementTypes = getSchemaFromTypes(fieldName, (ArrayNode) items);
arrayElementTypes.add(0, Schema.create(Type.NULL));
fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes));
} else {
throw new IllegalStateException(
Expand All @@ -181,12 +205,22 @@ Schema getSingleFieldType(String fieldName,
return fieldSchema;
}

/**
 * Converts every definition in a Json array into Avro schemas and returns them de-duplicated.
 *
 * @param fieldName - name passed through to the type lookup and the per-type conversion.
 * @param types - Json array whose elements are field definitions. E.g. [{ type: "string" }].
 * @return the distinct schemas produced for the non-null types of each element, in encounter
 *         order.
 */
List<Schema> getSchemaFromTypes(String fieldName, ArrayNode types) {
  return MoreIterators.toList(types.elements())
      .stream()
      .flatMap(elementDefinition -> {
        // Null types are dropped here; each remaining type becomes one schema.
        List<JsonSchemaType> elementTypes = getNonNullTypes(fieldName, elementDefinition);
        return elementTypes.stream()
            .map(elementType -> getSingleFieldType(fieldName, elementType, elementDefinition, false));
      })
      .distinct()
      .collect(Collectors.toList());
}

/**
* @param fieldDefinition - Json schema field definition. E.g. { type: "number" }.
*/
Schema getNullableFieldTypes(String fieldName, JsonNode fieldDefinition) {
// Filter out null types, which will be added back in the end.
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition.get("type"))
List<Schema> nonNullFieldTypes = getNonNullTypes(fieldName, fieldDefinition)
.stream()
.flatMap(fieldType -> {
Schema singleFieldSchema = getSingleFieldType(fieldName, fieldType, fieldDefinition, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
package io.airbyte.integrations.destination.s3.avro;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
Expand All @@ -47,15 +48,27 @@ public void testGetSingleTypes() {
JsonNode input1 = Jsons.deserialize("{ \"type\": \"number\" }");
assertEquals(
Collections.singletonList(JsonSchemaType.NUMBER),
JsonToAvroSchemaConverter.getTypes("field", input1.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input1));
}

@Test
public void testGetUnionTypes() {
JsonNode input2 = Jsons.deserialize("{ \"type\": [\"null\", \"string\"] }");
assertEquals(
Lists.newArrayList(JsonSchemaType.NULL, JsonSchemaType.STRING),
JsonToAvroSchemaConverter.getTypes("field", input2.get("type")));
JsonToAvroSchemaConverter.getTypes("field", input2));
}

@Test
public void testNoCombinedRestriction() {
  // A definition with only a "type" property has no anyOf / allOf / oneOf keyword.
  final JsonNode plainDefinition = Jsons.deserialize("{ \"type\": \"number\" }");
  final boolean restrictionAbsent = JsonToAvroSchemaConverter.getCombinedRestriction(plainDefinition).isEmpty();
  assertTrue(restrictionAbsent);
}

@Test
public void testWithCombinedRestriction() {
  // A definition that declares "anyOf" must be reported as having a combined restriction.
  final JsonNode anyOfDefinition = Jsons.deserialize("{ \"anyOf\": [{ \"type\": \"string\" }, { \"type\": \"integer\" }] }");
  final boolean restrictionFound = JsonToAvroSchemaConverter.getCombinedRestriction(anyOfDefinition).isPresent();
  assertTrue(restrictionFound);
}

public static class GetFieldTypeTestCaseProvider implements ArgumentsProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,5 +253,44 @@
}
]
}
},
{
"schemaName": "combined_restriction",
"namespace": "namespace8",
"appendAirbyteFields": false,
"jsonSchema": {
"properties": {
"created_at": {
tuliren marked this conversation as resolved.
Show resolved Hide resolved
"anyOf": [
{
"type": "string",
"format": "date-time"
},
{
"type": ["null","string"]
},
{
"type": "integer"
}
]
}
}
},
"avroSchema": {
"type": "record",
"name": "combined_restriction",
"namespace": "namespace8",
"fields": [
{
"name": "created_at",
"type": [
"null",
"string",
"int"
],
"default": null
}
]
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,35 @@
]
}
]
},
{
"fieldName": "any_of_field",
"jsonFieldSchema": {
"anyOf": [
{ "type": "string" },
{ "type": "integer" }
]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "all_of_field",
"jsonFieldSchema": {
"allOf": [
{ "type": "string" },
{ "type": "integer" }
]
},
"avroFieldType": ["null", "string", "int"]
},
{
"fieldName": "one_of_field",
"jsonFieldSchema": {
"oneOf": [
{ "type": "string" },
{ "type": "integer" }
]
},
"avroFieldType": ["null", "string", "int"]
}
]
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions (`anyOf`, `allOf`, and `oneOf`). |
| 0.1.7 | 2021-06-23 | [#4227](https://github.com/airbytehq/airbyte/pull/4227) | Added Avro and JSONL output. |
| 0.1.6 | 2021-06-16 | [#4130](https://github.com/airbytehq/airbyte/pull/4130) | Patched the check to verify prefix access instead of full-bucket access. |
| 0.1.5 | 2021-06-14 | [#3908](https://github.com/airbytehq/airbyte/pull/3908) | Fixed default `max_padding_size_mb` in `spec.json`. |
Expand Down