[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter #3672

Merged · 7 commits · Apr 13, 2023
@@ -46,6 +46,7 @@
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.PartitionedDataWriter;
@@ -149,6 +150,14 @@ private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceB
regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
}
if (state.contains(HiveRegistrationPolicyBase.HIVE_TABLE_NAME)) {
regProperties.put(HiveRegistrationPolicyBase.HIVE_TABLE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME));
}
if (state.contains(KafkaSource.TOPIC_NAME)) {
regProperties.put(KafkaSource.TOPIC_NAME,
state.getProp(KafkaSource.TOPIC_NAME));
}
if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
@@ -48,12 +48,14 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
@@ -68,12 +70,12 @@
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
@@ -186,9 +188,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
private final Map<TableIdentifier, String> tableTopicPartitionMap;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@Setter
protected HiveCatalog catalog;
protected Catalog catalog;
protected final Configuration conf;
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
@@ -330,7 +332,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
addFiles(gmce, newSpecsMap, table, tableMetadata);
if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
@@ -342,7 +344,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
break;
}
case rewrite_files: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
break;
}
@@ -351,7 +353,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
break;
}
case change_property: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
@@ -418,7 +420,7 @@ public int compare(Range o1, Range o2) {
}
}

private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid, GobblinMetadataChangeEvent gmce) {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
@@ -449,23 +451,23 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
.expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()));
Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
Cache<String, Pair<Schema, String>> candidate = tableMetadata.candidateSchemas.get();
try {
switch (gmce.getSchemaSource()) {
case SCHEMAREGISTRY: {
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(gmce.getTableSchema());
String createdOn = AvroUtils.getSchemaCreationTime(schema);
if (createdOn == null) {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
} else if (!createdOn.equals(lastSchemaVersion)) {
candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
candidate.put(createdOn, Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
}
break;
}
case EVENT: {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
break;
}
case NONE: {
@@ -780,6 +782,16 @@ private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String fil
return partitionVal;
}

protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
Contributor commented: Let's add Javadoc.
if (tableMetadata.dataOffsetRange.isPresent()) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
return tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
}
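
A minimal standalone sketch (not part of this PR) of the topic-name parsing that getTopicName relies on when dataOffsetRange is present; the class and method names below are hypothetical. lastIndexOf('-') is used because the topic name itself may contain '-':

// Hypothetical illustration, not part of the PR: derive the topic name from a
// Kafka topic-partition string of the form "<topicName>-<partitionId>".
public class TopicNameParsingExample {
  static String topicFromTopicPartition(String topicPartitionString) {
    // lastIndexOf('-') keeps any dashes that belong to the topic name itself.
    return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
  }

  public static void main(String[] args) {
    System.out.println(topicFromTopicPartition("page-view-events-12")); // prints "page-view-events"
  }
}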

/**
* For flush of each table, we do the following logic:
* 1. Commit the appendFiles if it exist
@@ -801,12 +813,14 @@ public void flush(String dbName, String tableName) throws IOException {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
String topic = props.get(TOPIC_NAME_KEY);
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
Contributor commented: Is tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName()); still required now that the topic name is obtained in this new way?

Contributor (author) replied: We still fall back to the previous topic name, calculated from the NativeName, in case datasetOffsetRange does not exist. Also, GTE requires this datasetName to set the HDFS dataset path information.
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
@@ -817,15 +831,15 @@ public void flush(String dbName, String tableName) throws IOException {
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topic));
log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topic));
tableMetadata.completionWatermark, topicName));
}
}

@@ -842,14 +856,6 @@ public void flush(String dbName, String tableName) throws IOException {
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
//Set data offset range
boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props);
String topicName = tableName;
if (containOffsetRange) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
@@ -882,7 +888,7 @@ public void flush(String dbName, String tableName) throws IOException {

@Override
public void reset(String dbName, String tableName) throws IOException {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}

/**
@@ -952,7 +958,7 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
long timestampMillis = timestampDT.toInstant().toEpochMilli();
ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
if (auditCountVerifier.get().isComplete(topicName,
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
// Also persist the watermark into State object to share this with other MetadataWriters
// we enforce ourselves to always use lower-cased table name here
@@ -1026,7 +1032,8 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
Cache candidates = tableMetadata.candidateSchemas.get();
//Only have default schema, so either we calculate schema from event or the schema does not have creation time, directly update it
if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
updateSchemaHelper(DEFAULT_CREATION_TIME,
(Pair<Schema, String>) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
tableMetadata.table.get());
} else {
//update schema if candidates contains the schema that has the same creation time with the latest schema
Expand All @@ -1037,7 +1044,7 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
log.warn(
"Schema from schema registry does not contain creation time, check config for schema registry class");
} else if (candidates.getIfPresent(creationTime) != null) {
updateSchemaHelper(creationTime, (Schema) candidates.getIfPresent(creationTime), props,
updateSchemaHelper(creationTime, (Pair<Schema, String>) candidates.getIfPresent(creationTime), props,
tableMetadata.table.get());
}
}
@@ -1047,10 +1054,11 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
}
}

private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
private void updateSchemaHelper(String schemaCreationTime, Pair<Schema, String> schema, Map<String, String> props, Table table) {
try {
table.updateSchema().unionByNameWith(schema).commit();
table.updateSchema().unionByNameWith(schema.getLeft()).commit();
props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.getRight());
} catch (Exception e) {
log.error("Cannot update schema to " + schema.toString() + "for table " + table.location(), e);
}
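
Because updateSchemaHelper now also persists the raw Avro schema string under AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL, here is a hedged sketch of how a downstream reader might retrieve and parse it; the helper class below is hypothetical and not part of the PR:

import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.Table;

// Hypothetical illustration, not part of the PR: read back the Avro schema
// literal stored in the Iceberg table properties by this change.
public class SchemaLiteralReadExample {
  static Schema readAvroSchemaLiteral(Table table) {
    String literal = table.properties()
        .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
    return literal == null ? null : new Schema.Parser().parse(literal);
  }
}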
@@ -1122,7 +1130,7 @@ public void close() throws IOException {
*
* Also note the difference with {@link org.apache.iceberg.TableMetadata}.
*/
private class TableMetadata {
public class TableMetadata {
Optional<Table> table = Optional.absent();

/**
Expand All @@ -1133,10 +1141,10 @@ private class TableMetadata {
Optional<Transaction> transaction = Optional.absent();
private Optional<AppendFiles> appendFiles = Optional.absent();
private Optional<DeleteFiles> deleteFiles = Optional.absent();
public Optional<Map<String, String>> newProperties = Optional.absent();

Optional<Map<String, String>> lastProperties = Optional.absent();
Optional<Map<String, String>> newProperties = Optional.absent();
Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
Optional<Cache<String, Pair<Schema, String>>> candidateSchemas = Optional.absent();
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();