From 153842600bc4d21462b061ef7c5cc6b2530c060e Mon Sep 17 00:00:00 2001
From: Vamsi
Date: Tue, 27 Aug 2024 15:11:03 +0530
Subject: [PATCH] convert json to row using MercifulJsonToRowConverter

---
 .../hudi/avro/MercifulJsonConverter.java      |  15 +-
 .../DecimalLogicalTypeProcessor.java          |  26 +-
 .../avro/MercifulJsonConverterTestBase.java   | 368 ++++++++++++++
 .../hudi/avro/TestMercifulJsonConverter.java  | 295 +----------
 .../HoodieJsonToRowConversionException.java   |  32 ++
 .../helpers/MercifulJsonToRowConverter.java   | 267 ++++++++++
 .../sources/helpers/RowConverter.java         | 116 +++++
 .../streamer/SourceFormatAdapter.java         |  34 +-
 .../utilities/sources/TestJsonDFSSource.java  |  22 +-
 .../sources/TestJsonKafkaSource.java          |  30 ++
 .../TestMercifulJsonToRowConverter.java       | 465 ++++++++++++++++++
 .../simple-test-with-default-value.avsc       |  37 ++
 12 files changed, 1384 insertions(+), 323 deletions(-)
 create mode 100644 hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java
 create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieJsonToRowConversionException.java
 create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java
 create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java
 create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
 create mode 100644 hudi-utilities/src/test/resources/schema/simple-test-with-default-value.avsc

diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
index a6eabc547f48d..f086ef62c8804 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java
@@ -304,25 +304,12 @@ public Pair<Boolean, Object> convert(Object value, String name, Schema schema) {
       // Case 2: Input is a number or String number.
       LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
 
-      Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value);
+      Pair<Boolean, BigDecimal> parseResult = parseObjectToBigDecimal(value, schema);
       if (Boolean.FALSE.equals(parseResult.getLeft())) {
         return Pair.of(false, null);
       }
       BigDecimal bigDecimal = parseResult.getRight();
 
-      // As we don't do rounding, the validation will enforce the scale part and the integer part are all within the
-      // limit. As a result, if scale is 2 precision is 5, we only allow 3 digits for the integer.
-      // Allowed: 123.45, 123, 0.12
-      // Disallowed: 1234 (4 digit integer while the scale has already reserved 2 digit out of the 5 digit precision)
-      //             123456, 0.12345
-      if (bigDecimal.scale() > decimalType.getScale()
-          || (bigDecimal.precision() - bigDecimal.scale()) > (decimalType.getPrecision() - decimalType.getScale())) {
-        // Correspond to case
-        // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 5 as scale 2 without rounding.
-        // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 3 as scale 2 without rounding
-        return Pair.of(false, null);
-      }
-
       switch (schema.getType()) {
         case BYTES:
           // Convert to primitive Avro type that logical type Decimal uses.
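Note on the hunk above: the precision/scale validation removed here moves into DecimalLogicalTypeProcessor.parseObjectToBigDecimal (next hunk), so both the byte-backed and fixed-backed decimal paths share it. A minimal standalone sketch of the rule, for illustration only (the DecimalFitCheck class and its fits helper are hypothetical names, not part of this patch):

import java.math.BigDecimal;

public class DecimalFitCheck {
  // Mirrors the check above: no rounding is done, so both the fractional part
  // (scale) and the integer part (precision - scale) must fit the decimal type.
  static boolean fits(BigDecimal d, int precision, int scale) {
    return d.scale() <= scale
        && (d.precision() - d.scale()) <= (precision - scale);
  }

  public static void main(String[] args) {
    // decimal(5,2): at most three integer digits and two fraction digits.
    System.out.println(fits(new BigDecimal("123.45"), 5, 2)); // true
    System.out.println(fits(new BigDecimal("1234"), 5, 2));   // false: 4 integer digits
    System.out.println(fits(new BigDecimal("0.123"), 5, 2));  // false: scale 3 would need rounding
  }
}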
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java index 1c49e086e46d9..04ed97069affd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/processors/DecimalLogicalTypeProcessor.java @@ -49,21 +49,37 @@ protected static boolean isValidDecimalTypeConfig(Schema schema) { * @return Pair object, with left as boolean indicating if the parsing was successful and right as the * BigDecimal value. */ - protected static Pair parseObjectToBigDecimal(Object obj) { + protected static Pair parseObjectToBigDecimal(Object obj, Schema schema) { + BigDecimal bigDecimal = null; if (obj instanceof Number) { - return Pair.of(true, BigDecimal.valueOf(((Number) obj).doubleValue())); + bigDecimal = BigDecimal.valueOf(((Number) obj).doubleValue()); } // Case 2: Object is a number in String format. if (obj instanceof String) { - BigDecimal bigDecimal = null; try { bigDecimal = new BigDecimal(((String) obj)); } catch (java.lang.NumberFormatException ignored) { /* ignore */ } - return Pair.of(bigDecimal != null, bigDecimal); } - return Pair.of(false, null); + + if (bigDecimal == null) { + return Pair.of(false, null); + } + // As we don't do rounding, the validation will enforce the scale part and the integer part are all within the + // limit. As a result, if scale is 2 precision is 5, we only allow 3 digits for the integer. + // Allowed: 123.45, 123, 0.12 + // Disallowed: 1234 (4 digit integer while the scale has already reserved 2 digit out of the 5 digit precision) + // 123456, 0.12345 + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType(); + if (bigDecimal.scale() > decimalType.getScale() + || (bigDecimal.precision() - bigDecimal.scale()) > (decimalType.getPrecision() - decimalType.getScale())) { + // Correspond to case + // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 5 as scale 2 without rounding. + // org.apache.avro.AvroTypeException: Cannot encode decimal with scale 3 as scale 2 without rounding + return Pair.of(false, null); + } + return Pair.of(true, bigDecimal); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java new file mode 100644 index 0000000000000..6ccab7df6e7d8 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/avro/MercifulJsonConverterTestBase.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.avro;
+
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+public class MercifulJsonConverterTestBase {
+
+  private static final String DECIMAL_AVRO_FILE_INVALID_PATH = "/decimal-logical-type-invalid.avsc";
+  private static final String DECIMAL_AVRO_FILE_PATH = "/decimal-logical-type.avsc";
+  private static final String DECIMAL_FIXED_AVRO_FILE_PATH = "/decimal-logical-type-fixed-type.avsc";
+  private static final String LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH = "/local-timestamp-micros-logical-type.avsc";
+  private static final String LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH = "/local-timestamp-millis-logical-type.avsc";
+  private static final String DURATION_AVRO_FILE_PATH_INVALID = "/duration-logical-type-invalid.avsc";
+
+  private static final String DURATION_AVRO_FILE_PATH = "/duration-logical-type.avsc";
+  private static final String DATE_AVRO_FILE_PATH = "/date-type.avsc";
+  private static final String DATE_AVRO_INVALID_FILE_PATH = "/date-type-invalid.avsc";
+  private static final String TIMESTAMP_AVRO_FILE_PATH = "/timestamp-logical-type2.avsc";
+
+  static Stream<Arguments> decimalBadCases() {
+    return Stream.of(
+        // Invalid schema definition.
+        Arguments.of(DECIMAL_AVRO_FILE_INVALID_PATH, "123.45", null, false),
+        // Schema sets precision to 5; the input exceeds the precision.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "123333.45", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 123333.45, false),
+        // Schema precision set to 5, scale set to 2, so there are only 3 digits to accommodate the integer part.
+        // As we do not do rounding, any input with more than a 3-digit integer part would fail.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "1233", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 1233D, false),
+        // Schema sets scale to 2; the input exceeds the scale.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "0.222", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 0.222, false),
+        // Invalid strings which cannot be parsed as numbers.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "NotAValidString", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "-", null, false),
+        // Schema requires byte type while input is fixed type raw data.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, null, null, true)
+    );
+  }
+
+  static Stream<Arguments> decimalGoodCases() {
+    return Stream.of(
+        // The schemas all set precision to 5 and scale to 2.
+        // Test dimension: Schema file, Ground truth, string input, number input, fixed byte array input.
+        // Test some random numbers.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", "123.45", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", null, 123.45, false),
+        // Test MIN/MAX allowed by the schema.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "-999.99", "-999.99", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "999.99", null, 999.99, false),
+        // Test 0.
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", null, 0D, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "0", null, false),
+        Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "000.00", null, false),
+        // Same set of coverage over the schema using byte/fixed type.
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", "123.45", null, false),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, false),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", "-999.99", null, false),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99", null, 999.99, false),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, 0D, false),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "0", null, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "000.00", null, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, null, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", null, null, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99", null, 999.99, true),
+        Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, null, true)
+    );
+  }
+
+  static Stream<Arguments> zeroScaleDecimalCases() {
+    return Stream.of(
+        // Input value in JSON, expected decimal, whether conversion should be successful
+        // Values that can be converted
+        Arguments.of("0.0", "0", true),
+        Arguments.of("20.0", "20", true),
+        Arguments.of("320", "320", true),
+        Arguments.of("320.00", "320", true),
+        Arguments.of("-1320.00", "-1320", true),
+        Arguments.of("1520423524459", "1520423524459", true),
+        Arguments.of("1520423524459.0", "1520423524459", true),
+        Arguments.of("1000000000000000.0", "1000000000000000", true),
+        // Values that are big enough and out of range of int or long types
+        // Note that we can have at most 17 significant decimal digits in double values
+        Arguments.of("1.2684037455962608e+16", "12684037455962608", true),
+        Arguments.of("4.0100001e+16", "40100001000000000", true),
+        Arguments.of("3.52838e+17", "352838000000000000", true),
+        Arguments.of("9223372036853999600.0000", "9223372036853999600", true),
+        Arguments.of("999998887654321000000000000000.0000", "999998887654321000000000000000", true),
+        Arguments.of("-999998887654321000000000000000.0000", "-999998887654321000000000000000", true),
+        // Values covering high precision decimals that lose precision when converting to a double
+        Arguments.of("3.781239258857277e+16", "37812392588572770", true),
+        Arguments.of("1.6585135379127473e+18", "1658513537912747300", true),
+        // Values that should not be converted
+        Arguments.of("0.0001", null, false),
+        Arguments.of("300.9999", null, false),
+        Arguments.of("1928943043.0001", null, false)
+    );
+  }
+
+  static Stream<Arguments> durationGoodCases() {
+    return Stream.of(
+        // Normal inputs.
+        Arguments.of(1, 2, 3),
+        // Negative ints would be interpreted as unsigned ints by Avro; they are all 4 bytes.
+        Arguments.of(-1, -2, -3),
+        // Signed -1 reinterpreted as unsigned is the unsigned MAX value.
+        Arguments.of(-1, -1, -1),
+        // Other special edge cases.
+        Arguments.of(0, 0, 0),
+        Arguments.of(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE),
+        Arguments.of(Integer.MIN_VALUE, Integer.MIN_VALUE, Integer.MIN_VALUE)
+    );
+  }
+
+  static Stream<Arguments> durationBadCases() {
+    return Stream.of(
+        // As duration uses a 12-byte fixed type to store 3 unsigned int numbers, Long.MAX_VALUE would cause overflow.
+        // Verify it is gracefully handled.
+        Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE)),
+        // Invalid number of elements.
+        Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(1, 2, 3, 4)),
+        Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(1, 2)),
+        Arguments.of(DURATION_AVRO_FILE_PATH, (Object) new int[]{}),
+        Arguments.of(DURATION_AVRO_FILE_PATH, "InvalidString"),
+        Arguments.of(DURATION_AVRO_FILE_PATH_INVALID, Arrays.asList(1, 2, 3))
+    );
+  }
+
+  static Stream<Arguments> localTimestampGoodCaseProvider() {
+    return Stream.of(
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3), // Number of microseconds since the Unix epoch
+            "2024-05-13T23:53:36.004", // Timestamp equivalence
+            "2024-05-13T23:53:36.004"),
+        Arguments.of(
+            (long) (1715644416 * 1e6), // Number of microseconds since the Unix epoch
+            "2024-05-13T23:53:36", // Timestamp equivalence
+            "2024-05-13T23:53:36"),
+        Arguments.of(
+            2024L, "2", "2024"),
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3),
+            (long) (1715644416 * 1e3 + 4000000 / 1e6),
+            (long) (1715644416 * 1e6 + 4000000 / 1e3)),
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3),
+            (long) (1715644416 * 1e3 + 4000000 / 1e6),
+            Long.toString((long) (1715644416 * 1e6 + 4000000 / 1e3))),
+        // Test higher precision that only the microsecond unit can capture.
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e6),
+            "2024-05-13T23:53:36.000", // Timestamp equivalence
+            "2024-05-13T23:53:36.000004"),
+        // Test the full range of time.
+        Arguments.of(
+            0L,
+            "1970-01-01T00:00:00.000", // Timestamp equivalence
+            "1970-01-01T00:00:00.000000"),
+        Arguments.of(
+            Long.MAX_VALUE,
+            "+294247-01-10T04:00:54.775", // Timestamps in the far future must be prefixed with '+'
+            "+294247-01-10T04:00:54.775807"),
+        Arguments.of(
+            0L, 0L, 0L),
+        Arguments.of(
+            -1L * 1000, -1L, -1L * 1000),
+        Arguments.of(
+            Long.MIN_VALUE, Long.MIN_VALUE / 1000, Long.MIN_VALUE),
+        Arguments.of(
+            Long.MAX_VALUE, Long.MAX_VALUE / 1000, Long.MAX_VALUE),
+        Arguments.of(
+            -62167219200000000L, "0000-01-01T00:00:00.00000", "0000-01-01T00:00:00.00000"),
+        Arguments.of(
+            -62167219200000000L, -62167219200000000L / 1000, -62167219200000000L)
+    );
+  }
+
+  static Stream<Arguments> localTimestampBadCaseProvider() {
+    return Stream.of(
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-1323:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-1T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-0-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "20242-05-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "202-05-13T23:53:36.0000000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "202-05-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-13T23:53:36.000Z"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-1323:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-1T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-0-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "20242-05-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "202-05-13T23:53:36.0000000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "202-05-13T23:53:36.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2022-05-13T99:99:99.000"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-13T23:53:36.000Z"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "Not a timestamp at all!"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024 05 13T23:00"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2011-12-03T10:15:30+01:00"),
+        Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2011-12-03T10:15:30[Europe/ Paris]")
+    );
+  }
+
+  static Stream<Arguments> timestampGoodCaseProvider() {
+    return Stream.of(
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3), // Number of microseconds since the Unix epoch
+            "2024-05-13T23:53:36.004Z", // Timestamp equivalence
+            "2024-05-13T23:53:36.004Z"),
+        Arguments.of(
+            (long) (1715644416 * 1e6), // Number of microseconds since the Unix epoch
+            "2024-05-13T23:53:36Z", // Timestamp equivalence
+            "2024-05-13T23:53:36Z"),
+        Arguments.of(
+            2024L, "2", "2024"),
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3),
+            (long) (1715644416 * 1e3 + 4000000 / 1e6),
+            (long) (1715644416 * 1e6 + 4000000 / 1e3)),
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e3),
+            (long) (1715644416 * 1e3 + 4000000 / 1e6),
+            Long.toString((long) (1715644416 * 1e6 + 4000000 / 1e3))),
+        // Test higher precision that only the microsecond unit can capture.
+        Arguments.of(
+            (long) (1715644416 * 1e6 + 4000000 / 1e6),
+            "2024-05-13T23:53:36.000Z", // Timestamp equivalence
+            "2024-05-13T23:53:36.000004Z"),
+        // Test the full range of time.
+        Arguments.of(
+            0L,
+            "1970-01-01T00:00:00.000Z", // Timestamp equivalence
+            "1970-01-01T00:00:00.000000Z"),
+        // The test case leads to long overflow due to how Java calculates the duration between two timestamps.
+        // Arguments.of(
+        //     Long.MAX_VALUE,
+        //     "+294247-01-10T04:00:54.775Z", // Timestamps in the far future must be prefixed with '+'
+        //     "+294247-01-10T04:00:54.775807Z"),
+        Arguments.of(
+            0L, 0L, 0L),
+        Arguments.of(
+            -1L * 1000, -1L, -1L * 1000),
+        Arguments.of(
+            Long.MIN_VALUE, Long.MIN_VALUE / 1000, Long.MIN_VALUE),
+        Arguments.of(
+            Long.MAX_VALUE, Long.MAX_VALUE / 1000, Long.MAX_VALUE),
+        // The test case leads to long overflow due to how Java calculates the duration between two timestamps.
+        // Arguments.of(
+        //     -62167219200000000L, "0000-01-01T00:00:00.00000Z", "0000-01-01T00:00:00.00000Z"),
+        Arguments.of(
+            -62167219200000000L, -62167219200000000L / 1000, -62167219200000000L)
+    );
+  }
+
+  static Stream<Arguments> timestampBadCaseProvider() {
+    return Stream.of(
+        Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:53:36.000"),
+        Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:99:99.000Z"),
+        Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:53:36.000 UTC"),
+        Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "Tue, 3 Jun 2008 11:05:30 GMT")
+    );
+  }
+
+  static Stream<Arguments> timeGoodCaseProvider() {
+    return Stream.of(
+        // 12 hours and 30 minutes in milliseconds / microseconds
+        Arguments.of((long) 4.5e10, (int) 4.5e7, (long) 4.5e10),
+        // 12 hours and 30 minutes in milliseconds / microseconds as string
+        Arguments.of((long) 4.5e10, Integer.toString((int) 4.5e7), Long.toString((long) 4.5e10)),
+        // 12 hours and 30 minutes
+        Arguments.of((long) 4.5e10, "12:30:00", "12:30:00"),
+        Arguments.of(
+            (long) (4.5e10 + 1e3), // 12 hours, 30 minutes and 0.001 seconds in microseconds
+            "12:30:00.001", // 12 hours, 30 minutes and 0.001 seconds
+            "12:30:00.001" // 12 hours, 30 minutes and 0.001 seconds
+        ),
+        // Test value ranges
+        Arguments.of(
+            0L,
+            "00:00:00.000",
+            "00:00:00.00000"
+        ),
+        Arguments.of(
+            86399999990L,
+            "23:59:59.999",
+            "23:59:59.99999"
+        ),
+        Arguments.of((long) Integer.MAX_VALUE, Integer.MAX_VALUE / 1000, (long) Integer.MAX_VALUE),
+        Arguments.of((long) Integer.MIN_VALUE, Integer.MIN_VALUE / 1000, (long) Integer.MIN_VALUE)
+    );
+  }
+
+  static Stream<Arguments> timeBadCaseProvider() {
+    return Stream.of(
+        Arguments.of("00:0"),
+        Arguments.of("00:00:99")
+    );
+  }
+
+  static Stream<Arguments> dateBadCaseProvider() {
+    return Stream.of(
+        Arguments.of(DATE_AVRO_INVALID_FILE_PATH, 18506), // epochDays
+        Arguments.of(DATE_AVRO_FILE_PATH, "#$@#%$@$%#@"),
+        Arguments.of(DATE_AVRO_FILE_PATH, "22020-09-01000"),
+        Arguments.of(DATE_AVRO_FILE_PATH, "2020-02-45"),
+        Arguments.of(DATE_AVRO_FILE_PATH, Arrays.asList(1, 2, 3))
+    );
+  }
+
+  static Stream<String> uuidDimension() {
+    return Stream.of(
+        // Normal UUID
+        UUID.randomUUID().toString(),
+        // Arbitrary strings will also pass, as neither the Avro library nor the JSON converter validates the string content.
+        "",
+        "NotAnUUID"
+    );
+  }
+
+  static Stream<Arguments> dateGoodCaseProvider() {
+    return Stream.of(
+        Arguments.of(18506, 18506), // epochDays
+        Arguments.of(18506, "2020-09-01"), // dateString
+        Arguments.of(7323356, "+22020-09-01"), // dateString
+        Arguments.of(18506, "18506"), // epochDaysString
+        Arguments.of(Integer.MAX_VALUE, Integer.toString(Integer.MAX_VALUE)),
+        Arguments.of(Integer.MIN_VALUE, Integer.toString(Integer.MIN_VALUE))
+    );
+  }
+
+  static Stream<Arguments> dateProviderForRow() {
+    return Stream.of(
+        // 18506 epoch days since the Unix epoch is 2020-09-01, while
+        // 18506 * MILLI_SECONDS_PER_DAY is 2020-08-31.
+        // That's why the same 18506 days on the Avro side can map to a
+        // different Row equivalence.
+        Arguments.of("2020-09-01", 18506), // epochDays
+        Arguments.of("2020-09-01", "2020-09-01"), // dateString
+        Arguments.of(null, "+22020-09-01"), // dateString, not supported by Row
+        Arguments.of("2020-09-01", "18506"), // epochDaysString
+        Arguments.of(null, Integer.toString(Integer.MAX_VALUE)), // not supported by Row
+        Arguments.of(null, Integer.toString(Integer.MIN_VALUE)) // not supported by Row
+    );
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
index 74a611ab24524..ed6248d785268 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestMercifulJsonConverter.java
@@ -30,7 +30,6 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -39,16 +38,13 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
-import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-public class TestMercifulJsonConverter {
+public class TestMercifulJsonConverter extends MercifulJsonConverterTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   private static final MercifulJsonConverter CONVERTER = new MercifulJsonConverter(true,"__");
@@ -90,9 +86,7 @@ void nestedJsonAsString(String nameInput) throws IOException {
     Assertions.assertEquals(rec, CONVERTER.convert(json, simpleSchema));
   }
 
-  private static final String DECIMAL_AVRO_FILE_INVALID_PATH = "/decimal-logical-type-invalid.avsc";
   private static final String DECIMAL_AVRO_FILE_PATH = "/decimal-logical-type.avsc";
-  private static final String
DECIMAL_FIXED_AVRO_FILE_PATH = "/decimal-logical-type-fixed-type.avsc"; /** * Covered case: * Avro Logical Type: Decimal @@ -122,29 +116,6 @@ void decimalLogicalTypeInvalidCaseTest(String avroFile, String strInput, Double }); } - static Stream decimalBadCases() { - return Stream.of( - // Invalid schema definition. - Arguments.of(DECIMAL_AVRO_FILE_INVALID_PATH, "123.45", null, false), - // Schema set precision as 5, input overwhelmed the precision. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "123333.45", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 123333.45, false), - // Schema precision set to 5, scale set to 2, so there is only 3 digit to accommodate integer part. - // As we do not do rounding, any input with more than 3 digit integer would fail. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "1233", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 1233D, false), - // Schema set scale as 2, input overwhelmed the scale. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "0.222", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, null, 0.222, false), - // Invalid string which cannot be parsed as number. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "NotAValidString", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "-", null, false), - // Schema requires byte type while input is fixed type raw data. - Arguments.of(DECIMAL_AVRO_FILE_PATH, null, null, true) - ); - } - /** * Covered case: * Avro Logical Type: Decimal @@ -198,36 +169,6 @@ void decimalLogicalTypeTest(String avroFilePath, String groundTruth, String strI Assertions.assertEquals(record, real); } - static Stream decimalGoodCases() { - return Stream.of( - // The schema all set precision as 5, scale as 2. - // Test dimension: Schema file, Ground truth, string input, number input, fixed byte array input. - // Test some random numbers. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", "123.45", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "123.45", null, 123.45, false), - // Test MIN/MAX allowed by the schema. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "-999.99", "-999.99", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "999.99",null, 999.99, false), - // Test 0. - Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", null, 0D, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "0", null, false), - Arguments.of(DECIMAL_AVRO_FILE_PATH, "0", "000.00", null, false), - // Same set of coverage over schame using byte/fixed type. 
- Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", "123.45", null, false), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, false), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", "-999.99", null, false), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99",null, 999.99, false), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, 0D, false), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "0", null, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", "000.00", null, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, null, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "123.45", null, 123.45, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "-999.99", null, null, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "999.99", null, 999.99, true), - Arguments.of(DECIMAL_FIXED_AVRO_FILE_PATH, "0", null, null, true) - ); - } - private static final String DURATION_AVRO_FILE_PATH = "/duration-logical-type.avsc"; private static final String DURATION_AVRO_FILE_PATH_INVALID = "/duration-logical-type-invalid.avsc"; /** @@ -262,21 +203,6 @@ void durationLogicalTypeTest(int months, int days, int milliseconds) throws IOEx Assertions.assertEquals(durationRecord, real); } - static Stream durationGoodCases() { - return Stream.of( - // Normal inputs. - Arguments.of(1, 2, 3), - // Negative int would be interpreted as some unsigned int by Avro. They all 4-byte. - Arguments.of(-1, -2, -3), - // Signed -1 interpreted to unsigned would be unsigned MAX - Arguments.of(-1, -1, -1), - // Other special edge cases. - Arguments.of(0, 0, 0), - Arguments.of(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE), - Arguments.of(Integer.MIN_VALUE, Integer.MIN_VALUE, Integer.MIN_VALUE) - ); - } - @ParameterizedTest @MethodSource("durationBadCases") void durationLogicalTypeBadTest(String schemaFile, Object input) throws IOException { @@ -291,23 +217,7 @@ void durationLogicalTypeBadTest(String schemaFile, Object input) throws IOExcept }); } - static Stream durationBadCases() { - return Stream.of( - // As duration uses 12 byte fixed type to store 3 unsigned int numbers, Long.MAX would cause overflow. - // Verify it is gracefully handled. 
- Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(Long.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE)), - // Invalid num of element count - Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(1, 2, 3, 4)), - Arguments.of(DURATION_AVRO_FILE_PATH, Arrays.asList(1, 2)), - Arguments.of(DURATION_AVRO_FILE_PATH, (Object) new int[]{}), - Arguments.of(DURATION_AVRO_FILE_PATH, "InvalidString"), - Arguments.of(DURATION_AVRO_FILE_PATH_INVALID, Arrays.asList(1, 2, 3)) - ); - } - - private static final String DATE_AVRO_FILE_PATH = "/date-type.avsc"; - private static final String DATE_AVRO_INVALID_FILE_PATH = "/date-type-invalid.avsc"; /** * Covered case: * Avro Logical Type: Date @@ -330,17 +240,6 @@ void dateLogicalTypeTest(int groundTruth, Object dateInput) throws IOException { Assertions.assertEquals(record, real); } - static Stream dateGoodCaseProvider() { - return Stream.of( - Arguments.of(18506, 18506), // epochDays - Arguments.of(18506, "2020-09-01"), // dateString - Arguments.of(7323356, "+22020-09-01"), // dateString - Arguments.of(18506, "18506"), // epochDaysString - Arguments.of(Integer.MAX_VALUE, Integer.toString(Integer.MAX_VALUE)), - Arguments.of(Integer.MIN_VALUE, Integer.toString(Integer.MIN_VALUE)) - ); - } - /** * Covered case: * Avro Logical Type: Date @@ -361,16 +260,6 @@ void dateLogicalTypeTest( }); } - static Stream dateBadCaseProvider() { - return Stream.of( - Arguments.of(DATE_AVRO_INVALID_FILE_PATH, 18506), // epochDays - Arguments.of(DATE_AVRO_FILE_PATH, "#$@#%$@$%#@"), - Arguments.of(DATE_AVRO_FILE_PATH, "22020-09-01000"), - Arguments.of(DATE_AVRO_FILE_PATH, "2020-02-45"), - Arguments.of(DATE_AVRO_FILE_PATH, Arrays.asList(1, 2, 3)) - ); - } - private static final String LOCAL_TIME_AVRO_FILE_PATH = "/local-timestamp-logical-type.avsc"; /** * Covered case: @@ -401,57 +290,6 @@ void localTimestampLogicalTypeGoodCaseTest( Assertions.assertEquals(record, real); } - static Stream localTimestampGoodCaseProvider() { - return Stream.of( - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), // Num of micro sec since unix epoch - "2024-05-13T23:53:36.004", // Timestamp equivalence - "2024-05-13T23:53:36.004"), - Arguments.of( - (long)(1715644416 * 1e6), // Num of micro sec since unix epoch - "2024-05-13T23:53:36", // Timestamp equivalence - "2024-05-13T23:53:36"), - Arguments.of( - 2024L, "2", "2024"), - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), - (long)(1715644416 * 1e3 + 4000000 / 1e6), - (long)(1715644416 * 1e6 + 4000000 / 1e3)), - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), - (long)(1715644416 * 1e3 + 4000000 / 1e6), - Long.toString((long)(1715644416 * 1e6 + 4000000 / 1e3))), - // Test higher precision that only micro sec unit can capture. 
- Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e6), - "2024-05-13T23:53:36.000", // Timestamp equivalence - "2024-05-13T23:53:36.000004"), - // Test full range of time - Arguments.of( - 0L, - "1970-01-01T00:00:00.000", // Timestamp equivalence - "1970-01-01T00:00:00.000000"), - Arguments.of( - Long.MAX_VALUE, - "+294247-01-10T04:00:54.775", // Timestamp in far future must be prefixed with '+' - "+294247-01-10T04:00:54.775807"), - Arguments.of( - 0L, 0L, 0L), - Arguments.of( - -1L * 1000, -1L, -1L * 1000), - Arguments.of( - Long.MIN_VALUE, Long.MIN_VALUE / 1000, Long.MIN_VALUE), - Arguments.of( - Long.MAX_VALUE, Long.MAX_VALUE / 1000, Long.MAX_VALUE), - Arguments.of( - -62167219200000000L, "0000-01-01T00:00:00.00000", "0000-01-01T00:00:00.00000"), - Arguments.of( - -62167219200000000L, -62167219200000000L / 1000, -62167219200000000L) - ); - } - - private static final String LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH = "/local-timestamp-millis-logical-type.avsc"; - private static final String LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH = "/local-timestamp-micros-logical-type.avsc"; @ParameterizedTest @MethodSource("localTimestampBadCaseProvider") void localTimestampLogicalTypeBadTest( @@ -467,31 +305,6 @@ void localTimestampLogicalTypeBadTest( }); } - static Stream localTimestampBadCaseProvider() { - return Stream.of( - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-1323:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-1T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-0-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "20242-05-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "202-05-13T23:53:36.0000000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "202-05-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MILLI_AVRO_FILE_PATH, "2024-05-13T23:53:36.000Z"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-1323:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-1T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-0-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "20242-05-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "202-05-13T23:53:36.0000000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "202-05-13T23:53:36.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2022-05-13T99:99:99.000"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05-13T23:53:36.000Z"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "Not a timestamp at all!"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024 05 13T23:00"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2024-05"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2011-12-03T10:15:30+01:00"), - Arguments.of(LOCAL_TIMESTAMP_MICRO_AVRO_FILE_PATH, "2011-12-03T10:15:30[Europe/ Paris]") - ); - } - private static final String TIMESTAMP_AVRO_FILE_PATH = "/timestamp-logical-type2.avsc"; /** * Covered case: @@ -522,57 +335,6 @@ void timestampLogicalTypeGoodCaseTest( Assertions.assertEquals(record, real); } - static Stream timestampGoodCaseProvider() { - return Stream.of( - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), // Num of micro sec since unix epoch - "2024-05-13T23:53:36.004Z", // Timestamp equivalence - "2024-05-13T23:53:36.004Z"), - Arguments.of( - (long)(1715644416 * 1e6), // Num of micro sec since unix epoch - "2024-05-13T23:53:36Z", // Timestamp equivalence - 
"2024-05-13T23:53:36Z"), - Arguments.of( - 2024L, "2", "2024"), - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), - (long)(1715644416 * 1e3 + 4000000 / 1e6), - (long)(1715644416 * 1e6 + 4000000 / 1e3)), - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e3), - (long)(1715644416 * 1e3 + 4000000 / 1e6), - Long.toString((long)(1715644416 * 1e6 + 4000000 / 1e3))), - // Test higher precision that only micro sec unit can capture. - Arguments.of( - (long)(1715644416 * 1e6 + 4000000 / 1e6), - "2024-05-13T23:53:36.000Z", // Timestamp equivalence - "2024-05-13T23:53:36.000004Z"), - // Test full range of time - Arguments.of( - 0L, - "1970-01-01T00:00:00.000Z", // Timestamp equivalence - "1970-01-01T00:00:00.000000Z"), - // The test case leads to long overflow due to how java calculate duration between 2 timestamps - // Arguments.of( - // Long.MAX_VALUE, - // "+294247-01-10T04:00:54.775Z", // Timestamp in far future must be prefixed with '+' - // "+294247-01-10T04:00:54.775807Z"), - Arguments.of( - 0L, 0L, 0L), - Arguments.of( - -1L * 1000, -1L, -1L * 1000), - Arguments.of( - Long.MIN_VALUE, Long.MIN_VALUE / 1000, Long.MIN_VALUE), - Arguments.of( - Long.MAX_VALUE, Long.MAX_VALUE / 1000, Long.MAX_VALUE), - // The test case leads to long overflow due to how java calculate duration between 2 timestamps - // Arguments.of( - // -62167219200000000L, "0000-01-01T00:00:00.00000Z", "0000-01-01T00:00:00.00000Z"), - Arguments.of( - -62167219200000000L, -62167219200000000L / 1000, -62167219200000000L) - ); - } - @ParameterizedTest @MethodSource("timestampBadCaseProvider") void timestampLogicalTypeBadTest(Object badInput) throws IOException { @@ -598,15 +360,6 @@ void timestampLogicalTypeBadTest(Object badInput) throws IOException { }); } - static Stream timestampBadCaseProvider() { - return Stream.of( - Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:53:36.000"), - Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:99:99.000Z"), - Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "2024-05-1323:53:36.000 UTC"), - Arguments.of(TIMESTAMP_AVRO_FILE_PATH, "Tue, 3 Jun 2008 11:05:30 GMT") - ); - } - private static final String TIME_AVRO_FILE_PATH = "/time-logical-type.avsc"; /** * Covered case: @@ -636,35 +389,6 @@ void timeLogicalTypeTest(Long expectedMicroSecOfDay, Object timeMilli, Object ti Assertions.assertEquals(record, real); } - static Stream timeGoodCaseProvider() { - return Stream.of( - // 12 hours and 30 minutes in milliseconds / microseconds - Arguments.of((long)4.5e10, (int)4.5e7, (long)4.5e10), - // 12 hours and 30 minutes in milliseconds / microseconds as string - Arguments.of((long)4.5e10, Integer.toString((int)4.5e7), Long.toString((long)4.5e10)), - // 12 hours and 30 minutes - Arguments.of((long)4.5e10, "12:30:00", "12:30:00"), - Arguments.of( - (long)(4.5e10 + 1e3), // 12 hours, 30 minutes and 0.001 seconds in microseconds - "12:30:00.001", // 12 hours, 30 minutes and 0.001 seconds - "12:30:00.001" // 12 hours, 30 minutes and 0.001 seconds - ), - // Test value ranges - Arguments.of( - 0L, - "00:00:00.000", - "00:00:00.00000" - ), - Arguments.of( - 86399999990L, - "23:59:59.999", - "23:59:59.99999" - ), - Arguments.of((long)Integer.MAX_VALUE, Integer.MAX_VALUE / 1000, (long)Integer.MAX_VALUE), - Arguments.of((long)Integer.MIN_VALUE, Integer.MIN_VALUE / 1000, (long)Integer.MIN_VALUE) - ); - } - @ParameterizedTest @MethodSource("timeBadCaseProvider") void timeLogicalTypeBadCaseTest(Object invalidInput) throws IOException { @@ -690,13 +414,6 @@ void timeLogicalTypeBadCaseTest(Object 
invalidInput) throws IOException { }); } - static Stream timeBadCaseProvider() { - return Stream.of( - Arguments.of("00:0"), - Arguments.of("00:00:99") - ); - } - private static final String UUID_AVRO_FILE_PATH = "/uuid-logical-type.avsc"; /** * Covered case: @@ -720,16 +437,6 @@ void uuidLogicalTypeTest(String uuid) throws IOException { Assertions.assertEquals(record, real); } - static Stream uuidDimension() { - return Stream.of( - // Normal UUID - UUID.randomUUID().toString(), - // Arbitrary string will also pass as neither Avro library nor json convertor validate the string content. - "", - "NotAnUUID" - ); - } - @Test public void conversionWithFieldNameSanitization() throws IOException { String sanitizedSchemaString = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"__name\", \"type\": \"string\"}, " diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieJsonToRowConversionException.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieJsonToRowConversionException.java new file mode 100644 index 0000000000000..f228aaf3e49b6 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/exception/HoodieJsonToRowConversionException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.exception; + +import org.apache.hudi.exception.HoodieJsonConversionException; + +public class HoodieJsonToRowConversionException extends HoodieJsonConversionException { + + public HoodieJsonToRowConversionException(String msg) { + super(msg); + } + + public HoodieJsonToRowConversionException(String msg, Throwable t) { + super(msg, t); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java new file mode 100644 index 0000000000000..a82e7f9d00da3 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/MercifulJsonToRowConverter.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.avro.MercifulJsonConverter; +import org.apache.hudi.avro.processors.DateLogicalTypeProcessor; +import org.apache.hudi.avro.processors.DecimalLogicalTypeProcessor; +import org.apache.hudi.avro.processors.DurationLogicalTypeProcessor; +import org.apache.hudi.avro.processors.EnumTypeProcessor; +import org.apache.hudi.avro.processors.FixedTypeProcessor; +import org.apache.hudi.avro.processors.JsonFieldProcessor; +import org.apache.hudi.avro.processors.Parser; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Conversions; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import scala.collection.JavaConverters; + +/** + * Converts Json record to Row Record. + */ +public class MercifulJsonToRowConverter extends MercifulJsonConverter { + + /** + * Uses a default objectMapper to deserialize a json string. + */ + public MercifulJsonToRowConverter() { + this(false, "__"); + } + + /** + * Allows enabling sanitization and allows choice of invalidCharMask for sanitization + */ + public MercifulJsonToRowConverter(boolean shouldSanitize, String invalidCharMask) { + this(new ObjectMapper().enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS), shouldSanitize, invalidCharMask); + } + + /** + * Allows a configured ObjectMapper to be passed for converting json records to row. + */ + public MercifulJsonToRowConverter(ObjectMapper mapper, boolean shouldSanitize, String invalidCharMask) { + super(mapper, shouldSanitize, invalidCharMask); + } + + /** + * Converts json to row. + * NOTE: if sanitization is needed for row conversion, the schema input to this method is already sanitized. + * During the conversion here, we sanitize the fields in the data + * + * @param json Json record + * @param schema Schema + */ + public Row convertToRow(String json, Schema schema) { + try { + Map jsonObjectMap = mapper.readValue(json, Map.class); + return convertJsonToRow(jsonObjectMap, schema); + } catch (HoodieException | JsonProcessingException e) { + throw new HoodieJsonToRowConversionException("Failed to convert json to row", e); + } + } + + private Row convertJsonToRow(Map inputJson, Schema schema) { + List fields = schema.getFields(); + List values = new ArrayList<>(Collections.nCopies(fields.size(), null)); + + for (Schema.Field f : fields) { + Object val = shouldSanitize ? 
getFieldFromJson(f, inputJson, schema.getFullName(), invalidCharMask) : inputJson.get(f.name()); + if (val != null) { + values.set(f.pos(), convertJsonField(val, f.name(), f.schema())); + } + } + return RowFactory.create(values.toArray()); + } + + private class DecimalToRowLogicalTypeProcessor extends DecimalLogicalTypeProcessor { + @Override + public Pair convert(Object value, String name, Schema schema) { + if (!isValidDecimalTypeConfig(schema)) { + return Pair.of(false, null); + } + + if (schema.getType() == Type.FIXED && value instanceof List) { + // Case 1: Input is a list. It is expected to be raw Fixed byte array input, and we only support + // parsing it to Fixed type. + JsonFieldProcessor processor = generateFixedTypeHandler(); + Pair fixedTypeResult = processor.convert(value, name, schema); + if (fixedTypeResult.getLeft()) { + byte[] byteArray = (byte[]) fixedTypeResult.getRight(); + GenericFixed fixedValue = new GenericData.Fixed(schema, byteArray); + // Convert the GenericFixed to BigDecimal + return Pair.of(true, new Conversions + .DecimalConversion() + .fromFixed( + fixedValue, + schema, + schema.getLogicalType() + ) + ); + } + } + + // Case 2: Input is a number or String number or base64 encoded string number + Pair parseResult = parseObjectToBigDecimal(value, schema); + return Pair.of(parseResult.getLeft(), parseResult.getRight()); + } + } + + @Override + protected JsonFieldProcessor generateDecimalLogicalTypeHandler() { + return new DecimalToRowLogicalTypeProcessor(); + } + + @Override + protected JsonFieldProcessor generateDateLogicalTypeHandler() { + return new DateToRowLogicalTypeProcessor(); + } + + @Override + protected JsonFieldProcessor generateDurationLogicalTypeHandler() { + return new DurationToRowLogicalTypeProcessor(); + } + + private static class DurationToRowLogicalTypeProcessor extends DurationLogicalTypeProcessor { + + @Override + public Pair convert( + Object value, String name, Schema schema) { + throw new HoodieJsonToRowConversionException("Duration type is not supported in Row object"); + } + } + + private static class DateToRowLogicalTypeProcessor extends DateLogicalTypeProcessor { + + @Override + public Pair convert(Object value, String name, Schema schema) { + return convertCommon(new Parser.DateParser(), value, schema); + } + } + + @Override + protected JsonFieldProcessor generateBytesTypeHandler() { + return new JsonFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, value.toString().getBytes()); + } + }; + } + + @Override + protected JsonFieldProcessor generateFixedTypeHandler() { + return new FixedToRowTypeProcessor(); + } + + private static class FixedToRowTypeProcessor extends FixedTypeProcessor { + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, convertToJavaObject(value, name, schema)); + } + } + + @Override + protected JsonFieldProcessor generateEnumTypeHandler() { + return new EnumToRowTypeProcessor(); + } + + private static class EnumToRowTypeProcessor extends EnumTypeProcessor { + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, convertToJavaObject(value, name, schema)); + } + } + + @Override + protected JsonFieldProcessor generateRecordTypeHandler() { + return new JsonFieldProcessor() { + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, convertJsonToRow((Map) value, schema)); + } + }; + } + + @Override + protected 
JsonFieldProcessor generateArrayTypeHandler() { + return new JsonFieldProcessor() { + private List convertToJavaObject(Object value, String name, Schema schema) { + Schema elementSchema = schema.getElementType(); + List listRes = new ArrayList<>(); + for (Object v : (List) value) { + listRes.add(convertJsonField(v, name, elementSchema)); + } + return listRes; + } + + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, + convertToJavaObject( + value, + name, + schema).toArray()); + } + }; + } + + @Override + protected JsonFieldProcessor generateMapTypeHandler() { + return new JsonFieldProcessor() { + public Map convertToJavaObject( + Object value, + String name, + Schema schema) { + Schema valueSchema = schema.getValueType(); + Map mapRes = new HashMap<>(); + for (Map.Entry v : ((Map) value).entrySet()) { + mapRes.put(v.getKey(), convertJsonField(v.getValue(), name, valueSchema)); + } + return mapRes; + } + + @Override + public Pair convert(Object value, String name, Schema schema) { + return Pair.of(true, JavaConverters + .mapAsScalaMapConverter( + convertToJavaObject( + value, + name, + schema)).asScala()); + } + }; + } +} \ No newline at end of file diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java new file mode 100644 index 0000000000000..d5ded4582834e --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/RowConverter.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hudi.internal.schema.HoodieSchemaException; + +import org.apache.avro.Schema; +import org.apache.spark.sql.Row; + +import java.io.Serializable; + +import scala.util.Either; +import scala.util.Left; +import scala.util.Right; + +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES; +import static org.apache.hudi.utilities.config.HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK; + +public class RowConverter implements Serializable { + private static final long serialVersionUID = 1L; + /** + * To be lazily initialized on executors. + */ + private transient Schema schema; + + private final String schemaStr; + private final String invalidCharMask; + private final boolean shouldSanitize; + + /** + * To be lazily initialized on executors. 
+ */ + private transient MercifulJsonToRowConverter jsonConverter; + + public RowConverter(String schemaStr) { + this(schemaStr, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue()); + } + + public RowConverter(String schemaStr, boolean shouldSanitize, String invalidCharMask) { + this.schemaStr = schemaStr; + this.shouldSanitize = shouldSanitize; + this.invalidCharMask = invalidCharMask; + } + + public RowConverter(Schema schema) { + this(schema, SANITIZE_SCHEMA_FIELD_NAMES.defaultValue(), SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.defaultValue()); + } + + public RowConverter(Schema schema, boolean shouldSanitize, String invalidCharMask) { + this.schemaStr = schema.toString(); + this.schema = schema; + this.shouldSanitize = shouldSanitize; + this.invalidCharMask = invalidCharMask; + } + + private void initSchema() { + if (schema == null) { + Schema.Parser parser = new Schema.Parser(); + schema = parser.parse(schemaStr); + } + } + + private void initJsonConvertor() { + if (jsonConverter == null) { + jsonConverter = new MercifulJsonToRowConverter(this.shouldSanitize, this.invalidCharMask); + } + } + + public Row fromJson(String json) { + try { + initSchema(); + initJsonConvertor(); + return jsonConverter.convertToRow(json, schema); + } catch (Exception e) { + if (json != null) { + throw new HoodieSchemaException("Failed to convert schema from json to avro: " + json, e); + } else { + throw new HoodieSchemaException("Failed to convert schema from json to avro. Schema string was null.", e); + } + } + } + + public Either fromJsonToRowWithError(String json) { + Row row; + try { + row = fromJson(json); + } catch (Exception e) { + return new Right<>(json); + } + return new Left<>(row); + } + + public Schema getSchema() { + try { + return new Schema.Parser().parse(schemaStr); + } catch (Exception e) { + throw new HoodieSchemaException("Failed to parse json schema: " + schemaStr, e); + } + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java index c379472b26eb6..1385a7103cde9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java @@ -34,6 +34,7 @@ import org.apache.hudi.utilities.sources.InputBatch; import org.apache.hudi.utilities.sources.Source; import org.apache.hudi.utilities.sources.helpers.AvroConvertor; +import org.apache.hudi.utilities.sources.helpers.RowConverter; import org.apache.hudi.utilities.sources.helpers.SanitizationUtils; import com.google.protobuf.Message; @@ -128,6 +129,21 @@ private JavaRDD transformJsonToGenericRdd(InputBatch transformJsonToRowRdd(InputBatch> inputBatch) { + MercifulJsonConverter.clearCache(inputBatch.getSchemaProvider().getSourceSchema().getFullName()); + RowConverter convertor = new RowConverter(inputBatch.getSchemaProvider().getSourceSchema(), isFieldNameSanitizingEnabled(), getInvalidCharMask()); + return inputBatch.getBatch().map(rdd -> { + if (errorTableWriter.isPresent()) { + JavaRDD> javaRDD = rdd.map(convertor::fromJsonToRowWithError); + errorTableWriter.get().addErrorEvents(javaRDD.filter(Either::isRight).map(x -> + new ErrorEvent<>(x.right().get(), ErrorEvent.ErrorReason.JSON_ROW_DESERIALIZATION_FAILURE))); + return javaRDD.filter(Either::isLeft).map(x -> x.left().get()); + } else { + return rdd.map(convertor::fromJson); + } + 
}).orElse(null); + } + /** * transform datasets with error events when error table is enabled * @param eventsRow @@ -221,14 +237,20 @@ public InputBatch> fetchNewDataInRowFormat(Option lastCkptS return avroDataInRowFormat(r); } case JSON: { - if (isFieldNameSanitizingEnabled()) { - //leverage the json -> avro sanitizing. TODO([HUDI-5829]) Optimize by sanitizing during direct conversion - InputBatch> r = fetchNewDataInAvroFormat(lastCkptStr, sourceLimit); - return avroDataInRowFormat(r); - - } InputBatch> r = ((Source>) source).fetchNext(lastCkptStr, sourceLimit); Schema sourceSchema = r.getSchemaProvider().getSourceSchema(); + + if (isFieldNameSanitizingEnabled()) { + StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema); + JavaRDD rowRDD = transformJsonToRowRdd(r); + if (rowRDD != null) { + Dataset rowDataset = source.getSparkSession().createDataFrame(rowRDD, dataType); + return new InputBatch<>(Option.of(rowDataset), r.getCheckpointForNextBatch(), r.getSchemaProvider()); + } else { + return new InputBatch<>(Option.empty(), r.getCheckpointForNextBatch(), r.getSchemaProvider()); + } + } + if (errorTableWriter.isPresent()) { // if error table writer is enabled, during spark read `columnNameOfCorruptRecord` option is configured. // Any records which spark is unable to read successfully are transferred to the column diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java index 3a64747eda5b4..93fdb229ec630 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.SchemaCompatibilityException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.hudi.utilities.config.HoodieStreamerConfig; import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -83,13 +84,26 @@ public void testCorruptedSourceFile() throws IOException { assertTrue(batch.getBatch().isPresent()); Throwable t = assertThrows(Exception.class, () -> batch.getBatch().get().show(30)); - while (t != null) { - if (t instanceof SchemaCompatibilityException) { + verifyException(t, SchemaCompatibilityException.class); + // check for exception when sanitization is enabled. 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 32658e11d420d..15be5417ef937 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -262,6 +262,36 @@ public void testErrorEventsForDataInRowFormat() throws IOException {
         InProcessTimeGenerator.createNewInstantTime(), Option.empty()).get()).count());
   }
 
+  @Test
+  void testErrorEventsForDataInRowFormatWithSanitizationEnabled() {
+    // topic setup.
+    final String topic = TEST_TOPIC_PREFIX + "testErrorEventsForDataInRowFormatWithSanitizationEnabled";
+
+    testUtils.createTopic(topic, 2);
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+    TopicPartition topicPartition0 = new TopicPartition(topic, 0);
+    topicPartitions.add(topicPartition0);
+    TopicPartition topicPartition1 = new TopicPartition(topic, 1);
+    topicPartitions.add(topicPartition1);
+    sendJsonSafeMessagesToKafka(topic, 1000, 2);
+    testUtils.sendMessages(topic, new String[]{"error_event1", "error_event2"});
+
+    TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+    props.put(ENABLE_KAFKA_COMMIT_OFFSET.key(), "true");
+    props.put(ERROR_TABLE_BASE_PATH.key(), "/tmp/quarantine_table_test/json_kafka_row_events");
+    props.put(ERROR_TARGET_TABLE.key(), "json_kafka_row_events");
+    props.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), true);
+    props.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(), "__");
+    props.put("hoodie.errortable.validate.targetschema.enable", "true");
+    props.put("hoodie.base.path", "/tmp/json_kafka_row_events");
+    Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    Option<BaseErrorTableWriter> errorTableWriter = Option.of(getAnonymousErrorTableWriter(props));
+    SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource, errorTableWriter, Option.of(props));
+    assertEquals(1000, kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().count());
+    assertEquals(2, ((JavaRDD) errorTableWriter.get().getErrorEvents(
+        InProcessTimeGenerator.createNewInstantTime(), Option.empty()).get()).count());
+  }
+
   @Test
   public void testErrorEventsForDataInAvroFormat() throws IOException {
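Reviewer note (illustrative, not part of the patch): a toy restatement of what the SANITIZE_SCHEMA_FIELD_NAMES plus "__" mask combination does to incoming field names. This mirrors the idea only, not Hudi's actual sanitizer implementation; the class and method names are made up for illustration.

    public class FieldNameMaskSketch {
      // Replaces characters that are illegal in Avro names with the mask.
      static String sanitize(String name, String invalidCharMask) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < name.length(); i++) {
          char c = name.charAt(i);
          boolean valid = (i == 0)
              ? Character.isLetter(c) || c == '_'
              : Character.isLetterOrDigit(c) || c == '_';
          out.append(valid ? Character.toString(c) : invalidCharMask);
        }
        return out.toString();
      }

      public static void main(String[] args) {
        System.out.println(sanitize("favorite-number", "__")); // favorite__number
        System.out.println(sanitize("$name", "__"));           // __name
      }
    }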
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
new file mode 100644
index 0000000000000..b3c59e7c37c46
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverter.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.avro.MercifulJsonConverterTestBase;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.utilities.exception.HoodieJsonToRowConversionException;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class TestMercifulJsonToRowConverter extends MercifulJsonConverterTestBase {
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final MercifulJsonToRowConverter CONVERTER = new MercifulJsonToRowConverter(true, "__");
+
+  private static final String SIMPLE_AVRO_WITH_DEFAULT = "/schema/simple-test-with-default-value.avsc";
+
+  @Test
+  void basicConversion() throws IOException {
+    Schema simpleSchema = SchemaTestUtil.getSchema(SIMPLE_AVRO_WITH_DEFAULT);
+    String name = "John Smith";
+    int number = 1337;
+    String color = "Blue. No yellow!";
+    Map<String, Object> data = new HashMap<>();
+    data.put("name", name);
+    data.put("favorite_number", number);
+    data.put("favorite_color", color);
+    String json = MAPPER.writeValueAsString(data);
+
+    List<Object> values = new ArrayList<>(Collections.nCopies(simpleSchema.getFields().size(), null));
+    values.set(0, name);
+    values.set(1, number);
+    // Index 2 is the "age" field, which is absent from the input and stays null.
+    values.set(3, color);
+    Row recRow = RowFactory.create(values.toArray());
+
+    assertEquals(recRow, CONVERTER.convertToRow(json, simpleSchema));
+  }
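Reviewer note (illustrative, not part of the patch): why the expected Row above sets index 3 for favorite_color. Row values follow the Avro schema's field order, and the missing "age" at index 2 stays null rather than picking up its schema default, as the test's expectations imply. A quick way to see the ordering, reusing the test helper from above:

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.testutils.SchemaTestUtil;

    public class FieldOrderSketch {
      public static void main(String[] args) throws IOException {
        Schema s = SchemaTestUtil.getSchema("/schema/simple-test-with-default-value.avsc");
        for (Schema.Field f : s.getFields()) {
          // Prints: 0 name, 1 favorite_number, 2 age, 3 favorite_color, 4 email
          System.out.println(f.pos() + " " + f.name());
        }
      }
    }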
+
+  private static final String DECIMAL_AVRO_FILE_PATH = "/decimal-logical-type.avsc";
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Decimal
+   * Avro type: bytes
+   * Input: String number "123.45"
+   * Output: Row containing the BigDecimal value the schema specifies.
+   */
+  @Test
+  void decimalLogicalTypeByteTypeTest() throws IOException {
+    String num = "123.45";
+    BigDecimal bigDecimal = new BigDecimal(num);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("decimalField", num);
+    String json = MAPPER.writeValueAsString(data);
+
+    Schema schema = SchemaTestUtil.getSchema(DECIMAL_AVRO_FILE_PATH);
+
+    Row expectRow = RowFactory.create(bigDecimal);
+    Row realRow = CONVERTER.convertToRow(json, schema);
+    assertEquals(expectRow, realRow);
+  }
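Reviewer note (illustrative, not part of the patch): for the BYTES-backed decimal above, the converter ultimately has to agree with Avro's own encoding, the unscaled value as a two's-complement big-endian byte array. A minimal sketch with plain Avro; the class name is illustrative:

    import java.math.BigDecimal;
    import java.nio.ByteBuffer;
    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;

    public class DecimalBytesSketch {
      public static void main(String[] args) {
        Schema bytesSchema = LogicalTypes.decimal(5, 2)
            .addToSchema(Schema.create(Schema.Type.BYTES));
        ByteBuffer buf = new Conversions.DecimalConversion()
            .toBytes(new BigDecimal("123.45"), bytesSchema, bytesSchema.getLogicalType());
        // Unscaled value 12345 fits in two bytes (0x30 0x39).
        System.out.println(buf.remaining()); // 2
      }
    }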
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Decimal
+   * Exhaustive unsupported input coverage.
+   */
+  @ParameterizedTest
+  @MethodSource("decimalBadCases")
+  void decimalLogicalTypeInvalidCaseTest(String avroFile, String strInput, Double numInput) throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(avroFile);
+
+    Map<String, Object> data = new HashMap<>();
+    if (strInput != null) {
+      data.put("decimalField", strInput);
+    } else {
+      data.put("decimalField", numInput);
+    }
+    String json = MAPPER.writeValueAsString(data);
+
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Decimal
+   * Avro type: bytes, fixed
+   * Input: see test parameters
+   * Output: Row containing the BigDecimal value the schema specifies.
+   */
+  @ParameterizedTest
+  @MethodSource("decimalGoodCases")
+  void decimalLogicalTypeTest(String avroFilePath, String groundTruth, String strInput,
+                              Number numInput, boolean testFixedByteArray) throws IOException {
+    BigDecimal bigDecimal = new BigDecimal(groundTruth);
+    Map<String, Object> data = new HashMap<>();
+
+    Schema schema = SchemaTestUtil.getSchema(avroFilePath);
+
+    // Decide the decimal field input according to the test dimension.
+    if (strInput != null) {
+      data.put("decimalField", strInput); // String number input
+    } else if (numInput != null) {
+      data.put("decimalField", numInput); // Number input
+    } else if (testFixedByteArray) {
+      // Fixed byte array input.
+      // Example: 123.45 - byte array [0, 0, 48, 57].
+      Schema fieldSchema = schema.getField("decimalField").schema();
+      GenericFixed fixedValue = new Conversions.DecimalConversion().toFixed(
+          bigDecimal, fieldSchema, fieldSchema.getLogicalType());
+      // Convert the fixed value to an int array, which is used as the json value literal.
+      byte[] byteArray = fixedValue.bytes();
+      int[] intArray = new int[byteArray.length];
+      for (int i = 0; i < byteArray.length; i++) {
+        // Byte is signed in Java; mask with 0xFF so values above 127 stay intact.
+        intArray[i] = byteArray[i] & 0xFF;
+      }
+      data.put("decimalField", intArray);
+    }
+
+    String json = MAPPER.writeValueAsString(data);
+
+    Row expectRow = RowFactory.create(bigDecimal);
+    Row realRow = CONVERTER.convertToRow(json, schema);
+    assertEquals(expectRow, realRow);
+  }
+
+  private static final String DURATION_AVRO_FILE_PATH = "/duration-logical-type.avsc";
+
+  @ParameterizedTest
+  @MethodSource("zeroScaleDecimalCases")
+  void zeroScaleDecimalConversion(String inputValue, String expected, boolean shouldConvert) {
+    Schema schema = new Schema.Parser().parse("{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"decimalLogicalType\",\"fields\": [{\"name\": \"decimalField\", "
+        + "\"type\": {\"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\": 0}}]}");
+    String json = String.format("{\"decimalField\":%s}", inputValue);
+
+    if (shouldConvert) {
+      BigDecimal bigDecimal = new BigDecimal(expected);
+      Row expectedRow = RowFactory.create(bigDecimal);
+      Row actualRow = CONVERTER.convertToRow(json, schema);
+      assertEquals(expectedRow, actualRow);
+    } else {
+      assertThrows(HoodieJsonToRowConversionException.class, () -> CONVERTER.convertToRow(json, schema));
+    }
+  }
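Reviewer note (illustrative, not part of the patch): the bad cases above are rejected because the converter refuses to round. A simplified standalone restatement of the check that this patch moves into DecimalLogicalTypeProcessor, ignoring corner cases such as negative scales; the class and method names are made up:

    import java.math.BigDecimal;

    public class DecimalFitSketch {
      // The fractional digits must fit the declared scale, and the integer
      // digits must fit precision - scale.
      static boolean fitsWithoutRounding(BigDecimal d, int precision, int scale) {
        return d.scale() <= scale
            && (d.precision() - d.scale()) <= (precision - scale);
      }

      public static void main(String[] args) {
        // precision 5, scale 2: three integer digits and two fraction digits fit.
        System.out.println(fitsWithoutRounding(new BigDecimal("123.45"), 5, 2)); // true
        System.out.println(fitsWithoutRounding(new BigDecimal("1234"), 5, 2));   // false
        System.out.println(fitsWithoutRounding(new BigDecimal("0.123"), 5, 2));  // false
      }
    }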
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Duration
+   * Avro type: 12-byte fixed
+   * Input: 3-element list [months, days, milliseconds]
+   * Output: the duration type has no Row counterpart, so conversion must fail.
+   */
+  @ParameterizedTest
+  @MethodSource("durationGoodCases")
+  void durationLogicalTypeTest(int months, int days, int milliseconds) throws IOException {
+    List<Integer> num = new ArrayList<>();
+    num.add(months);
+    num.add(days);
+    num.add(milliseconds);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("duration", num);
+    String json = MAPPER.writeValueAsString(data);
+
+    Schema schema = SchemaTestUtil.getSchema(DURATION_AVRO_FILE_PATH);
+
+    // Duration type is not supported in the Row object.
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  @ParameterizedTest
+  @MethodSource("durationBadCases")
+  void durationLogicalTypeBadTest(Long months, Long days, Long millis) throws IOException {
+    // Duration uses a 12-byte fixed type to store 3 unsigned ints, so Long.MAX_VALUE
+    // would overflow. Verify it is gracefully rejected.
+    List<Long> num = new ArrayList<>();
+    num.add(months);
+    num.add(days);
+    num.add(millis);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("duration", num);
+    String json = MAPPER.writeValueAsString(data);
+
+    Schema schema = SchemaTestUtil.getSchema(DURATION_AVRO_FILE_PATH);
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  private static final String DATE_AVRO_FILE_PATH = "/date-type.avsc";
+  private static final String DATE_AVRO_INVALID_FILE_PATH = "/date-type-invalid.avsc";
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Date
+   * Avro type: int
+   * Input: Check parameter definition
+   * Output: Row containing the java.sql.Date value the schema specifies.
+   */
+  @ParameterizedTest
+  @MethodSource("dateProviderForRow")
+  void dateLogicalTypeTest(String groundTruthRow, Object dateInput) throws IOException {
+    // Define the schema for the date logical type.
+    Schema schema = SchemaTestUtil.getSchema(DATE_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("dateField", dateInput);
+    String json = MAPPER.writeValueAsString(data);
+
+    if (groundTruthRow == null) {
+      return;
+    }
+    Row rec = RowFactory.create(java.sql.Date.valueOf(groundTruthRow));
+    Row realRow = CONVERTER.convertToRow(json, schema);
+    assertEquals(rec.getDate(0).toString(), realRow.getDate(0).toString());
+  }
+
+  /**
+   * Covered case:
+   * Avro Logical Type: Date
+   * Invalid schema configuration.
+   */
+  @Test
+  void dateLogicalTypeInvalidSchemaTest() throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(DATE_AVRO_INVALID_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("dateField", 1);
+    String json = MAPPER.writeValueAsString(data);
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  private static final String LOCAL_TIME_AVRO_FILE_PATH = "/local-timestamp-logical-type.avsc";
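Reviewer note (illustrative, not part of the patch): Avro's "date" logical type stores an int counting days since 1970-01-01, which is what the int-valued provider cases feed the converter before it surfaces a java.sql.Date. A quick standalone check of that mapping:

    import java.time.LocalDate;

    public class DateLogicalTypeSketch {
      public static void main(String[] args) {
        long epochDay = LocalDate.parse("2024-01-01").toEpochDay();
        System.out.println(epochDay); // 19723
        System.out.println(java.sql.Date.valueOf(LocalDate.ofEpochDay(epochDay))); // 2024-01-01
      }
    }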
+
+  /**
+   * Covered case:
+   * Avro Logical Type: local-timestamp-millis & local-timestamp-micros
+   * Avro type: long for both
+   * Input: Check parameter definition
+   * Output: Row containing long values at the two resolutions.
+   */
+  @ParameterizedTest
+  @MethodSource("localTimestampGoodCaseProvider")
+  void localTimestampLogicalTypeGoodCaseTest(
+      Long expectedMicroSecOfDay, Object timeMilli, Object timeMicro) throws IOException {
+    long microSecOfDay = expectedMicroSecOfDay;
+    long milliSecOfDay = expectedMicroSecOfDay / 1000; // same value at millisecond resolution
+
+    Schema schema = SchemaTestUtil.getSchema(LOCAL_TIME_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("localTimestampMillisField", timeMilli);
+    data.put("localTimestampMicrosField", timeMicro);
+    String json = MAPPER.writeValueAsString(data);
+
+    Row rec = RowFactory.create(milliSecOfDay, microSecOfDay);
+    assertEquals(rec, CONVERTER.convertToRow(json, schema));
+  }
+
+  @ParameterizedTest
+  @MethodSource("localTimestampBadCaseProvider")
+  void localTimestampLogicalTypeBadTest(
+      String schemaFile, Object input) throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(schemaFile);
+    Map<String, Object> data = new HashMap<>();
+    data.put("timestamp", input);
+    String json = MAPPER.writeValueAsString(data);
+
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  private static final String TIMESTAMP_AVRO_FILE_PATH = "/timestamp-logical-type2.avsc";
+
+  /**
+   * Covered case:
+   * Avro Logical Type: timestamp-millis & timestamp-micros
+   * Avro type: long for both
+   * Input: Check parameter definition
+   * Output: Row containing long values at the two resolutions.
+   */
+  @ParameterizedTest
+  @MethodSource("timestampGoodCaseProvider")
+  void timestampLogicalTypeGoodCaseTest(
+      Long expectedMicroSecOfDay, Object timeMilli, Object timeMicro) throws IOException {
+    long microSecOfDay = expectedMicroSecOfDay;
+    long milliSecOfDay = expectedMicroSecOfDay / 1000; // same instant at millisecond resolution
+
+    Schema schema = SchemaTestUtil.getSchema(TIMESTAMP_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("timestampMillisField", timeMilli);
+    data.put("timestampMicrosField", timeMicro);
+    String json = MAPPER.writeValueAsString(data);
+
+    Row rec = RowFactory.create(milliSecOfDay, microSecOfDay);
+    assertEquals(rec, CONVERTER.convertToRow(json, schema));
+  }
+
+  @ParameterizedTest
+  @MethodSource("timestampBadCaseProvider")
+  void timestampLogicalTypeBadTest(Object input) throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(TIMESTAMP_AVRO_FILE_PATH);
+    Map<String, Object> data = new HashMap<>();
+    data.put("timestampMillisField", input);
+    data.put("timestampMicrosField", input);
+    String json = MAPPER.writeValueAsString(data);
+
+    assertThrows(HoodieJsonToRowConversionException.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  private static final String TIME_AVRO_FILE_PATH = "/time-logical-type.avsc";
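Reviewer note (illustrative, not part of the patch): timestamp-millis and timestamp-micros are both longs counted from the Unix epoch, so the providers express one instant at two resolutions. A standalone illustration with an arbitrary instant:

    import java.time.Instant;

    public class TimestampResolutionSketch {
      public static void main(String[] args) {
        long micros = 1_715_000_000_000_000L;
        long millis = micros / 1_000L;
        System.out.println(Instant.ofEpochMilli(millis)); // 2024-05-06T13:33:20Z
      }
    }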
+
+  /**
+   * Covered case:
+   * Avro Logical Type: time-micros & time-millis
+   * Avro type: long for time-micros, int for time-millis
+   * Input: Check parameter definition
+   * Output: Row containing the time-of-day values the schema specifies.
+   */
+  @ParameterizedTest
+  @MethodSource("timeGoodCaseProvider")
+  void timeLogicalTypeTest(Long expectedMicroSecOfDay, Object timeMilli, Object timeMicro) throws IOException {
+    long microSecOfDay = expectedMicroSecOfDay;
+    int milliSecOfDay = (int) (expectedMicroSecOfDay / 1000); // same time of day at millisecond resolution
+
+    Schema schema = SchemaTestUtil.getSchema(TIME_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("timeMicroField", timeMicro);
+    data.put("timeMillisField", timeMilli);
+    String json = MAPPER.writeValueAsString(data);
+
+    Row rec = RowFactory.create(microSecOfDay, milliSecOfDay);
+    Row realRow = CONVERTER.convertToRow(json, schema);
+    assertEquals(rec.get(0).toString(), realRow.get(0).toString());
+    assertEquals(rec.get(1).toString(), realRow.get(1).toString());
+  }
+
+  @ParameterizedTest
+  @MethodSource("timeBadCaseProvider")
+  void timeLogicalTypeBadCaseTest(Object timeMilli, Object timeMicro) throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(TIME_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("timeMicroField", timeMicro);
+    data.put("timeMillisField", timeMilli);
+    String json = MAPPER.writeValueAsString(data);
+
+    assertThrows(Exception.class, () -> {
+      CONVERTER.convertToRow(json, schema);
+    });
+  }
+
+  private static final String UUID_AVRO_FILE_PATH = "/uuid-logical-type.avsc";
+
+  /**
+   * Covered case:
+   * Avro Logical Type: uuid
+   * Avro type: string
+   * Input: uuid string
+   * Output: Row containing the uuid as a string.
+   */
+  @ParameterizedTest
+  @MethodSource("uuidDimension")
+  void uuidLogicalTypeTest(String uuid) throws IOException {
+    Schema schema = SchemaTestUtil.getSchema(UUID_AVRO_FILE_PATH);
+
+    Map<String, Object> data = new HashMap<>();
+    data.put("uuidField", uuid);
+    String json = MAPPER.writeValueAsString(data);
+
+    Row rec = RowFactory.create(uuid);
+    assertEquals(rec, CONVERTER.convertToRow(json, schema));
+  }
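Reviewer note (illustrative, not part of the patch): time-millis is an int and time-micros a long, both counted from midnight, which is why the expected Row above mixes the two Java types. A standalone illustration for 12:30:

    import java.time.LocalTime;

    public class TimeOfDaySketch {
      public static void main(String[] args) {
        long microsOfDay = LocalTime.of(12, 30).toNanoOfDay() / 1_000L;
        int millisOfDay = (int) (microsOfDay / 1_000L);
        System.out.println(microsOfDay); // 45000000000
        System.out.println(millisOfDay); // 45000000
      }
    }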
+
+  @Test
+  void conversionWithFieldNameAliases() throws IOException {
+    String schemaStringWithAliases = "{\"namespace\": \"example.avro\", \"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\", \"aliases\": [\"$name\"]}, "
+        + "{\"name\": \"favorite_number\", \"type\": \"int\", \"aliases\": [\"unused\", \"favorite-number\"]}, {\"name\": \"favorite_color\", \"type\": \"string\", \"aliases\": "
+        + "[\"favorite.color!\"]}, {\"name\": \"unmatched\", \"type\": \"string\", \"default\": \"default_value\"}]}";
+    Schema sanitizedSchema = new Schema.Parser().parse(schemaStringWithAliases);
+    String name = "John Smith";
+    int number = 1337;
+    String color = "Blue. No yellow!";
+    Map<String, Object> data = new HashMap<>();
+    data.put("$name", name);
+    data.put("favorite-number", number);
+    data.put("favorite.color!", color);
+    String json = MAPPER.writeValueAsString(data);
+
+    List<Object> values = new ArrayList<>(Collections.nCopies(sanitizedSchema.getFields().size(), null));
+    values.set(0, name);
+    values.set(1, number);
+    values.set(2, color);
+    Row recRow = RowFactory.create(values.toArray());
+    assertEquals(recRow, CONVERTER.convertToRow(json, sanitizedSchema));
+  }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/schema/simple-test-with-default-value.avsc b/hudi-utilities/src/test/resources/schema/simple-test-with-default-value.avsc
new file mode 100644
index 0000000000000..4717b269b7b87
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/simple-test-with-default-value.avsc
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "example.avro",
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "favorite_number", "type": "int"},
+    {
+      "name": "age",
+      "type": "int",
+      "default": 30
+    },
+    {"name": "favorite_color", "type": "string"},
+    {
+      "name": "email",
+      "type": ["null", "string"],
+      "default": null
+    }
+  ]
+}
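Reviewer note (illustrative, not part of the patch): the alias test above relies on Avro keeping alias metadata on each field; the exact matching logic lives in MercifulJsonToRowConverter, but the metadata itself comes from Field#aliases(). A minimal standalone look at it; the class name and schema literal are illustrative:

    import org.apache.avro.Schema;

    public class AliasLookupSketch {
      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"favorite_number\",\"type\":\"int\","
                + "\"aliases\":[\"unused\",\"favorite-number\"]}]}");
        // This is what lets a sanitizing converter match the raw JSON key
        // "favorite-number" to the sanitized field "favorite_number".
        System.out.println(schema.getField("favorite_number").aliases());
      }
    }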