Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iceberg connector support for Materialized Views #4832

Merged
merged 5 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public class HiveMetadata
public static final String PRESTO_QUERY_ID_NAME = "presto_query_id";
public static final String BUCKETING_VERSION = "bucketing_version";
public static final String TABLE_COMMENT = "comment";
public static final String STORAGE_TABLE = "storage_table";
private static final String TRANSACTIONAL = "transactional";

private static final String ORC_BLOOM_FILTER_COLUMNS_KEY = "orc.bloom.filter.columns";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import io.airlift.json.JsonCodec;
import io.airlift.json.JsonCodecFactory;
import io.airlift.json.ObjectMapperProvider;
import io.airlift.log.Logger;
import io.prestosql.plugin.base.CatalogName;
import io.prestosql.plugin.hive.authentication.HiveIdentity;
import io.prestosql.plugin.hive.metastore.CoralSemiTransactionalHiveMSCAdapter;
import io.prestosql.plugin.hive.metastore.SemiTransactionalHiveMetastore;
import io.prestosql.plugin.hive.metastore.Table;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorMaterializedViewDefinition;
import io.prestosql.spi.connector.ConnectorViewDefinition;
import io.prestosql.spi.connector.ConnectorViewDefinition.ViewColumn;
import io.prestosql.spi.type.TypeManager;
Expand Down Expand Up @@ -69,9 +71,15 @@ public static ViewReader createViewReader(SemiTransactionalHiveMetastore metasto
public static final String PRESTO_VIEW_FLAG = "presto_view";
static final String VIEW_PREFIX = "/* Presto View: ";
static final String VIEW_SUFFIX = " */";
private static final String MATERIALIZED_VIEW_PREFIX = "/* Presto Materialized View: ";
private static final String MATERIALIZED_VIEW_SUFFIX = " */";
private static final JsonCodec<ConnectorViewDefinition> VIEW_CODEC =
new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorViewDefinition.class);

private static Logger log = Logger.get(io.prestosql.plugin.hive.ViewReaderUtil.class);
private static final JsonCodec<ConnectorMaterializedViewDefinition> MATERIALIZED_VIEW_CODEC =
new JsonCodecFactory(new ObjectMapperProvider()).jsonCodec(ConnectorMaterializedViewDefinition.class);

public static boolean isPrestoView(Table table)
{
return "true".equals(table.getParameters().get(PRESTO_VIEW_FLAG));
Expand All @@ -95,6 +103,13 @@ public static String encodeViewData(ConnectorViewDefinition definition)
return VIEW_PREFIX + data + VIEW_SUFFIX;
}

public static String encodeMaterializedViewData(ConnectorMaterializedViewDefinition definition)
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved
{
byte[] bytes = MATERIALIZED_VIEW_CODEC.toJsonBytes(definition);
String data = Base64.getEncoder().encodeToString(bytes);
return MATERIALIZED_VIEW_PREFIX + data + MATERIALIZED_VIEW_SUFFIX;
}

/**
* Supports decoding of Presto views
*/
Expand All @@ -111,6 +126,16 @@ public ConnectorViewDefinition decodeViewData(String viewData, Table table, Cata
byte[] bytes = Base64.getDecoder().decode(viewData);
return VIEW_CODEC.fromJson(bytes);
}

public static ConnectorMaterializedViewDefinition decodeMaterializedViewData(String data)
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved
{
checkCondition(data.startsWith(MATERIALIZED_VIEW_PREFIX), HIVE_INVALID_VIEW_DATA, "Materialized View data missing prefix: %s", data);
checkCondition(data.endsWith(MATERIALIZED_VIEW_SUFFIX), HIVE_INVALID_VIEW_DATA, "Materialized View data missing suffix: %s", data);
data = data.substring(MATERIALIZED_VIEW_PREFIX.length());
data = data.substring(0, data.length() - MATERIALIZED_VIEW_SUFFIX.length());
byte[] bytes = Base64.getDecoder().decode(data);
return MATERIALIZED_VIEW_CODEC.fromJson(bytes);
}
}

