Skip to content

Commit

Permalink
create configuration for tracking hard deletes with scd version (#124)
Browse files Browse the repository at this point in the history
Co-authored-by: jvalenzuela <jvalenzuela@indeed.com>
  • Loading branch information
jmriego and jvalenzuela authored Jun 28, 2022
1 parent 2842147 commit 9738d2c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 2 deletions.
13 changes: 12 additions & 1 deletion target_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def add_metadata_columns_to_schema(schema_message):
'format': 'date-time'}
extended_schema_message['schema']['properties']['_sdc_deleted_at'] = {'type': ['null', 'string'],
'format': 'date-time'}
extended_schema_message['schema']['properties']['_sdc_table_version'] = {'type': ['null', 'integer']}

return extended_schema_message

Expand Down Expand Up @@ -234,7 +235,17 @@ def persist_lines(config, lines) -> None:
flush_timestamp[stream] = datetime.utcnow()

elif t == 'ACTIVATE_VERSION':
LOGGER.debug('ACTIVATE_VERSION message')
stream = o['stream']
version = o['version']

if hard_delete_mapping.get(stream, default_hard_delete):
if stream in stream_to_sync:
LOGGER.debug('ACTIVATE_VERSION message, clearing records with versions other than {}'.format(version))
stream_to_sync[stream].activate_table_version(stream, version)
else:
LOGGER.warn('ACTIVATE_VERSION message, unknown stream {}'.format(stream))
else:
LOGGER.debug('ACTIVATE_VERSION message - ignoring due hard_delete not set')

elif t == 'STATE':
LOGGER.debug('Setting state to {}'.format(o['value']))
Expand Down
13 changes: 13 additions & 0 deletions target_bigquery/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,19 @@ def delete_rows(self, stream):
logger.info("Deleting rows from '{}' table... {}".format(table_id, query))
logger.info("DELETE {}".format(self.query(query).result().total_rows))

def activate_table_version(self, stream, version):
    """Delete rows of this stream's table that belong to another table version.

    Issues a ``DELETE`` against the non-temporary table of the stream and
    logs the statement together with the ``total_rows`` reported by the
    query result.

    Args:
        stream: Stream name supplied by the caller. It is not used to pick
            the table; the stream recorded in ``self.stream_schema_message``
            is used instead.
        version: Table version whose rows are kept. Must be convertible to
            ``int``.

    Raises:
        TypeError, ValueError: If ``version`` cannot be converted to an
            integer.
    """
    # The version ends up inside the SQL text, so force it to be a plain
    # integer before formatting instead of interpolating the raw message value.
    active_version = int(version)

    stream = self.stream_schema_message['stream']
    table_ref = self.ref_helper.table_ref_from_stream(stream, is_temporary=False)
    table_id = table_ref.table_id

    # NOTE(review): `!=` does not match rows whose _sdc_table_version is NULL,
    # so such rows are left in place - confirm that this is intended.
    query = "DELETE FROM {} WHERE _sdc_table_version != {}".format(
        sql_utils.safe_table_ref(table_ref),
        active_version)
    logger.info("Removing rows from previous versions from '{}' table... {}".format(table_id, query))
    logger.info("DELETE {}".format(self.query(query).result().total_rows))

def create_schema_if_not_exists(self):
schema_name = self.schema_name
temp_schema = self.connection_config.get('temp_schema', self.schema_name)
Expand Down
1 change: 1 addition & 0 deletions target_bigquery/stream_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def parse_datetime(dt):
extended_record['_sdc_extracted_at'] = parse_datetime(record_message.get('time_extracted', datetime.now()))
extended_record['_sdc_batched_at'] = datetime.now()
extended_record['_sdc_deleted_at'] = parse_datetime(record_message.get('record', {}).get('_sdc_deleted_at'))
extended_record['_sdc_table_version'] = record_message.get('version')

return extended_record

Expand Down
12 changes: 12 additions & 0 deletions tests/integration/resources/table_with_multiple_version.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_versions", "bookmarks": {"tap_mysql_test-test_table_versions": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_versions", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_versions", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_versions", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-12 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_versions", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_versions", "bookmarks": {"tap_mysql_test-test_table_versions": {"initial_full_table_complete": true}}}}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_versions", "bookmarks": {"tap_mysql_test-test_table_versions": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_versions", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk", "c_varchar"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_versions", "version": 3}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_versions", "version": 4}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_versions", "record": {"c_pk": 3, "c_varchar": "2", "c_int": 3, "c_date": "2019-02-15 02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_versions", "bookmarks": {"tap_mysql_test-test_table_versions": {"initial_full_table_complete": true}}}}
20 changes: 20 additions & 0 deletions tests/integration/test_target_bigquery_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,26 @@ def test_loading_tables_with_hard_delete(self):
should_hard_deleted_rows=True
)

def test_loading_version_hard_delete(self):
    """Loading a stream with several ACTIVATE_VERSION messages while hard delete is on"""
    messages = test_utils.get_test_tap_lines('table_with_multiple_version.json')

    # Hard delete has to be enabled before the messages are persisted
    self.config['hard_delete'] = True
    self.persist_lines(messages)

    # Read back whatever ended up in the target table
    db_sync = DbSync(self.config)
    schema_name = self.config.get('default_target_schema', '')
    select_sql = "SELECT * FROM {}.test_table_versions ORDER BY c_pk".format(schema_name)
    loaded_rows = query(db_sync, select_sql)

    # Only this single row is expected to be left in the table
    expected_rows = [
        dict(
            c_pk=3,
            c_int=3,
            c_varchar='2',
            c_date=datetime.datetime(2019, 2, 15, 2, 0, 0, tzinfo=timezone.utc),
        ),
    ]

    self.assertEqual(self.remove_metadata_columns_from_rows(loaded_rows), expected_rows)


def test_loading_with_multiple_schema(self):
"""Loading table with multiple SCHEMA messages"""
tap_lines = test_utils.get_test_tap_lines('messages-with-multi-schemas.json')
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
METADATA_COLUMNS = [
'_sdc_extracted_at',
'_sdc_batched_at',
'_sdc_deleted_at'
'_sdc_deleted_at',
'_sdc_table_version'
]


Expand Down

0 comments on commit 9738d2c

Please sign in to comment.