Skip to content

Commit

Permalink
Publish new mssql source (#13176)
Browse files Browse the repository at this point in the history
* Remove logger

* Make the spec backward compatible

* Match new replication config first

* auto-bump connector version

* Fix expected spec json

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
tuliren and octavia-squidington-iii authored May 28, 2022
1 parent e65a97a commit 15d8e46
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.3.22
dockerImageTag: 0.4.0
documentationUrl: https://docs.airbyte.io/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
66 changes: 59 additions & 7 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4736,7 +4736,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.3.22"
- dockerImage: "airbyte/source-mssql:0.4.0"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql"
connectionSpecification:
Expand All @@ -4748,7 +4748,7 @@
- "port"
- "database"
- "username"
additionalProperties: false
additionalProperties: true
properties:
host:
description: "The hostname of the database."
Expand Down Expand Up @@ -4841,19 +4841,71 @@
description: "Specifies the host name of the server. The value of\
\ this property must match the subject property of the certificate."
order: 7
replication_method:
type: "string"
replication:
type: "object"
title: "Replication Method"
description: "The replication method used for extracting data from the database.\
\ STANDARD replication requires no setup on the DB side but will not be\
\ able to represent deletions incrementally. CDC uses {TBC} to detect\
\ inserts, updates, and deletes. This needs to be configured on the source\
\ database itself."
default: "STANDARD"
enum:
- "STANDARD"
- "CDC"
additionalProperties: true
order: 8
oneOf:
- title: "Standard"
additionalProperties: false
description: "Standard replication requires no setup on the DB side but\
\ will not be able to represent deletions incrementally."
required:
- "replication_type"
properties:
replication_type:
type: "string"
const: "Standard"
enum:
- "Standard"
default: "Standard"
order: 0
- title: "Logical Replication (CDC)"
additionalProperties: false
description: "CDC uses {TBC} to detect inserts, updates, and deletes.\
\ This needs to be configured on the source database itself."
required:
- "replication_type"
properties:
replication_type:
type: "string"
const: "CDC"
enum:
- "CDC"
default: "CDC"
order: 0
data_to_sync:
title: "Data to Sync"
type: "string"
default: "Existing and New"
enum:
- "Existing and New"
- "New Changes Only"
description: "What data should be synced under the CDC. \"Existing\
\ and New\" will read existing data as a snapshot, and sync new\
\ changes through CDC. \"New Changes Only\" will skip the initial\
\ snapshot, and only sync new changes through CDC."
order: 1
snapshot_isolation:
title: "Initial Snapshot Isolation Level"
type: "string"
default: "Snapshot"
enum:
- "Snapshot"
- "Read Committed"
description: "Existing data in the database are synced through an\
\ initial snapshot. This parameter controls the isolation level\
\ that will be used during the initial snapshotting. If you choose\
\ the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\"\
>snapshot isolation mode</a> on the database."
order: 2
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.annotation.VisibleForTesting;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MssqlCdcHelper {

Expand Down Expand Up @@ -90,28 +88,22 @@ public static DataToSync from(final String value) {

}

private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcHelper.class);

/**
 * Tells whether the given connector config selects CDC replication.
 *
 * <p>The nested replication object (introduced in version 0.4.0) is consulted first. A config
 * that still carries the legacy flat field alongside the new object, e.g. while being migrated,
 * is therefore decided by the new object. The legacy field is only a fallback.
 *
 * @param config the connector configuration
 * @return {@code true} if the selected replication method is CDC; {@code false} if it is another
 *         method or if neither replication field is present
 * @throws IllegalArgumentException if the configured text is not the exact name of a
 *         {@code ReplicationMethod} constant ({@code Enum.valueOf} is case-sensitive)
 */
@VisibleForTesting
static boolean isCdc(final JsonNode config) {
  // new replication method config since version 0.4.0; takes precedence over the legacy field
  if (config.hasNonNull(REPLICATION_FIELD)) {
    final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
    // NOTE(review): valueOf is case-sensitive; confirm the values the spec emits for the
    // replication type match the ReplicationMethod constant names exactly.
    final String replicationType = replicationConfig.get(REPLICATION_TYPE_FIELD).asText();
    return ReplicationMethod.valueOf(replicationType) == ReplicationMethod.CDC;
  }
  // legacy replication method config before version 0.4.0; used only when the new one is absent
  if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
    final String legacyMethod = config.get(LEGACY_REPLICATION_FIELD).asText();
    return ReplicationMethod.valueOf(legacyMethod) == ReplicationMethod.CDC;
  }
  return false;
}

@VisibleForTesting
static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
return SnapshotIsolation.SNAPSHOT;
}
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
Expand All @@ -123,10 +115,6 @@ static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {

@VisibleForTesting
static DataToSync getDataToSyncConfig(final JsonNode config) {
// legacy replication method config before version 0.4.0
if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) {
return DataToSync.EXISTING_AND_NEW;
}
// new replication method config since version 0.4.0
if (config.hasNonNull(REPLICATION_FIELD)) {
final JsonNode replicationConfig = config.get(REPLICATION_FIELD);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "MSSQL Source Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"host": {
"description": "The hostname of the database.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ public void testIsCdc() {
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(newCdc));

// migration from legacy to new config
final JsonNode mixNonCdc = Jsons.jsonNode(Map.of(
"replication_method", "CDC",
"replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD"))));
assertFalse(MssqlCdcHelper.isCdc(mixNonCdc));

final JsonNode mixCdc = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertTrue(MssqlCdcHelper.isCdc(mixCdc));
}

@Test
Expand All @@ -58,6 +72,23 @@ public void testGetSnapshotIsolation() {
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(newCdcSnapshot));

// migration from legacy to new config
final JsonNode mixCdcNonSnapshot = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Read Committed"))));
assertEquals(SnapshotIsolation.READ_COMMITTED, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcNonSnapshot));

final JsonNode mixCdcSnapshot = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Snapshot"))));
assertEquals(SnapshotIsolation.SNAPSHOT, MssqlCdcHelper.getSnapshotIsolationConfig(mixCdcSnapshot));
}

@Test
Expand All @@ -79,6 +110,23 @@ public void testGetDataToSyncConfig() {
"data_to_sync", "New Changes Only",
"snapshot_isolation", "Snapshot"))));
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(newCdcNewOnly));

final JsonNode mixCdcExistingAndNew = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication", Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "Existing and New",
"snapshot_isolation", "Read Committed"))));
assertEquals(DataToSync.EXISTING_AND_NEW, MssqlCdcHelper.getDataToSyncConfig(mixCdcExistingAndNew));

final JsonNode mixCdcNewOnly = Jsons.jsonNode(Map.of(
"replication_method", "Standard",
"replication",
Jsons.jsonNode(Map.of(
"replication_type", "CDC",
"data_to_sync", "New Changes Only",
"snapshot_isolation", "Snapshot"))));
assertEquals(DataToSync.NEW_CHANGES_ONLY, MssqlCdcHelper.getDataToSyncConfig(mixCdcNewOnly));
}

}

0 comments on commit 15d8e46

Please sign in to comment.