Skip to content

Commit

Permalink
Add DDL support for JDBC connectors
Browse files Browse the repository at this point in the history
- create table, add column, and drop column for everything
- rename column and rename table for MySQL and PostgreSQL

Extracted-From: https://github.com/prestodb/presto
  • Loading branch information
tdcmeehan authored and electrum committed Mar 12, 2019
1 parent d47cf66 commit 30234cb
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,17 @@ public PreparedStatement buildSql(ConnectorSession session, Connection connectio
split.getAdditionalPredicate());
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
try {
createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
Expand All @@ -287,6 +298,17 @@ public JdbcOutputTableHandle beginInsertTable(ConnectorSession session, Connecto
}

private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
try {
return createTable(session, tableMetadata, generateTemporaryTableName());
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

protected JdbcOutputTableHandle createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, String tableName)
throws SQLException
{
SchemaTableName schemaTableName = tableMetadata.getTable();
String schema = schemaTableName.getSchemaName();
Expand All @@ -302,11 +324,10 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
if (uppercase) {
schema = schema.toUpperCase(ENGLISH);
table = table.toUpperCase(ENGLISH);
tableName = tableName.toUpperCase(ENGLISH);
}
String catalog = connection.getCatalog();

String temporaryName = generateTemporaryTableName();

ImmutableList.Builder<String> columnNames = ImmutableList.builder();
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
ImmutableList.Builder<String> columnList = ImmutableList.builder();
Expand All @@ -318,12 +339,12 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
columnNames.add(columnName);
columnTypes.add(column.getType());
// TODO in INSERT case, we should reuse original column type and, ideally, constraints (then JdbcPageSink must get writer from toPrestoType())
columnList.add(format("%s %s", quoted(columnName), toWriteMapping(session, column.getType()).getDataType()));
columnList.add(getColumnSql(session, column, columnName));
}

String sql = format(
"CREATE TABLE %s (%s)",
quoted(catalog, schema, temporaryName),
quoted(catalog, schema, tableName),
join(", ", columnList.build()));
execute(connection, sql);

Expand All @@ -333,13 +354,19 @@ private JdbcOutputTableHandle beginWriteTable(ConnectorSession session, Connecto
table,
columnNames.build(),
columnTypes.build(),
temporaryName);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
tableName);
}
}

private String getColumnSql(ConnectorSession session, ColumnMetadata column, String columnName)
{
StringBuilder sb = new StringBuilder()
.append(quoted(columnName))
.append(" ")
.append(toWriteMapping(session, column.getType()).getDataType());
return sb.toString();
}

protected String generateTemporaryTableName()
{
return "tmp_presto_" + UUID.randomUUID().toString().replace("-", "");
Expand All @@ -348,12 +375,33 @@ protected String generateTemporaryTableName()
@Override
public void commitCreateTable(JdbcIdentity identity, JdbcOutputTableHandle handle)
{
String sql = format(
"ALTER TABLE %s RENAME TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTemporaryTableName()),
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()));
renameTable(
identity,
handle.getCatalogName(),
handle.getSchemaName(),
handle.getTemporaryTableName(),
new SchemaTableName(handle.getSchemaName(), handle.getTableName()));
}

try (Connection connection = getConnection(identity, handle)) {
@Override
public void renameTable(JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName)
{
renameTable(identity, handle.getCatalogName(), handle.getSchemaName(), handle.getTableName(), newTableName);
}

protected void renameTable(JdbcIdentity identity, String catalogName, String schemaName, String tableName, SchemaTableName newTable)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
String newSchemaName = newTable.getSchemaName();
String newTableName = newTable.getTableName();
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
newSchemaName = newSchemaName.toUpperCase(ENGLISH);
newTableName = newTableName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s RENAME TO %s",
quoted(catalogName, schemaName, tableName),
quoted(catalogName, newSchemaName, newTableName));
execute(connection, sql);
}
catch (SQLException e) {
Expand Down Expand Up @@ -384,6 +432,59 @@ public void finishInsertTable(JdbcIdentity identity, JdbcOutputTableHandle handl
}
}

