Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

destination: add implementation for mysql as destination #3242

Merged
merged 8 commits into from
May 7, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
import java.util.Optional;
import org.apache.commons.dbcp2.BasicDataSource;
import org.jooq.SQLDialect;

Expand Down Expand Up @@ -60,6 +61,17 @@ public static JdbcDatabase createJdbcDatabase(final String username,
return new DefaultJdbcDatabase(connectionPool);
}

/**
 * Creates a {@link JdbcDatabase} on top of a {@link BasicDataSource}, optionally applying extra
 * connection properties to that data source.
 *
 * @param username user name set on the data source
 * @param password password set on the data source
 * @param jdbcConnectionString JDBC URL set on the data source
 * @param driverClassName fully qualified JDBC driver class name
 * @param connectionProperties properties string in the format accepted by
 *        {@code BasicDataSource#setConnectionProperties}; may be {@code null}, in which case no
 *        extra properties are applied
 * @return a {@link DefaultJdbcDatabase} wrapping the configured data source
 */
public static JdbcDatabase createJdbcDatabase(final String username,
                                              final String password,
                                              final String jdbcConnectionString,
                                              final String driverClassName,
                                              final String connectionProperties) {
  // Optional.ofNullable rather than Optional.of: a null properties string must simply be skipped,
  // whereas Optional.of(null) fails immediately with a NullPointerException.
  final BasicDataSource connectionPool =
      createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));

  return new DefaultJdbcDatabase(connectionPool);
}

public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final String password,
final String jdbcConnectionString,
Expand All @@ -75,11 +87,21 @@ private static BasicDataSource createBasicDataSource(final String username,
final String password,
final String jdbcConnectionString,
final String driverClassName) {
return createBasicDataSource(username, password, jdbcConnectionString, driverClassName,
Optional.empty());
}

/**
 * Builds a {@link BasicDataSource} from the given credentials, JDBC URL and driver class, and
 * applies the connection properties string when one is present.
 *
 * @param username user name set on the data source
 * @param password password set on the data source
 * @param jdbcConnectionString JDBC URL set on the data source
 * @param driverClassName fully qualified JDBC driver class name
 * @param connectionProperties optional properties string passed to
 *        {@code setConnectionProperties}; nothing is set when empty
 * @return the configured data source
 */
private static BasicDataSource createBasicDataSource(final String username,
                                                     final String password,
                                                     final String jdbcConnectionString,
                                                     final String driverClassName,
                                                     final Optional<String> connectionProperties) {
  final BasicDataSource dataSource = new BasicDataSource();
  dataSource.setDriverClassName(driverClassName);
  dataSource.setUsername(username);
  dataSource.setPassword(password);
  dataSource.setUrl(jdbcConnectionString);
  // Extra properties are only applied when the caller supplied some.
  if (connectionProperties.isPresent()) {
    dataSource.setConnectionProperties(connectionProperties.get());
  }
  return dataSource;
}

Expand Down
11 changes: 11 additions & 0 deletions airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ default void execute(String sql) throws SQLException {
execute(connection -> connection.createStatement().execute(sql));
}

default void executeWithinTransaction(List<String> queries) throws SQLException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this cleans things up a lot 👍

execute(connection -> {
connection.setAutoCommit(false);
for (String s : queries) {
connection.createStatement().execute(s);
}
connection.commit();
connection.setAutoCommit(true);
});
}

/**
* Use a connection to create a {@link ResultSet} and map it into a list. The entire
* {@link ResultSet} will be buffered in memory before the list is returned. The caller does not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ public abstract class AbstractJdbcDestination extends BaseConnector implements D
private final NamingConventionTransformer namingResolver;
private final SqlOperations sqlOperations;

/**
 * Returns the value of this destination's {@code driverClass} field. Protected so that
 * subclasses can read it.
 */
protected String getDriverClass() {
return driverClass;
}

/**
 * Returns the {@link NamingConventionTransformer} held in this destination's
 * {@code namingResolver} field. Protected so that subclasses can read it.
 */
protected NamingConventionTransformer getNamingResolver() {
return namingResolver;
}

/**
 * Returns the {@link SqlOperations} held in this destination's {@code sqlOperations} field.
 * Protected so that subclasses can read it.
 */
