Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for NOT NULL in DDL statements #418

Merged
merged 6 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
<version>5.1.47</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -878,7 +878,7 @@
<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing-mysql-server</artifactId>
<version>5.7.22-1</version>
<version>8.0.12-2</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import static io.prestosql.spi.type.Varchars.isVarcharType;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.sql.DatabaseMetaData.columnNoNulls;
import static java.util.Collections.nCopies;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -209,7 +210,8 @@ public List<JdbcColumnHandle> getColumns(ConnectorSession session, JdbcTableHand
// skip unsupported column types
if (columnMapping.isPresent()) {
String columnName = resultSet.getString("COLUMN_NAME");
columns.add(new JdbcColumnHandle(columnName, typeHandle, columnMapping.get().getType()));
boolean nullable = (resultSet.getInt("NULLABLE") != columnNoNulls);
columns.add(new JdbcColumnHandle(columnName, typeHandle, columnMapping.get().getType(), nullable));
}
}
if (columns.isEmpty()) {
Expand Down Expand Up @@ -274,6 +276,17 @@ public PreparedStatement buildSql(ConnectorSession session, Connection connectio
split.getAdditionalPredicate());
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
try {
createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Expand All @@ -287,6 +300,17 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, Connecto
}

private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
try {
return createTable(session, tableMetadata, generateTemporaryTableName());
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String tableName)
throws SQLException
{
SchemaTableName schemaTableName = tableMetadata.getTable();
String schema = schemaTableName.getSchemaName();
Expand All @@ -302,11 +326,10 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
if (uppercase) {
schema = schema.toUpperCase(ENGLISH);
table = table.toUpperCase(ENGLISH);
tableName = tableName.toUpperCase(ENGLISH);
}
String catalog = connection.getCatalog();

String temporaryName = generateTemporaryTableName();

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<String> columnList = ImmutableList.builder();
Expand All @@ -318,12 +341,12 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
columnNames.add(columnName);
columnTypes.add(column.getType());
// TODO in INSERT case, we should reuse original column type and, ideally, constraints (then JdbcPageSink must get writer from toPrestoType())
columnList.add(format("%s %s", quoted(columnName), toWriteMapping(session, column.getType()).getDataType()));
columnList.add(getColumnSql(session, column, columnName));
}

String sql = format(
"CREATE TABLE %s (%s)",
quoted(catalog, schema, temporaryName),
quoted(catalog, schema, tableName),
join(", ", columnList.build()));
execute(connection, sql);

Expand All @@ -333,11 +356,20 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
table,
columnNames.build(),
columnTypes.build(),
temporaryName);
tableName);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}

private String getColumnSql(ConnectorSession session, ColumnMetadata column, String columnName)
{
StringBuilder sb = new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toWriteMapping(session, column.getType()).getDataType());
if (!column.isNullable()) {
sb.append(" NOT NULL");
}
return sb.toString();
}

protected String generateTemporaryTableName()
Expand All @@ -348,12 +380,33 @@ protected String generateTemporaryTableName()
@Override
public void commitCreateTable(JdbcIdentity identity, JdbcOutputTableHandle handle)
{
String sql = format(
"ALTER TABLE %s RENAME TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()),
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()));
renameTable(
identity,
handle.getCatalogName(),
handle.getSchemaName(),
handle.getTemporaryTableName(),
new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
}

try (Connection connection = getConnection(identity, handle)) {
@Override
public void renameTable(JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName)
{
renameTable(identity, handle.getCatalogName(), handle.getSchemaName(), handle.getTableName(), newTableName);
}

protected void renameTable(JdbcIdentity identity, String catalogName, String schemaName, String tableName, SchemaTableName newTable)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
String newSchemaName = newTable.getSchemaName();
String newTableName = newTable.getTableName();
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
newSchemaName = newSchemaName.toUpperCase(ENGLISH);
newTableName = newTableName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s RENAME TO %s",
quoted(catalogName, schemaName, tableName),
quoted(catalogName, newSchemaName, newTableName));
execute(connection, sql);
}
catch (SQLException e) {
Expand Down Expand Up @@ -384,6 +437,59 @@ public void finishInsertTable(JdbcIdentity identity, JdbcOutputTableHandle handl
}
}