@Override
public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column)
{
try (Connection connection = connectionFactory.openConnection(JdbcIdentity.from(session))) {
String columnName = column.getName();
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
columnName = columnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s ADD %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
getColumnSql(session, column, columnName));
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void renameColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
if (connection.getMetaData().storesUpperCaseIdentifiers()) {
newColumnName = newColumnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s RENAME COLUMN %s TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
jdbcColumn.getColumnName(),
newColumnName);
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void dropColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle column)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
String sql = format(
"ALTER TABLE %s DROP COLUMN %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
column.getColumnName());
execute(connection, sql);
}
catch (SQLException e) {
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void dropTable(JdbcIdentity identity, JdbcTableHandle handle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.prestosql.plugin.jdbc;

import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorSplitSource;
import io.prestosql.spi.connector.ConnectorTableMetadata;
Expand Down Expand Up @@ -62,6 +63,16 @@ default void abortReadConnection(Connection connection)
PreparedStatement buildSql(ConnectorSession session, Connection connection, JdbcSplit split, List<JdbcColumnHandle> columnHandles)
throws SQLException;

void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column);

void dropColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle column);

void renameColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName);

void renameTable(JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName);

void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

JdbcOutputTableHandle beginCreateTable(ConnectorSession session, ConnectorTableMetadata tableMetadata);

void commitCreateTable(JdbcIdentity identity, JdbcOutputTableHandle handle);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
return handle;
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting)
{
jdbcClient.createTable(session, tableMetadata);
}

@Override
public Optional<ConnectorOutputMetadata> finishCreateTable(ConnectorSession session, ConnectorOutputTableHandle tableHandle, Collection<Slice> fragments, Collection<ComputedStatistics> computedStatistics)
{
Expand Down Expand Up @@ -208,6 +214,36 @@ public Optional<ConnectorOutputMetadata> finishInsert(ConnectorSession session,
return Optional.empty();
}

@Override
public void addColumn(ConnectorSession session, ConnectorTableHandle table, ColumnMetadata columnMetadata)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
jdbcClient.addColumn(session, tableHandle, columnMetadata);
}

@Override
public void dropColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
JdbcColumnHandle columnHandle = (JdbcColumnHandle) column;
jdbcClient.dropColumn(JdbcIdentity.from(session), tableHandle, columnHandle);
}

@Override
public void renameColumn(ConnectorSession session, ConnectorTableHandle table, ColumnHandle column, String target)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
JdbcColumnHandle columnHandle = (JdbcColumnHandle) column;
jdbcClient.renameColumn(JdbcIdentity.from(session), tableHandle, columnHandle, target);
}

@Override
public void renameTable(ConnectorSession session, ConnectorTableHandle table, SchemaTableName newTableName)
{
JdbcTableHandle tableHandle = (JdbcTableHandle) table;
jdbcClient.renameTable(JdbcIdentity.from(session), tableHandle, newTableName);
}

@Override
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint<ColumnHandle> constraint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,38 @@ public void getColumnMetadata()
new ColumnMetadata("text", VARCHAR));
}