/**
Expand Down
6 changes: 6 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@
<scope>test</scope>
</dependency>

<dependency>
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved
<groupId>io.prestosql</groupId>
<artifactId>presto-parser</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.prestosql</groupId>
<artifactId>presto-testing</artifactId>
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions presto-main/src/main/java/io/prestosql/metadata/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,15 @@ public interface Metadata
/**
* Begin refresh materialized view query
*/
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle);
InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles);
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved

/**
* Finish refresh materialized view query
*/
Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Session session,
InsertTableHandle tableHandle,
TableHandle tableHandle,
InsertTableHandle insertTableHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles);
Expand Down Expand Up @@ -570,5 +571,5 @@ default ResolvedFunction getCoercion(Type fromType, Type toType)
* Method to get difference between the states of table at two different points in time/or as of given token-ids.
* The method is used by the engine to determine if a materialized view is current with respect to the tables it depends on.
*/
MaterializedViewFreshness getMaterializedViewFreshness(Session session, TableHandle tableHandle);
MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -844,31 +845,47 @@ public Optional<ConnectorOutputMetadata> finishInsert(Session session, InsertTab
}

@Override
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle)
public InsertTableHandle beginRefreshMaterializedView(Session session, TableHandle tableHandle, List<TableHandle> sourceTableHandles)
{
CatalogName catalogName = tableHandle.getCatalogName();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
ConnectorMetadata metadata = catalogMetadata.getMetadata();
ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(catalogName);
ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle());

List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved
.map(TableHandle::getConnectorHandle)
.collect(Collectors.toList());
anjalinorwood marked this conversation as resolved.
Show resolved Hide resolved
sourceConnectorHandles.add(tableHandle.getConnectorHandle());

if (sourceConnectorHandles.stream()
.map(Object::getClass)
.distinct()
.count() > 1) {
throw new PrestoException(NOT_SUPPORTED, "Cross connector materialized views are not supported");
}

ConnectorInsertTableHandle handle = metadata.beginRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), sourceConnectorHandles);

return new InsertTableHandle(tableHandle.getCatalogName(), transactionHandle, handle);
}

@Override
public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
Session session,
InsertTableHandle tableHandle,
TableHandle tableHandle,
InsertTableHandle insertHandle,
Collection<Slice> fragments,
Collection<ComputedStatistics> computedStatistics,
List<TableHandle> sourceTableHandles)
{
CatalogName catalogName = tableHandle.getCatalogName();
CatalogName catalogName = insertHandle.getCatalogName();
ConnectorMetadata metadata = getMetadata(session, catalogName);
List<ConnectorTableHandle> sourceConnectorHandles = new ArrayList<>();
for (TableHandle handle : sourceTableHandles) {
sourceConnectorHandles.add(handle.getConnectorHandle());
}
return metadata.finishRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), fragments, computedStatistics, sourceConnectorHandles);

List<ConnectorTableHandle> sourceConnectorHandles = sourceTableHandles.stream()
.map(TableHandle::getConnectorHandle)
.collect(toImmutableList());
return metadata.finishRefreshMaterializedView(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle(), insertHandle.getConnectorHandle(),
fragments, computedStatistics, sourceConnectorHandles);
}

@Override
Expand Down Expand Up @@ -1145,12 +1162,18 @@ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(Session
}

