Skip to content

Commit

Permalink
[INLONG-9240][Manager] Optimize iceberg connector options
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 8, 2023
1 parent b1c0beb commit 959dd89
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ public class IcebergSink extends StreamSink {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("Upsert mode")
private Boolean upsert;

public IcebergSink() {
this.setSinkType(SinkType.ICEBERG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ public class IcebergSinkRequest extends SinkRequest {
@ApiModelProperty("Primary key")
private String primaryKey;

@ApiModelProperty("Upsert mode")
private Boolean upsert;

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> con
icebergSink.getPrimaryKey(),
catalogType,
icebergSink.getCatalogUri(),
icebergSink.getWarehouse());
icebergSink.getWarehouse(),
icebergSink.getUpsert());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class IcebergConstant {
public static final String STREAMING = "streaming";
public static final String STARTING_STRATEGY_KEY = "starting-strategy";

public static final String UPSERT_KEY = "upsert";

/**
* Iceberg supported catalog type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@JsonTypeName("icebergLoad")
Expand Down Expand Up @@ -73,6 +74,9 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata,
@JsonProperty("warehouse")
private String warehouse;

@JsonProperty("upsert")
private Boolean upsert;

@JsonCreator
public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("name") String name,
Expand All @@ -87,14 +91,16 @@ public IcebergLoadNode(@JsonProperty("id") String id,
@JsonProperty("primaryKey") String primaryKey,
@JsonProperty("catalogType") IcebergConstant.CatalogType catalogType,
@JsonProperty("uri") String uri,
@JsonProperty("warehouse") String warehouse) {
@JsonProperty("warehouse") String warehouse,
@JsonProperty("upsert") Boolean upsert) {
super(id, name, fields, fieldRelations, filters, filterStrategy, sinkParallelism, properties);
this.tableName = Preconditions.checkNotNull(tableName, "table name is null");
this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
this.primaryKey = primaryKey;
this.catalogType = catalogType == null ? CatalogType.HIVE : catalogType;
this.uri = uri;
this.warehouse = warehouse;
this.upsert = Optional.ofNullable(upsert).orElse(false);
}

@Override
Expand All @@ -108,11 +114,12 @@ public Map<String, String> tableOptions() {
options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName);
options.put(IcebergConstant.CATALOG_TYPE_KEY, catalogType.name());
options.put(IcebergConstant.CATALOG_NAME_KEY, catalogType.name());
options.put(IcebergConstant.UPSERT_KEY, upsert.toString());
if (null != uri) {
options.put("uri", uri);
options.put(IcebergConstant.URI_KEY, uri);
}
if (null != warehouse) {
options.put("warehouse", warehouse);
options.put(IcebergConstant.WAREHOUSE_KEY, warehouse);
}
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public IcebergLoadNode getTestObject() {
"id",
CatalogType.HIVE,
"thrift://localhost:9083",
"hdfs://localhost:9000/user/iceberg/warehouse");
"hdfs://localhost:9000/user/iceberg/warehouse",
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHadoopCatalog() {
null,
CatalogType.HADOOP,
null,
"hdfs://localhost:9000/iceberg/warehouse");
"hdfs://localhost:9000/iceberg/warehouse",
null);
}

private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
Expand Down Expand Up @@ -139,7 +140,8 @@ private IcebergLoadNode buildIcebergLoadNodeWithHiveCatalog() {
null,
CatalogType.HIVE,
"thrift://localhost:9083",
"/hive/warehouse");
"/hive/warehouse",
null);
}

/**
Expand Down

0 comments on commit 959dd89

Please sign in to comment.