destination: add implementation for mysql as destination #3242
Conversation
/test connector=destination-mysql
@@ -79,7 +79,7 @@
  private static final long GRACEFUL_SHUTDOWN_MINUTES = 5L;
  private static final int MIN_RECORDS = 500;
- private static final int BATCH_SIZE = 10000;
+ private static final int BATCH_SIZE = 500_000;
Since both MySQL and Postgres now use the bulk load mechanism, the batch size should be increased. I don't have any solid reasoning behind 500K, but based on a few Stack Overflow threads and articles like https://www.citusdata.com/blog/2017/11/08/faster-bulk-loading-in-postgresql-with-copy/ it seems like bulk loads are very efficient at around 1M records, so perhaps we can increase 500K to 1M as well after observing how this performs.
There are two problems with this change:
- We usually aren't hitting the 10k batch limit consistently. For this to have the intended effect we probably need to increase MIN_RECORDS as well. Otherwise we would likely continue writing 1-5k records anyway.
- We need this batch size to be byte-based rather than row-based. Some use cases have large rows; even with 500k rows of 2 MB each we'd run into serious disk space issues on some installations.

I do agree that we need to increase this batch size, and for most common cases 500k-1M makes perfect sense; it's just more work than changing this value, unfortunately.
Also, just FYI, after making a change to BufferedStreamConsumer we would need to bump and deploy a version for all downstream destinations (meilisearch and all jdbc-based destinations).
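To make the trade-off concrete, here is a minimal sketch of what a combined record-count / byte-size flush condition could look like. The class, field names, and the 256 MB cap are illustrative, not the actual BufferedStreamConsumer:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only, not the actual BufferedStreamConsumer: flush when either a
// record-count threshold or a (hypothetical) byte-size threshold is reached, whichever first.
class ByteAwareBuffer {

  private static final int BATCH_SIZE = 10_000;                    // records
  private static final long MAX_BUFFER_BYTES = 256L * 1024 * 1024; // assumed 256 MB cap

  private final List<String> buffer = new ArrayList<>();
  private long bufferSizeInBytes = 0;

  void accept(final String serializedRecord) {
    buffer.add(serializedRecord);
    bufferSizeInBytes += serializedRecord.getBytes(StandardCharsets.UTF_8).length;
    if (buffer.size() >= BATCH_SIZE || bufferSizeInBytes >= MAX_BUFFER_BYTES) {
      flush();
    }
  }

  private void flush() {
    // in the real consumer this is where the batch would be handed to the destination writer
    buffer.clear();
    bufferSizeInBytes = 0;
  }

}
```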
Subodh, let's leave this at 10000 for now.
@jrhizor any idea if this work is already planned/noted or should we create an issue to tackle making this byte based?
Reverted it to 10K
Generally looks good.
We'll also need to add a page to the docs for this destination.
String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);

// This is for MySQL destination, the table names cant have more than 64 characters.
if (tmpTableName.length() > 64) {
This needs collision handling in case characters 0-31 and 32-63 are the same for different tables.
Actually @ChristopheDuong mentioned that this is a more general problem. Beyond the scope of this PR.
Yeap. Discussed this with Subodh. Will create a follow up ticket to track this.
Yes, a related issue on Postgres is here: https://github.com/airbytehq/airbyte/issues/2948
We'd probably need to handle these table name truncations and collisions in a common class, maybe in StandardNameTransformer, as each destination defines some character limits anyway.
Cool, Subodh, can you leave a TODO here and reference the issue?
e.g. TODO (#2948): Refactor into StandardNameTransformer
or something of the sort so we remember when we get to this.
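For reference, one possible shape for collision-safe truncation, assuming the logic eventually lives in StandardNameTransformer as discussed. The helper below is a sketch, not the code in this PR:

```java
// Illustrative sketch: rather than keeping fixed character ranges of a long name, keep a prefix
// and append a short suffix derived from the full name, so two long names sharing that prefix
// still truncate to different identifiers. MySQL's identifier limit is 64 characters.
class TruncatingNameTransformer {

  private static final int MAX_IDENTIFIER_LENGTH = 64;

  static String truncate(final String name) {
    if (name.length() <= MAX_IDENTIFIER_LENGTH) {
      return name;
    }
    // 8 hex chars derived deterministically from the full name; hashCode is just an example,
    // a stronger hash would further reduce the chance of collisions
    final String suffix = String.format("%08x", name.hashCode());
    return name.substring(0, MAX_IDENTIFIER_LENGTH - suffix.length() - 1) + "_" + suffix;
  }

}
```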
@@ -50,6 +50,18 @@ default void execute(String sql) throws SQLException {
    execute(connection -> connection.createStatement().execute(sql));
  }

+ default void executeWithinTransaction(List<String> queries) throws SQLException {
this cleans things up a lot 👍
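For context, a minimal sketch of how such a default method could be built on top of the execute(...) helper shown above; the body actually merged in the PR may differ:

```java
// Run all queries in a single transaction: disable auto-commit, execute each statement,
// commit once, and roll back if any statement fails. Sketch only.
default void executeWithinTransaction(final List<String> queries) throws SQLException {
  execute(connection -> {
    try {
      connection.setAutoCommit(false);
      for (final String query : queries) {
        connection.createStatement().execute(query);
      }
      connection.commit();
    } catch (final SQLException e) {
      connection.rollback();
      throw e;
    } finally {
      connection.setAutoCommit(true);
    }
  });
}
```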
implementation project(':airbyte-integrations:connectors:destination-jdbc')

// https://mvnrepository.com/artifact/mysql/mysql-connector-java
implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.22'
- implementation group: 'mysql', name: 'mysql-connector-java', version: '8.0.22'
+ implementation 'mysql:mysql-connector-java:8.0.22'
We generally try to use this format
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')

// https://mvnrepository.com/artifact/mysql/mysql-connector-java
nit: not really necessary to include this since all of our public mvn dependencies can be looked up the same way
RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.0
- LABEL io.airbyte.version=0.3.0
+ LABEL io.airbyte.version=0.1.0
try (final JdbcDatabase database = getDatabase(config)) {
  String outputSchema = namingResolver.getIdentifier(config.get("database").asText());
  attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations);
  boolean localFileEnabled = ((MySQLSqlOperations) sqlOperations)
Can this check go inside tryEnableLocalFile? That would DRY up this and insertRecords.
boolean localFileEnabled = ((MySQLSqlOperations) sqlOperations)
    .checkIfLocalFileIsEnabled(database);
if (!localFileEnabled) {
  ((MySQLSqlOperations) sqlOperations).tryEnableLocalFile(database);
nit: could re-use a single casted version instead of casting each usage
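A sketch of the two suggestions combined (cast once, and fold the check into tryEnableLocalFile so check() and insertRecords() don't repeat it). The JdbcDatabase.execute call below mirrors the pattern used in this PR but is an assumption, not the merged code:

```java
// In the caller: a single cast instead of one per usage.
final MySQLSqlOperations mySqlOperations = (MySQLSqlOperations) sqlOperations;
mySqlOperations.tryEnableLocalFile(database);

// In MySQLSqlOperations (sketch): the method checks before enabling.
void tryEnableLocalFile(final JdbcDatabase database) throws SQLException {
  if (checkIfLocalFileIsEnabled(database)) {
    return; // local_infile is already on, nothing to do
  }
  database.execute(connection -> {
    try (final Statement statement = connection.createStatement()) {
      statement.execute("set global local_infile=true");
    }
  });
}
```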
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"title": "MySQL Destination Spec", | ||
"type": "object", | ||
"required": ["host", "port", "username", "database", "password"], |
"required": ["host", "port", "username", "database", "password"], | |
"required": ["host", "port", "username", "database"], |
I don't think password is actually required? You already have checks to see if it's set before using it elsewhere.
Jared, when is this true? When a DB doesn't set a password?
Yeah. We do this for other dbs as well. Mostly allows users to test if they have a local db they didn't add a password for.
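A small sketch of what treating the password as optional can look like when building connection properties; connectionProperties is a hypothetical helper, not a method in this PR:

```java
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;

// Only set the password property when the config actually provides one, so a local DB without
// a password still connects. Field names mirror the spec above.
static Properties connectionProperties(final JsonNode config) {
  final Properties properties = new Properties();
  properties.setProperty("user", config.get("username").asText());
  if (config.has("password") && !config.get("password").asText().isEmpty()) {
    properties.setProperty("password", config.get("password").asText());
  }
  return properties;
}
```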
try (Statement statement = connection.createStatement()) {
  statement.execute("set global local_infile=true");
} catch (Exception e) {
  throw new RuntimeException("local_infile attribute could not be enabled", e);
It seems like if people are using an Airbyte-specific user, it's unlikely to have permissions to do this.
We should provide as much context as possible in the RuntimeException so it's clear what they need to do.
Something like:
The DB user X was unable to set the local_infile attribute on the MySQL server. As an admin user, you will need to run "SET GLOBAL local_infile = true" before syncing data with Airbyte.
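In code, the suggested failure could look roughly like this (the username variable and exact wording are illustrative):

```java
// Sketch: surface who failed and what an admin needs to run, instead of a bare error message.
throw new RuntimeException(String.format(
    "The DB user '%s' was unable to set the local_infile attribute on the MySQL server. "
        + "As an admin user, you will need to run \"SET GLOBAL local_infile = true\" "
        + "before syncing data with Airbyte.", username), e);
```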
We'll also need to detail this requirement in the docs.
@@ -0,0 +1,128 @@
/*
the path of this file is incorrect
/test connector=destination-mysql
/test connector=destination-mysql
Quick work! Generally makes sense!
Jared's comments aside, some minor comments around readability and small code tweaks.
One thing came to mind: since we are modifying the JdbcBufferedConsumerFactory, let's also run the integration tests for Postgres, Redshift and Snowflake. All these destinations use this class under the hood, so it's good to double check.
private final String driverClass;
private final NamingConventionTransformer namingResolver;
private final SqlOperations sqlOperations;
protected final String driverClass;
nit: either use getter methods or the @VisibleForTesting annotation. I prefer the getter methods, but either one works.
@@ -157,12 +157,4 @@ public static JsonNode fieldsToJsonSchema(List<Field> fields) {
    return allFieldNames;
  }

- public static Set<String> getStreamNames(ConfiguredAirbyteCatalog catalog) {
Oops, sorry, this is from when I formatted the files. I was going to get rid of it in a follow-up PR, so it's up to you to leave it here or remove it.
It's not used anywhere in our code today.
It's fine. Let's keep it.
- final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
+ String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);

+ // This is for MySQL destination, the table names cant have more than 64 characters.
- // This is for MySQL destination, the table names cant have more than 64 characters.
+ // This is for MySQL destination, the table names can't have more than 64 characters.
@Override
public AirbyteConnectionStatus check(JsonNode config) {
  try (final JdbcDatabase database = getDatabase(config)) {
    String outputSchema = namingResolver.getIdentifier(config.get("database").asText());
MySQL doesn't have the schema concept. There are only databases.
"$schema": "http://json-schema.org/draft-07/schema#", | ||
"title": "MySQL Destination Spec", | ||
"type": "object", | ||
"required": ["host", "port", "username", "database", "password"], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jared, when is this true? When a DB doesn't set a password?
private final SqlOperations sqlOperations;
protected final String driverClass;
protected final NamingConventionTransformer namingResolver;
protected final SqlOperations sqlOperations;
I don't think sqlOperations needs to be protected?
/test connector=destination-mysql
/test connector=destination-postgres
LGTM!
Let's make sure the additional tests pass first before merging. I'd also want to let @jrhizor sign off since he started the original review.
/test connector=destination-redshift
/test connector=destination-snowflake
/test connector=destination-redshift
/test connector=destination-mysql
Looks great!
Either in this PR before merging or in a separate PR, you'll still need to:
- add docs
- add mysql to destination_definitions.yaml and run ./gradlew generateSeed to make this connector available in the UI by default.
@jrhizor we are going to create a new PR for documentation and other changes.
What
Issue: #1483
How
This PR introduces a module for the MySQL destination. It mainly tackles loading data into MySQL tables without making too many changes to the existing architecture/code. We use the LOAD DATA INFILE mechanism to load data into MySQL, which is the recommended way of doing bulk loads. Ref: https://dev.mysql.com/doc/refman/8.0/en/load-data.html
This implementation is not compatible with normalization. The MySQL integration tests pass locally.
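For illustration, this is roughly the shape of the statement issued per batch. The temp file path, table name, and CSV options below are placeholders, not the exact values the connector produces, and the database.execute call is assumed to match the JdbcDatabase helper used elsewhere in the PR:

```java
// Bulk-load one staged CSV batch into a temporary table via LOAD DATA LOCAL INFILE (sketch).
final String loadSql = String.format(
    "LOAD DATA LOCAL INFILE '%s' INTO TABLE %s "
        + "FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' "
        + "LINES TERMINATED BY '\\n'",
    "/tmp/airbyte_batch.csv", "_airbyte_tmp_users");

database.execute(connection -> {
  try (final Statement statement = connection.createStatement()) {
    statement.execute(loadSql);
  }
});
```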
Pre-merge Checklist
Recommended reading order
test.java
component.ts