diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index b037cb6354f..0f0061ed29f 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -46,6 +46,7 @@
 import org.apache.gobblin.metadata.OperationType;
 import org.apache.gobblin.metadata.SchemaSource;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.PartitionedDataWriter;
@@ -149,6 +150,14 @@ private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceB
       regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
           state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
     }
+    if (state.contains(HiveRegistrationPolicyBase.HIVE_TABLE_NAME)) {
+      regProperties.put(HiveRegistrationPolicyBase.HIVE_TABLE_NAME,
+          state.getProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME));
+    }
+    if (state.contains(KafkaSource.TOPIC_NAME)) {
+      regProperties.put(KafkaSource.TOPIC_NAME,
+          state.getProp(KafkaSource.TOPIC_NAME));
+    }
     if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
       regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
           state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
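The producer change only forwards two extra keys from the task state into the GMCE registration properties, so the metadata writer can later recover the Kafka topic name even when it differs from the Hive table name. Below is a minimal, hedged sketch of the copy-if-present pattern using plain `java.util.Properties`; the key literals (`topic.name` for `KafkaSource.TOPIC_NAME`, `hive.table.name` for `HiveRegistrationPolicyBase.HIVE_TABLE_NAME`) are assumptions, and the real code operates on Gobblin's `State` and the GMCE builder rather than `Properties`.

```java
import java.util.Properties;

public class RegistrationPropsSketch {
  // Copy a key into the registration properties only when the task state actually carries it.
  static void copyIfPresent(Properties state, Properties regProperties, String key) {
    if (state.containsKey(key)) {
      regProperties.setProperty(key, state.getProperty(key));
    }
  }

  public static void main(String[] args) {
    Properties state = new Properties();
    state.setProperty("topic.name", "testTopic");            // assumed literal of KafkaSource.TOPIC_NAME
    Properties regProperties = new Properties();
    copyIfPresent(state, regProperties, "hive.table.name");  // assumed literal of HIVE_TABLE_NAME; absent here
    copyIfPresent(state, regProperties, "topic.name");
    System.out.println(regProperties);                       // {topic.name=testTopic}
  }
}
```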
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9c2c4fdf168..27a2629e5b0 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -48,12 +48,14 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFiles;
@@ -68,12 +70,12 @@
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveCatalogs;
 import org.apache.iceberg.types.Types;
 import org.joda.time.DateTime;
@@ -186,9 +188,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private final Map tableTopicPartitionMap;
   @Getter
   private final KafkaSchemaRegistry schemaRegistry;
-  private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
+  protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
   @Setter
-  protected HiveCatalog catalog;
+  protected Catalog catalog;
   protected final Configuration conf;
   protected final ReadWriteLock readWriteLock;
   private final HiveLock locks;
@@ -330,7 +332,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map new TableMetadata());
     tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
@@ -449,7 +451,7 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
             .expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
                 MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
             .build()));
-    Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
+    Cache<String, Pair<Schema, String>> candidate = tableMetadata.candidateSchemas.get();
     try {
       switch (gmce.getSchemaSource()) {
         case SCHEMAREGISTRY: {
@@ -457,15 +459,15 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
           String createdOn = AvroUtils.getSchemaCreationTime(schema);
           if (createdOn == null) {
             candidate.put(DEFAULT_CREATION_TIME,
-                IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+                Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           } else if (!createdOn.equals(lastSchemaVersion)) {
-            candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+            candidate.put(createdOn, Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           }
           break;
         }
         case EVENT: {
           candidate.put(DEFAULT_CREATION_TIME,
-              IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+              Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           break;
         }
         case NONE: {
@@ -780,6 +782,21 @@ private StructLike getIcebergPartitionVal(Collection specs, String fil
     return partitionVal;
   }
 
+  /**
+   * We first try to derive the topic name from dataOffsetRange, whose keys follow the pattern {topicName}-{partitionNumber}.
+   * If dataOffsetRange is absent, we fall back to the "topic.name" table property that was set previously.
+   * @return the Kafka topic name for this table
+   */
+  protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
+    if (tableMetadata.dataOffsetRange.isPresent()) {
+      String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name contains '-'
+      return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+    }
+    return tableMetadata.newProperties.or(
+        Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
+  }
+
   /**
    * For flush of each table, we do the following logic:
    * 1. Commit the appendFiles if it exist
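A self-contained sketch of the key-parsing convention the new `getTopicName` relies on: `dataOffsetRange` keys look like `{topicName}-{partitionNumber}`, so stripping everything after the last `-` recovers the topic even when the topic name itself contains dashes. The fallback branch (reading the `topic.name` table property) is omitted; the map below is only a stand-in for `dataOffsetRange`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TopicNameSketch {
  // Mirrors the parsing in getTopicName(): keep everything before the last '-'.
  static String topicFromOffsetRangeKey(String topicPartition) {
    return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
  }

  public static void main(String[] args) {
    Map<String, String> dataOffsetRange = new LinkedHashMap<>();
    dataOffsetRange.put("testTopic-1", "0-1000");          // "{topicName}-{partitionNumber}" -> offset range
    dataOffsetRange.put("tracking-event-topic-3", "0-42");

    String firstKey = dataOffsetRange.keySet().iterator().next();
    System.out.println(topicFromOffsetRangeKey(firstKey));                  // testTopic
    System.out.println(topicFromOffsetRangeKey("tracking-event-topic-3"));  // tracking-event-topic
  }
}
```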
@@ -801,12 +818,14 @@ public void flush(String dbName, String tableName) throws IOException {
       Transaction transaction = tableMetadata.transaction.get();
       Map<String, String> props = tableMetadata.newProperties.or(
           Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
-      String topic = props.get(TOPIC_NAME_KEY);
+      //Set data offset range
+      setDatasetOffsetRange(tableMetadata, props);
+      String topicName = getTopicName(tid, tableMetadata);
       if (tableMetadata.appendFiles.isPresent()) {
         tableMetadata.appendFiles.get().commit();
-        sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
+        sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
         if (tableMetadata.completenessEnabled) {
-          checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
+          checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
         }
       }
       if (tableMetadata.deleteFiles.isPresent()) {
@@ -817,15 +836,15 @@ public void flush(String dbName, String tableName) throws IOException {
       if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
           && tableMetadata.completenessEnabled) {
         if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
-          log.info(String.format("Checking kafka audit for %s on change_property ", topic));
+          log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
           SortedSet timestamps = new TreeSet<>();
           ZonedDateTime prevWatermarkDT =
               Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
           timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
-          checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
+          checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
         } else {
           log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
-              tableMetadata.completionWatermark, topic));
+              tableMetadata.completionWatermark, topicName));
         }
       }
@@ -842,14 +861,6 @@ public void flush(String dbName, String tableName) throws IOException {
       props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
           conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
-      //Set data offset range
-      boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props);
-      String topicName = tableName;
-      if (containOffsetRange) {
-        String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
-        //In case the topic name is not the table name or the topic name contains '-'
-        topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
-      }
       //Update schema(commit)
       updateSchema(tableMetadata, props, topicName);
       //Update properties
@@ -882,7 +893,7 @@ public void flush(String dbName, String tableName) throws IOException {
   @Override
   public void reset(String dbName, String tableName) throws IOException {
-    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
   }
 
   /**
@@ -952,7 +963,7 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
         long timestampMillis = timestampDT.toInstant().toEpochMilli();
         ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
         if (auditCountVerifier.get().isComplete(topicName,
-            auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+            auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
           completionWatermark = timestampMillis;
           // Also persist the watermark into State object to share this with other MetadataWriters
           // we enforce ourselves to always use lower-cased table name here
@@ -1026,7 +1037,8 @@ private void updateSchema(TableMetadata tableMetadata, Map props
     Cache candidates = tableMetadata.candidateSchemas.get();
     //Only have default schema, so either we calculate schema from event or the schema does not have creation time, directly update it
     if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
-      updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
+      updateSchemaHelper(DEFAULT_CREATION_TIME,
+          (Pair<Schema, String>) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
           tableMetadata.table.get());
     } else {
       //update schema if candidates contains the schema that has the same creation time with the latest schema
@@ -1037,7 +1049,7 @@ private void updateSchema(TableMetadata tableMetadata, Map props
         log.warn(
             "Schema from schema registry does not contain creation time, check config for schema registry class");
       } else if (candidates.getIfPresent(creationTime) != null) {
-        updateSchemaHelper(creationTime, (Schema) candidates.getIfPresent(creationTime), props,
+        updateSchemaHelper(creationTime, (Pair<Schema, String>) candidates.getIfPresent(creationTime), props,
            tableMetadata.table.get());
       }
     }
@@ -1047,10 +1059,11 @@ private void updateSchema(TableMetadata tableMetadata, Map props
     }
   }
 
-  private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
+  private void updateSchemaHelper(String schemaCreationTime, Pair<Schema, String> schema, Map<String, String> props, Table table) {
     try {
-      table.updateSchema().unionByNameWith(schema).commit();
+      table.updateSchema().unionByNameWith(schema.getLeft()).commit();
       props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
+      props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.getRight());
     } catch (Exception e) {
       log.error("Cannot update schema to " + schema.toString() + "for table " + table.location(), e);
     }
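The candidate-schema cache now carries both the converted Iceberg schema and the original Avro schema string, so `updateSchemaHelper` can commit the former and persist the latter as the table's `avro.schema.literal` property (the value behind `AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL`). Below is a minimal sketch of that bookkeeping with Guava's `Cache` and commons-lang3 `Pair`, using strings as stand-ins for the Iceberg schema and table; the `schema.creation.time` key literal is an assumption for `SCHEMA_CREATION_TIME_KEY`.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;

public class SchemaLiteralSketch {
  public static void main(String[] args) {
    // left = converted Iceberg schema (stand-in), right = original Avro schema literal from the GMCE
    Cache<String, Pair<String, String>> candidates = CacheBuilder.newBuilder().build();
    String avroLiteral = "{\"type\":\"record\",\"name\":\"data\",\"fields\":[]}";
    candidates.put("1631811600000", Pair.of("icebergSchema(data)", avroLiteral));

    Map<String, String> props = new HashMap<>();
    Pair<String, String> candidate = candidates.getIfPresent("1631811600000");
    if (candidate != null) {
      // In the writer, table.updateSchema().unionByNameWith(candidate.getLeft()).commit() happens here.
      props.put("schema.creation.time", "1631811600000"); // assumed literal of SCHEMA_CREATION_TIME_KEY
      props.put("avro.schema.literal", candidate.getRight());
    }
    System.out.println(props);
  }
}
```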
@@ -1122,7 +1135,7 @@ public void close() throws IOException {
    *
    * Also note the difference with {@link org.apache.iceberg.TableMetadata}.
    */
-  private class TableMetadata {
+  public class TableMetadata {
     Optional<Table> table = Optional.absent();
 
     /**
@@ -1133,10 +1146,10 @@ private class TableMetadata {
     Optional<Transaction> transaction = Optional.absent();
     private Optional<AppendFiles> appendFiles = Optional.absent();
     private Optional<DeleteFiles> deleteFiles = Optional.absent();
+    public Optional<Map<String, String>> newProperties = Optional.absent();
     Optional<Map<String, String>> lastProperties = Optional.absent();
-    Optional<Map<String, String>> newProperties = Optional.absent();
-    Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
+    Optional<Cache<String, Pair<Schema, String>>> candidateSchemas = Optional.absent();
     Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional lowWatermark = Optional.absent();
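Widening the catalog field from `HiveCatalog` to the `Catalog` interface (together with the Lombok `@Setter` already on the field and the now-protected/public members above) lets tests or subclasses back the writer with a non-Hive catalog. A minimal sketch, assuming Iceberg's `HadoopCatalog` is available on the classpath; `setCatalog` is the setter Lombok generates for the field shown in the diff, and the warehouse path is arbitrary.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class CatalogInjectionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Any Catalog implementation now satisfies the field's type; HadoopCatalog is just an example.
    Catalog catalog = new HadoopCatalog(conf, "file:///tmp/iceberg-warehouse");
    TableIdentifier tid = TableIdentifier.of("hivedb", "testTopic");
    System.out.println(catalog.tableExists(tid));  // false until something creates the table
    // icebergMetadataWriter.setCatalog(catalog);  // Lombok-generated setter on the protected field
  }
}
```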
.get("hivedb.testIcebergTable").get(0).lowWatermark, 50L); + .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath()) + .get("hivedb.testTopic").get(0).lowWatermark, 50L); Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap() - .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath()) - .get("hivedb.testIcebergTable").get(0).highWatermark, 52L); + .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath()) + .get("hivedb.testTopic").get(0).highWatermark, 52L); // No events sent yet since the topic has not been flushed Assert.assertEquals(eventsSent.size(), 0); @@ -378,7 +378,7 @@ public void testFaultTolerant() throws Exception { // Since this topic has been flushed, there should be an event sent for previous failure, and the table // should be removed from the error map Assert.assertEquals(eventsSent.size(), 1); - Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY), "testIcebergTable"); + Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY), "testTopic"); Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_LOW_WATERMARK), "50"); Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_HIGH_WATERMARK), "52"); Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 0); @@ -398,7 +398,7 @@ public void testWriteAddFileGMCECompleteness() throws IOException { // Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method // without risking running into type cast runtime error. gmce.setOperationType(OperationType.add_files); - File hourlyFile = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/10/data.avro"); + File hourlyFile = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/10/data.avro"); long timestampMillis = 1631811600000L; Files.createParentDirs(hourlyFile); writeRecord(hourlyFile); @@ -421,13 +421,13 @@ public void testWriteAddFileGMCECompleteness() throws IOException { // Test when completeness watermark = -1 bootstrap case KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class); - Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true); + Mockito.when(verifier.isComplete("testTopic", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true); IcebergMetadataWriter imw = (IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next(); imw.setAuditCountVerifier(verifier); gobblinMCEWriterWithCompletness.flush(); table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); //completeness watermark = "2020-09-16-10" - Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable"); + Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic"); Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles"); Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis)); // 1631811600000L correspond to 2020-09-16-10 in PT @@ -437,7 +437,7 @@ public void testWriteAddFileGMCECompleteness() throws IOException { Assert.assertTrue(dfl.hasNext()); // Test when completeness watermark is still "2021-09-16-10" but have a late file for "2021-09-16-09" - File hourlyFile1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/09/data1.avro"); + File hourlyFile1 = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/09/data1.avro"); Files.createParentDirs(hourlyFile1); writeRecord(hourlyFile1); gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder() @@ -460,7 +460,7 @@ public void testWriteAddFileGMCECompleteness() throws IOException { Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1); // Test when completeness watermark will advance to "2021-09-16-11" - File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro"); + File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro"); long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1); Files.createParentDirs(hourlyFile2); writeRecord(hourlyFile2); @@ -476,7 +476,7 @@ public void testWriteAddFileGMCECompleteness() throws IOException { new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(), new LongWatermark(60L)))); - Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true); + Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true); gobblinMCEWriterWithCompletness.flush(); table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1)); @@ -495,7 +495,7 @@ public void testChangePropertyGMCECompleteness() throws IOException { Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY)); long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1); - File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro"); + File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro"); gmce.setOldFilePrefixes(null); gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder() .setFilePath(hourlyFile2.toString()) @@ -511,14 +511,14 @@ public void testChangePropertyGMCECompleteness() throws IOException { new LongWatermark(65L)))); KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class); - Mockito.when(verifier.isComplete("testIcebergTable", watermark, expectedWatermark)).thenReturn(true); + Mockito.when(verifier.isComplete("testTopic", watermark, expectedWatermark)).thenReturn(true); ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier); gobblinMCEWriterWithCompletness.flush(); table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0)); Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), "0-7000"); Assert.assertEquals(table.spec().fields().get(1).name(), "late"); - Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable"); + Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic"); Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles"); Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(expectedWatermark)); @@ -558,7 +558,7 @@ protected Optional getPartition(Path path, HiveTable table) throw partitionValue = "2020-03-17-00"; } return Optional.of(new HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue)) - .withDbName("hivedb").withTableName("testIcebergTable").build()); + 
.withDbName("hivedb").withTableName("testTopic").build()); } @Override protected List getTables(Path path) throws IOException { @@ -577,7 +577,7 @@ protected List getTableNames(Optional dbPrefix, Path path) { if (path.toString().contains("testFaultTolerant")) { return Lists.newArrayList("testFaultTolerantIcebergTable"); } - return Lists.newArrayList("testIcebergTable"); + return Lists.newArrayList("testTopic"); } }