
Commit

[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter (#3672)

* address comments

* use connection manager when the HTTP client is not closeable

* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter

* Revert "[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter"

This reverts commit b0844e8.

* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter

* fix unit test

* add java doc

---------

Co-authored-by: Zihan Li <zihli@zihli-mn2.linkedin.biz>
ZihanLi58 and Zihan Li authored Apr 13, 2023
1 parent 80c45fb commit eda83a2
Showing 3 changed files with 77 additions and 55 deletions.
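The core change in this commit is typing the writer's catalog field as the generic org.apache.iceberg.catalog.Catalog interface instead of org.apache.iceberg.hive.HiveCatalog, so any catalog implementation can be plugged into IcebergMetadataWriter. As a rough illustration of what that enables (a hedged sketch, not code from this PR — the Gobblin config keys and the actual catalog-loading path are not shown in this diff), a general catalog could be loaded reflectively via Iceberg's CatalogUtil:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class GeneralIcebergCatalogSketch {
  public static void main(String[] args) {
    // Any Catalog implementation can back the writer once the field is typed as
    // the Catalog interface; a HadoopCatalog rooted at a local warehouse path is
    // used here only because it needs no Hive metastore.
    Configuration conf = new Configuration();
    Map<String, String> properties = new HashMap<>();
    properties.put("warehouse", "file:///tmp/iceberg-warehouse");

    Catalog catalog = CatalogUtil.loadCatalog(
        "org.apache.iceberg.hadoop.HadoopCatalog", // placeholder implementation class
        "gobblin_metadata",                        // arbitrary catalog name
        properties, conf);

    // The writer only needs methods from the Catalog interface, for example:
    System.out.println(catalog.tableExists(TableIdentifier.of("hivedb", "testIcebergTable")));
  }
}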
@@ -46,6 +46,7 @@
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.PartitionedDataWriter;
@@ -149,6 +150,14 @@ private void setBasicInformationForGMCE(GobblinMetadataChangeEvent.Builder gmceB
regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
}
if (state.contains(HiveRegistrationPolicyBase.HIVE_TABLE_NAME)) {
regProperties.put(HiveRegistrationPolicyBase.HIVE_TABLE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME));
}
if (state.contains(KafkaSource.TOPIC_NAME)) {
regProperties.put(KafkaSource.TOPIC_NAME,
state.getProp(KafkaSource.TOPIC_NAME));
}
if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
@@ -48,12 +48,14 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
@@ -68,12 +70,12 @@
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
@@ -186,9 +188,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
private final Map<TableIdentifier, String> tableTopicPartitionMap;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@Setter
protected HiveCatalog catalog;
protected Catalog catalog;
protected final Configuration conf;
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
@@ -330,7 +332,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
addFiles(gmce, newSpecsMap, table, tableMetadata);
if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
@@ -342,7 +344,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
break;
}
case rewrite_files: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
break;
}
@@ -351,7 +353,7 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
break;
}
case change_property: {
updateTableProperty(tableSpec, tid);
updateTableProperty(tableSpec, tid, gmce);
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
@@ -418,7 +420,7 @@ public int compare(Range o1, Range o2) {
}
}

private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid, GobblinMetadataChangeEvent gmce) {
org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
@@ -449,23 +451,23 @@ private void computeCandidateSchema(GobblinMetadataChangeEvent gmce, TableIdenti
.expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()));
Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
Cache<String, Pair<Schema, String>> candidate = tableMetadata.candidateSchemas.get();
try {
switch (gmce.getSchemaSource()) {
case SCHEMAREGISTRY: {
org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(gmce.getTableSchema());
String createdOn = AvroUtils.getSchemaCreationTime(schema);
if (createdOn == null) {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
} else if (!createdOn.equals(lastSchemaVersion)) {
candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
candidate.put(createdOn, Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
}
break;
}
case EVENT: {
candidate.put(DEFAULT_CREATION_TIME,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
break;
}
case NONE: {
@@ -780,6 +782,21 @@ private StructLike getIcebergPartitionVal(Collection<HiveSpec> specs, String fil
return partitionVal;
}

/**
* We first try to derive the topic name from datasetOffsetRange, since the pattern of a datasetOffsetRange key should be {topicName}-{partitionNumber}.
* If there is no datasetOffsetRange, we fall back to the "topic.name" table property that we set previously.
* @return the Kafka topic name for this table
*/
protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
if (tableMetadata.dataOffsetRange.isPresent()) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
return tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
}
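As an illustration of the parsing above (the key value is hypothetical, not taken from this diff): everything before the last '-' is kept, so a topic whose name itself contains dashes is still recovered correctly.

public class TopicNameParseExample {
  public static void main(String[] args) {
    // Hypothetical datasetOffsetRange key in the "{topicName}-{partitionNumber}" format.
    String topicPartitionString = "tracking.page-view-event-12";
    String topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
    System.out.println(topicName); // prints "tracking.page-view-event"
  }
}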

/**
* For flush of each table, we do the following logic:
* 1. Commit the appendFiles if it exists
@@ -801,12 +818,14 @@ public void flush(String dbName, String tableName) throws IOException {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
String topic = props.get(TOPIC_NAME_KEY);
//Set data offset range
setDatasetOffsetRange(tableMetadata, props);
String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
if (tableMetadata.completenessEnabled) {
checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
@@ -817,15 +836,15 @@ public void flush(String dbName, String tableName) throws IOException {
if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
log.info(String.format("Checking kafka audit for %s on change_property ", topic));
log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is %s, Not checking kafka audit for %s",
tableMetadata.completionWatermark, topic));
tableMetadata.completionWatermark, topicName));
}
}

@@ -842,14 +861,6 @@ public void flush(String dbName, String tableName) throws IOException {
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
//Set data offset range
boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props);
String topicName = tableName;
if (containOffsetRange) {
String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
//In case the topic name is not the table name or the topic name contains '-'
topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
@@ -882,7 +893,7 @@ public void flush(String dbName, String tableName) throws IOException {

@Override
public void reset(String dbName, String tableName) throws IOException {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}

/**
@@ -952,7 +963,7 @@ private long computeCompletenessWatermark(String catalogDbTableName, String topi
long timestampMillis = timestampDT.toInstant().toEpochMilli();
ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
if (auditCountVerifier.get().isComplete(topicName,
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
// Also persist the watermark into State object to share this with other MetadataWriters
// we enforce ourselves to always use lower-cased table name here
@@ -1026,7 +1037,8 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
Cache candidates = tableMetadata.candidateSchemas.get();
//Only have default schema, so either we calculate schema from event or the schema does not have creation time, directly update it
if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
updateSchemaHelper(DEFAULT_CREATION_TIME,
(Pair<Schema, String>) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
tableMetadata.table.get());
} else {
//update schema if candidates contains the schema that has the same creation time with the latest schema
@@ -1037,7 +1049,7 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
log.warn(
"Schema from schema registry does not contain creation time, check config for schema registry class");
} else if (candidates.getIfPresent(creationTime) != null) {
updateSchemaHelper(creationTime, (Schema) candidates.getIfPresent(creationTime), props,
updateSchemaHelper(creationTime, (Pair<Schema, String>) candidates.getIfPresent(creationTime), props,
tableMetadata.table.get());
}
}
@@ -1047,10 +1059,11 @@ private void updateSchema(TableMetadata tableMetadata, Map<String, String> props
}
}

private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
private void updateSchemaHelper(String schemaCreationTime, Pair<Schema, String> schema, Map<String, String> props, Table table) {
try {
table.updateSchema().unionByNameWith(schema).commit();
table.updateSchema().unionByNameWith(schema.getLeft()).commit();
props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.getRight());
} catch (Exception e) {
log.error("Cannot update schema to " + schema.toString() + "for table " + table.location(), e);
}
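Because the candidate-schema cache now stores a Pair of the derived Iceberg schema and the raw Avro schema string, updateSchemaHelper can both evolve the Iceberg schema and record the Avro literal under avro.schema.literal. A minimal, self-contained sketch of the new cache value shape (the schema and timestamp below are hypothetical; the Guava cache mirrors the one built in computeCandidateSchema):

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class CandidateSchemaCacheSketch {
  public static void main(String[] args) {
    // Cache keyed by schema creation time, holding (Iceberg schema, Avro schema literal).
    Cache<String, Pair<Schema, String>> candidates = CacheBuilder.newBuilder().build();

    Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()));
    String avroLiteral =
        "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}";

    candidates.put("1681344000000", Pair.of(icebergSchema, avroLiteral));

    // The left side drives schema evolution (unionByNameWith); the right side is
    // written into the table properties as the Avro schema literal.
    Pair<Schema, String> chosen = candidates.getIfPresent("1681344000000");
    System.out.println(chosen.getLeft().asStruct() + " -> " + chosen.getRight());
  }
}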
@@ -1122,7 +1135,7 @@ public void close() throws IOException {
*
* Also note the difference with {@link org.apache.iceberg.TableMetadata}.
*/
private class TableMetadata {
public class TableMetadata {
Optional<Table> table = Optional.absent();

/**
@@ -1133,10 +1146,10 @@ private class TableMetadata {
Optional<Transaction> transaction = Optional.absent();
private Optional<AppendFiles> appendFiles = Optional.absent();
private Optional<DeleteFiles> deleteFiles = Optional.absent();
public Optional<Map<String, String>> newProperties = Optional.absent();

Optional<Map<String, String>> lastProperties = Optional.absent();
Optional<Map<String, String>> newProperties = Optional.absent();
Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
Optional<Cache<String, Pair<Schema, String>>> candidateSchemas = Optional.absent();
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();