@Override
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, TableHandle tableHandle)
public MaterializedViewFreshness getMaterializedViewFreshness(Session session, QualifiedObjectName viewName)
{
CatalogName catalogName = tableHandle.getCatalogName();
CatalogMetadata catalogMetadata = getCatalogMetadata(session, catalogName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);
return metadata.getMaterializedViewFreshness(session.toConnectorSession(catalogName), tableHandle.getConnectorHandle());
Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, viewName.getCatalogName());
if (catalog.isPresent()) {
CatalogMetadata catalogMetadata = catalog.get();
CatalogName catalogName = catalogMetadata.getConnectorId(session, viewName);
ConnectorMetadata metadata = catalogMetadata.getMetadataFor(catalogName);

ConnectorSession connectorSession = session.toConnectorSession(catalogName);
return metadata.getMaterializedViewFreshness(connectorSession, viewName.asSchemaTableName());
}
return new MaterializedViewFreshness(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private ConnectorPageSink createPageSink()
return pageSinkManager.createPageSink(session, ((InsertTarget) target).getHandle());
}
if (target instanceof TableWriterNode.RefreshMaterializedViewTarget) {
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getHandle());
return pageSinkManager.createPageSink(session, ((TableWriterNode.RefreshMaterializedViewTarget) target).getInsertHandle());
}
throw new UnsupportedOperationException("Unhandled target type: " + target.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,14 +1031,14 @@ public Optional<NewTableLayout> getNewTableLayout()
@Immutable
public static final class RefreshMaterializedViewAnalysis
{
private final TableHandle materializedViewHandle;
private final QualifiedObjectName materializedViewName;
private final TableHandle target;
private final Query query;
private final List<ColumnHandle> columns;

public RefreshMaterializedViewAnalysis(TableHandle materializedViewHandle, TableHandle target, Query query, List<ColumnHandle> columns)
public RefreshMaterializedViewAnalysis(QualifiedObjectName materializedViewName, TableHandle target, Query query, List<ColumnHandle> columns)
{
this.materializedViewHandle = requireNonNull(materializedViewHandle, "Materialized view handle is null");
this.materializedViewName = requireNonNull(materializedViewName, "Materialized view handle is null");
this.target = requireNonNull(target, "target is null");
this.query = query;
this.columns = requireNonNull(columns, "columns is null");
Expand All @@ -1060,9 +1060,9 @@ public TableHandle getTarget()
return target;
}

public TableHandle getMaterializedViewHandle()
public QualifiedObjectName getMaterializedViewName()
{
return materializedViewHandle;
return materializedViewName;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
throw semanticException(TABLE_NOT_FOUND, refreshMaterializedView, "Table '%s' does not exist", targetTable);
}

Optional<TableHandle> materializedViewHandle = metadata.getTableHandle(session, name);
if (targetTableHandle.isPresent() && metadata.getMaterializedViewFreshness(session, materializedViewHandle.get()).isMaterializedViewFresh()) {
if (targetTableHandle.isPresent() && metadata.getMaterializedViewFreshness(session, name).isMaterializedViewFresh()) {
analysis.setSkipMaterializedViewRefresh(true);
}
else {
Expand All @@ -503,9 +502,8 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView refreshMate
accessControl.checkCanInsertIntoTable(session.toSecurityContext(), targetTable);

Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, targetTableHandle.get());
checkState(materializedViewHandle.isPresent());
analysis.setRefreshMaterializedView(new Analysis.RefreshMaterializedViewAnalysis(
materializedViewHandle.get(),
name,
targetTableHandle.get(), query,
insertColumns.stream().map(columnHandles::get).collect(toImmutableList())));

Expand Down Expand Up @@ -942,6 +940,10 @@ protected Scope visitCreateMaterializedView(CreateMaterializedView node, Optiona
QualifiedObjectName viewName = createQualifiedObjectName(session, node, node.getName());
analysis.setUpdateType("CREATE MATERIALIZED VIEW", viewName);

if (node.isReplace() && node.isNotExists()) {
throw semanticException(NOT_SUPPORTED, node, "'CREATE OR REPLACE' and 'IF NOT EXISTS' clauses can not be used together");
}

// analyze the query that creates the view
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session, warningCollector, CorrelationSupport.ALLOWED);

Expand Down Expand Up @@ -1172,37 +1174,29 @@ protected Scope visitTable(Table table, Optional<Scope> scope)

QualifiedObjectName name = createQualifiedObjectName(session, table, table.getName());
analysis.addEmptyColumnReferencesForTable(accessControl, session.getIdentity(), name);
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, name);

// If this is a materialized view, get the name of the storage table
Optional<QualifiedName> storageName = getMaterializedViewStorageTableName(name);
Optional<TableHandle> storageHandle = Optional.empty();
if (storageName.isPresent()) {
storageHandle = metadata.getTableHandle(session, createQualifiedObjectName(session, table, storageName.get()));
}

// If materialized view is current, answer the query using the storage table
Identifier catalogName = new Identifier(name.getCatalogName());
Identifier schemaName = new Identifier(name.getSchemaName());
Identifier tableName = new Identifier(name.getObjectName());
QualifiedName materializedViewName = QualifiedName.of(ImmutableList.of(catalogName, schemaName, tableName));
Optional<TableHandle> materializedViewHandle = metadata.getTableHandle(session, createQualifiedObjectName(session, table, materializedViewName));
if (storageHandle.isPresent() && metadata.getMaterializedViewFreshness(session, materializedViewHandle.get()).isMaterializedViewFresh()) {
tableHandle = storageHandle;
}
else {
// This is a stale materialized view and should be expanded like a logical view
if (storageHandle.isPresent()) {
Optional<ConnectorMaterializedViewDefinition> optionalMaterializedView = metadata.getMaterializedView(session, name);
if (optionalMaterializedView.isPresent()) {
return createScopeForMaterializedView(table, name, scope, optionalMaterializedView.get());
Optional<TableHandle> tableHandle = Optional.empty();

Optional<ConnectorMaterializedViewDefinition> optionalMaterializedView = metadata.getMaterializedView(session, name);
if (optionalMaterializedView.isPresent()) {
if (metadata.getMaterializedViewFreshness(session, name).isMaterializedViewFresh()) {
// If materialized view is current, answer the query using the storage table
Optional<QualifiedName> storageName = getMaterializedViewStorageTableName(name);
if (storageName.isPresent()) {
tableHandle = metadata.getTableHandle(session, createQualifiedObjectName(session, table, storageName.get()));
}
}
// This is a reference to a logical view
else {
// This is a stale materialized view and should be expanded like a logical view
return createScopeForMaterializedView(table, name, scope, optionalMaterializedView.get());
}
}
else {
// This is could be a reference to a logical view or a table
Optional<ConnectorViewDefinition> optionalView = metadata.getView(session, name);
if (optionalView.isPresent()) {
return createScopeForView(table, name, scope, optionalView.get());
}
tableHandle = metadata.getTableHandle(session, name);
}

if (tableHandle.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3102,7 +3102,8 @@ else if (target instanceof InsertTarget) {
}
else if (target instanceof TableWriterNode.RefreshMaterializedViewTarget) {
TableWriterNode.RefreshMaterializedViewTarget refreshTarget = (TableWriterNode.RefreshMaterializedViewTarget) target;
return metadata.finishRefreshMaterializedView(session, refreshTarget.getHandle(), fragments, statistics, refreshTarget.getSourceTableHandles());
return metadata.finishRefreshMaterializedView(session, refreshTarget.getTableHandle(), refreshTarget.getInsertHandle(),
fragments, statistics, refreshTarget.getSourceTableHandles());
}
else if (target instanceof DeleteTarget) {
metadata.finishDelete(session, ((DeleteTarget) target).getHandle(), fragments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ private RelationPlan createRefreshMaterializedViewPlan(Analysis analysis)
TableHandle tableHandle = viewAnalysis.getTarget();
Query query = viewAnalysis.getQuery();
Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, viewAnalysis.getTarget());
TableWriterNode.RefreshMaterializedViewReference writerTarget = new TableWriterNode.RefreshMaterializedViewReference(viewAnalysis.getMaterializedViewHandle(),
TableWriterNode.RefreshMaterializedViewReference writerTarget = new TableWriterNode.RefreshMaterializedViewReference(viewAnalysis.getMaterializedViewName(),
tableHandle, new ArrayList<>(analysis.getTables()));
return getInsertPlan(analysis, query, tableHandle, viewAnalysis.getColumns(), newTableLayout, true, writerTarget);
}
Expand Down
Loading