@Test(expectedExceptions = PrestoException.class)
public void testCreateTable()
@Test
public void testCreateAndAlterTable()
{
metadata.createTable(
SESSION,
new ConnectorTableMetadata(
new SchemaTableName("example", "foo"),
ImmutableList.of(new ColumnMetadata("text", VARCHAR))),
false);
SchemaTableName table = new SchemaTableName("example", "foo");
metadata.createTable(SESSION, new ConnectorTableMetadata(table, ImmutableList.of(new ColumnMetadata("text", VARCHAR))), false);

JdbcTableHandle handle = metadata.getTableHandle(SESSION, table);

ConnectorTableMetadata layout = metadata.getTableMetadata(SESSION, handle);
assertEquals(layout.getTable(), table);
assertEquals(layout.getColumns().size(), 1);
assertEquals(layout.getColumns().get(0), new ColumnMetadata("text", VARCHAR));

metadata.addColumn(SESSION, handle, new ColumnMetadata("x", VARCHAR));
layout = metadata.getTableMetadata(SESSION, handle);
assertEquals(layout.getColumns().size(), 2);
assertEquals(layout.getColumns().get(0), new ColumnMetadata("text", VARCHAR));
assertEquals(layout.getColumns().get(1), new ColumnMetadata("x", VARCHAR));

JdbcColumnHandle columnHandle = new JdbcColumnHandle("x", JDBC_VARCHAR, VARCHAR);
metadata.dropColumn(SESSION, handle, columnHandle);
layout = metadata.getTableMetadata(SESSION, handle);
assertEquals(layout.getColumns().size(), 1);
assertEquals(layout.getColumns().get(0), new ColumnMetadata("text", VARCHAR));

SchemaTableName newTableName = new SchemaTableName("example", "bar");
metadata.renameTable(SESSION, handle, newTableName);
handle = metadata.getTableHandle(SESSION, newTableName);
layout = metadata.getTableMetadata(SESSION, handle);
assertEquals(layout.getTable(), newTableName);
assertEquals(layout.getColumns().size(), 1);
assertEquals(layout.getColumns().get(0), new ColumnMetadata("text", VARCHAR));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import io.prestosql.plugin.jdbc.BaseJdbcConfig;
import io.prestosql.plugin.jdbc.ConnectionFactory;
import io.prestosql.plugin.jdbc.DriverConnectionFactory;
import io.prestosql.plugin.jdbc.JdbcColumnHandle;
import io.prestosql.plugin.jdbc.JdbcIdentity;
import io.prestosql.plugin.jdbc.JdbcTableHandle;
import io.prestosql.plugin.jdbc.WriteMapping;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarcharType;
Expand All @@ -40,18 +43,23 @@
import java.util.Set;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.mysql.jdbc.SQLError.SQL_STATE_ER_TABLE_EXISTS_ERROR;
import static com.mysql.jdbc.SQLError.SQL_STATE_SYNTAX_ERROR;
import static io.prestosql.plugin.jdbc.DriverConnectionFactory.basicConnectionProperties;
import static io.prestosql.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.realWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.timestampWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.prestosql.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.prestosql.spi.StandardErrorCode.ALREADY_EXISTS;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.spi.type.RealType.REAL;
import static io.prestosql.spi.type.TimeWithTimeZoneType.TIME_WITH_TIME_ZONE;
import static io.prestosql.spi.type.TimestampType.TIMESTAMP;
import static io.prestosql.spi.type.TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE;
import static io.prestosql.spi.type.VarbinaryType.VARBINARY;
import static io.prestosql.spi.type.Varchars.isVarcharType;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;

public class MySqlClient
Expand Down Expand Up @@ -187,4 +195,48 @@ else if (varcharType.getBoundedLength() <= 16777215) {

return super.toWriteMapping(session, type);
}

@Override
public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
try {
createTable(session, tableMetadata, tableMetadata.getTable().getTableName());
}
catch (SQLException e) {
boolean exists = SQL_STATE_ER_TABLE_EXISTS_ERROR.equals(e.getSQLState());
throw new PrestoException(exists ? ALREADY_EXISTS : JDBC_ERROR, e);
}
}

@Override
public void renameColumn(JdbcIdentity identity, JdbcTableHandle handle, JdbcColumnHandle jdbcColumn, String newColumnName)
{
try (Connection connection = connectionFactory.openConnection(identity)) {
DatabaseMetaData metadata = connection.getMetaData();
if (metadata.storesUpperCaseIdentifiers()) {
newColumnName = newColumnName.toUpperCase(ENGLISH);
}
String sql = format(
"ALTER TABLE %s RENAME COLUMN %s TO %s",
quoted(handle.getCatalogName(), handle.getSchemaName(), handle.getTableName()),
quoted(jdbcColumn.getColumnName()),
quoted(newColumnName));
execute(connection, sql);
}
catch (SQLException e) {
// MySQL versions earlier than 8 do not support the above RENAME COLUMN syntax
if (SQL_STATE_SYNTAX_ERROR.equals(e.getSQLState())) {
throw new PrestoException(NOT_SUPPORTED, format("Rename column not supported in catalog: '%s'", handle.getCatalogName()), e);
}
throw new PrestoException(JDBC_ERROR, e);
}
}

@Override
public void renameTable(JdbcIdentity identity, JdbcTableHandle handle, SchemaTableName newTableName)
{
// MySQL doesn't support specifying the catalog name in a rename. By setting the
// catalogName parameter to null, it will be omitted in the ALTER TABLE statement.
renameTable(identity, null, handle.getSchemaName(), handle.getTableName(), newTableName);
}
}
Loading

0 comments on commit 30234cb

Please sign in to comment.