@Override
public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column)
{
try (Connection connection = connectionFactory.openConnection(JdbcIdentity.from(session))) {
String columnName = column.getName();
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
columnName = columnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s ADD %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
getColumnSql(session, column, columnName));
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void renameColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
newColumnName = newColumnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s RENAME COLUMN %s TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
jdbcColumn.getColumnName(),
newColumnName);
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void dropColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle column)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
String sql = format(
"ALTER TABLE %s DROP COLUMN %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
column.getColumnName());
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void dropTable(JdbcIdentity identity, JdbcTableHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.plugin.jdbc;

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableMetadata;
Expand Down Expand Up @@ -62,6 +63,16 @@ default void abortReadConnection(Connection connection)
PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException;

void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column);

void dropColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle column);

void renameColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName);

void renameTable(JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName);

void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

void commitCreateTable(JdbcIdentity identity, JdbcOutputTableHandle handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Collections.emptyMap;
import static java.util.Objects.requireNonNull;

public final class JdbcColumnHandle
Expand All @@ -30,16 +31,19 @@ public final class JdbcColumnHandle
private final String columnName;
private final JdbcTypeHandle jdbcTypeHandle;
private final Type columnType;
private final boolean nullable;

@JsonCreator
public JdbcColumnHandle(
@JsonProperty("columnName") String columnName,
@JsonProperty("jdbcTypeHandle") JdbcTypeHandle jdbcTypeHandle,
@JsonProperty("columnType") Type columnType)
@JsonProperty("columnType") Type columnType,
@JsonProperty("nullable") boolean nullable)
{
this.columnName = requireNonNull(columnName, "columnName is null");
this.jdbcTypeHandle = requireNonNull(jdbcTypeHandle, "jdbcTypeHandle is null");
this.columnType = requireNonNull(columnType, "columnType is null");
this.nullable = nullable;
}

@JsonProperty
Expand All @@ -60,9 +64,15 @@ public Type getColumnType()
return columnType;
}

@JsonProperty
public boolean isNullable()
{
return nullable;
}

public ColumnMetadata getColumnMetadata()
{
return new ColumnMetadata(columnName, columnType);
return new ColumnMetadata(columnName, columnType, nullable, null, null, false, emptyMap());
}

@Override
Expand Down Expand Up @@ -91,6 +101,7 @@ public String toString()
.add("columnName", columnName)
.add("jdbcTypeHandle", jdbcTypeHandle)
.add("columnType", columnType)
.add("nullable", nullable)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airlift.log.Logger;
import io.prestosql.spi.connector.Connector;
import io.prestosql.spi.connector.ConnectorAccessControl;
import io.prestosql.spi.connector.ConnectorCapabilities;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorPageSinkProvider;
import io.prestosql.spi.connector.ConnectorRecordSetProvider;
Expand All @@ -34,6 +35,8 @@
import java.util.concurrent.ConcurrentMap;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Sets.immutableEnumSet;
import static io.prestosql.spi.connector.ConnectorCapabilities.NOT_NULL_COLUMN_CONSTRAINT;
import static io.prestosql.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.prestosql.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -149,4 +152,10 @@ public final void shutdown()
log.error(e, "Error shutting down connector");
}
}

@Override
public Set<ConnectorCapabilities> getCapabilities()
{
return immutableEnumSet(NOT_NULL_COLUMN_CONSTRAINT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
return handle;
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
jdbcClient.createTable(session, tableMetadata);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Expand Down Expand Up @@ -208,6 +214,36 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
return Optional.empty();
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle table, ColumnMetadata columnMetadata)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
jdbcClient.addColumn(session, tableHandle, columnMetadata);
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
JdbcColumnHandle columnHandle = (JdbcColumnHandle) column;
jdbcClient.dropColumn(JdbcIdentity.from(session), tableHandle, columnHandle);
}

@Override
public void renameColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, String target)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
JdbcColumnHandle columnHandle = (JdbcColumnHandle) column;
jdbcClient.renameColumn(JdbcIdentity.from(session), tableHandle, columnHandle, target);
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle table, SchemaTableName newTableName)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
jdbcClient.renameTable(JdbcIdentity.from(session), tableHandle, newTableName);
}

@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint)
{
Expand Down
Loading