protected SqlOperations getSqlOperations() {
return sqlOperations;
}

public AbstractJdbcDestination(final String driverClass,
final NamingConventionTransformer namingResolver,
final SqlOperations sqlOperations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void insertRecords(JdbcDatabase database, List<AirbyteRecordMessage> reco
});
}

private void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records) throws Exception {
protected void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records) throws Exception {
PrintWriter writer = null;
try {
writer = new PrintWriter(tmpFile, StandardCharsets.UTF_8);
Expand All @@ -124,7 +124,6 @@ private void writeBatchToFile(File tmpFile, List<AirbyteRecordMessage> records)
writer.close();
}
}

}

@Override
Expand All @@ -138,8 +137,14 @@ public String copyTableQuery(String schemaName, String srcTableName, String dstT
}

@Override
public void executeTransaction(JdbcDatabase database, String queries) throws Exception {
database.execute("BEGIN;\n" + queries + "COMMIT;");
public void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception {
  // The queries are concatenated as-is (no separator is inserted between them), wrapped in
  // BEGIN/COMMIT, and sent to the database as a single statement string.
  final String transaction = "BEGIN;\n" + String.join("", queries) + "COMMIT;";
  database.execute(transaction);
}

@Override
Expand All @@ -151,6 +156,11 @@ private String dropTableIfExistsQuery(String schemaName, String tableName) {
return String.format("DROP TABLE IF EXISTS %s.%s;\n", schemaName, tableName);
}

/**
 * This implementation always reports that a schema is required.
 *
 * @return {@code true}, unconditionally
 */
@Override
public boolean isSchemaRequired() {
return true;
}

@Override
public boolean isValidData(String data) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
Expand All @@ -66,7 +67,7 @@ public static AirbyteMessageConsumer create(JdbcDatabase database,
NamingConventionTransformer namingResolver,
JsonNode config,
ConfiguredAirbyteCatalog catalog) {
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog);
final List<WriteConfig> writeConfigs = createWriteConfigs(namingResolver, config, catalog, sqlOperations.isSchemaRequired());

return new BufferedStreamConsumer(
onStartFunction(database, sqlOperations, writeConfigs),
Expand All @@ -77,25 +78,41 @@ public static AirbyteMessageConsumer create(JdbcDatabase database,
sqlOperations::isValidData);
}

private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver, JsonNode config, ConfiguredAirbyteCatalog catalog) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer namingResolver,
JsonNode config,
ConfiguredAirbyteCatalog catalog,
boolean schemaRequired) {
if (schemaRequired) {
Preconditions.checkState(config.has("schema"), "jdbc destinations must specify a schema.");
}
final Instant now = Instant.now();
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now)).collect(Collectors.toList());
return catalog.getStreams().stream().map(toWriteConfig(namingResolver, config, now, schemaRequired)).collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(NamingConventionTransformer namingResolver,
private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
NamingConventionTransformer namingResolver,
JsonNode config,
Instant now) {
Instant now,
boolean schemaRequired) {
return stream -> {
Preconditions.checkNotNull(stream.getDestinationSyncMode(), "Undefined destination sync mode");
final AirbyteStream abStream = stream.getStream();

final String defaultSchemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String defaultSchemaName = schemaRequired ? namingResolver.getIdentifier(config.get("schema").asText())
: namingResolver.getIdentifier(config.get("database").asText());
final String outputSchema = getOutputSchema(abStream, defaultSchemaName);

final String streamName = abStream.getName();
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);

// This is for MySQL destination, the table names can't have more than 64 characters.
if (tmpTableName.length() > 64) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs collision handling in case the 0-31 and 32-63 are the same for different tables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually @ChristopheDuong mentioned that this is a more general problem. Beyond the scope of this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap. Discussed this with Subodh. Will create a follow up ticket to track this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, a related issue on Postgres is here:
https://github.com/airbytehq/airbyte/issues/2948

We'd probably need to handle these table name truncations and collisions in a common class, maybe in StandardNameTransformer as each destination defines some character limits anyway

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, Subodh, can you leave a TODO here and reference the issue?

e.g.

TODO (#2948): Refactor into StandardNameTransformer

or something of the sort so we remember when we get to this.

String prefix = tmpTableName.substring(0, 31); // 31
String suffix = tmpTableName.substring(32, 63); // 31
tmpTableName = prefix + "__" + suffix;
}

final DestinationSyncMode syncMode = stream.getDestinationSyncMode();

return new WriteConfig(streamName, abStream.getNamespace(), outputSchema, tmpTableName, tableName, syncMode);
Expand Down Expand Up @@ -155,7 +172,7 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati
return (hasFailed) -> {
// copy data
if (!hasFailed) {
final StringBuilder queries = new StringBuilder();
List<String> queryList = new ArrayList<>();
LOGGER.info("Finalizing tables in destination started for {} streams", writeConfigs.size());
for (WriteConfig writeConfig : writeConfigs) {
final String schemaName = writeConfig.getOutputSchemaName();
Expand All @@ -166,16 +183,16 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case OVERWRITE -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case OVERWRITE -> queryList.add(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case APPEND -> {}
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
queryList.add(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
}

LOGGER.info("Executing finalization of tables.");
sqlOperations.executeTransaction(database, queries.toString());
sqlOperations.executeTransaction(database, queryList);
LOGGER.info("Finalizing tables in destination completed.");
}
// clean up
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,18 @@ public interface SqlOperations {
* @param queries queries to execute
* @throws Exception exception
*/
void executeTransaction(JdbcDatabase database, String queries) throws Exception;
void executeTransaction(JdbcDatabase database, List<String> queries) throws Exception;

/**
* Check if the data record is valid and ok to be written to destination
*/
boolean isValidData(final String data);

/**
* Denotes whether the destination has the concept of schema or not
*
* @return true if the destination supports schemas (e.g. Postgres), false if it doesn't (e.g. MySQL)
*/
boolean isSchemaRequired();
davinchia marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void close(boolean hasFailed) throws Exception {
public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasFailed, JdbcDatabase db) throws Exception {
Exception firstException = null;
try {
StringBuilder mergeCopiersToFinalTableQuery = new StringBuilder();
List<String> queries = new ArrayList<>();
for (var copier : streamCopiers) {
try {
copier.closeStagingUploader(hasFailed);
Expand All @@ -149,7 +149,7 @@ public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasF
copier.copyStagingFileToTemporaryTable();
var destTableName = copier.createDestinationTable();
var mergeQuery = copier.generateMergeStatement(destTableName);
mergeCopiersToFinalTableQuery.append(mergeQuery);
queries.add(mergeQuery);
}
} catch (Exception e) {
final String message = String.format("Failed to finalize copy to temp table due to: %s", e);
Expand All @@ -161,7 +161,7 @@ public void closeAsOneTransaction(List<StreamCopier> streamCopiers, boolean hasF
}
}
if (!hasFailed) {
sqlOperations.executeTransaction(db, mergeCopiersToFinalTableQuery.toString());
sqlOperations.executeTransaction(db, queries);
}
} finally {
for (var copier : streamCopiers) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
12 changes: 12 additions & 0 deletions airbyte-integrations/connectors/destination-mysql/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Image for the MySQL destination connector, built on the shared Java integration base image.
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

# Connector name; reused below to locate and name the distribution tarball.
ENV APPLICATION destination-mysql

# Copy the distribution tarball from build/distributions into the working directory.
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

# Unpack in place, dropping the archive's top-level directory.
RUN tar xf ${APPLICATION}.tar --strip-components=1

# Image metadata labels: connector version and image name.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-mysql
26 changes: 26 additions & 0 deletions airbyte-integrations/connectors/destination-mysql/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Gradle build for the MySQL destination connector.
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

// Entry point used by the 'application' plugin.
application {
mainClass = 'io.airbyte.integrations.destination.mysql.MySQLDestination'
}

dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')

// MySQL JDBC driver.
implementation 'mysql:mysql-connector-java:8.0.22'

// Integration-test-only dependencies, including the Testcontainers MySQL module.
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
integrationTestJavaImplementation "org.testcontainers:mysql:1.15.1"

// NOTE(review): these depend on the airbyteDocker task outputs of the base projects, presumably
// so the base images are built before this connector — confirm against the airbyte-docker plugin.
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}

Loading