From cdc615fbe9025915cef5975181698e6f0ee8d766 Mon Sep 17 00:00:00 2001 From: Harsh <80324346+harshpatel4crest@users.noreply.github.com> Date: Wed, 1 Jun 2022 15:26:06 +0530 Subject: [PATCH 1/4] TDL-7301: Handle multiline `CRITICAL` error messages (#137) * updated code to add CRITICAL for multiline error messages * run unittest in CCi * updated config.yml file --- .circleci/config.yml | 23 +++++++++++++++ tap_salesforce/__init__.py | 3 +- .../test_multiline_critical_error_message.py | 29 +++++++++++++++++++ 3 files changed, 54 insertions(+), 1 deletion(-) create mode 100644 tests/unittests/test_multiline_critical_error_message.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 52e47638..bed7da97 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -45,6 +45,23 @@ jobs: pip install pylint echo "pylint will skip the following: $PYLINT_DISABLE_LIST" pylint tap_salesforce -d "$PYLINT_DISABLE_LIST,stop-iteration-return" + run_unit_tests: + executor: docker-executor + steps: + - checkout + - attach_workspace: + at: /usr/local/share/virtualenvs + - run: + name: 'Run Unit Tests' + command: | + source /usr/local/share/virtualenvs/tap-salesforce/bin/activate + pip install nose coverage + nosetests --with-coverage --cover-erase --cover-package=tap_salesforce --cover-html-dir=htmlcov tests/unittests + coverage html + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov run_integration_tests: executor: docker-executor parallelism: 8 @@ -83,6 +100,12 @@ workflows: - tier-1-tap-user requires: - ensure_env + - run_unit_tests: + context: + - circleci-user + - tier-1-tap-user + requires: + - ensure_env - run_integration_tests: context: - circleci-user diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 8dae2651..07aba2c9 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -417,5 +417,6 @@ def main(): LOGGER.critical(e) sys.exit(1) except Exception as e: - LOGGER.critical(e) + for error_line in str(e).splitlines(): + LOGGER.critical(error_line) raise e diff --git a/tests/unittests/test_multiline_critical_error_message.py b/tests/unittests/test_multiline_critical_error_message.py new file mode 100644 index 00000000..b7fe47e5 --- /dev/null +++ b/tests/unittests/test_multiline_critical_error_message.py @@ -0,0 +1,29 @@ +import unittest +from unittest import mock +from tap_salesforce import main + +# mock "main_impl" and raise multiline error +def raise_error(): + raise Exception("""Error syncing Transaction__c: 400 Client Error: Bad Request for url: https://test.my.salesforce.com/services/async/41.0/job/7502K00000IcACtQAN/batch/test123j/result/test123b Response: + InvalidSessionId + Invalid session id + """) + +class TestMultiLineCriticalErrorMessage(unittest.TestCase): + """ + Test case to verify every line in the multiline error contains 'CRITICAL' + """ + + @mock.patch("tap_salesforce.LOGGER.critical") + @mock.patch("tap_salesforce.main_impl") + def test_multiline_critical_error_message(self, mocked_main_impl, mocked_logger_critical): + # mock "main_impl" and raise multiline error + mocked_main_impl.side_effect = raise_error + + # verify "Exception" is raise on function call + with self.assertRaises(Exception): + main() + + # verify "LOGGER.critical" is called 5 times, as the error raised contains 5 lines + self.assertEqual(mocked_logger_critical.call_count, 5) From 7ea4e5279ac7dc98cc01e608df67df25a6b89990 Mon Sep 17 00:00:00 2001 From: karanpanchal-crest Date: Wed, 1 Jun 2022 15:38:14 +0530 Subject: 
[PATCH 2/4] TDL-15349 Add Date Windowing (#140) * add date windowing * resolved pylint error * resolved pylint error * resolved pylint error * add unittest * added comments for logic explanation * resolved pylint error * update config.yml * add remaining unittest * add change in bulk * resolved pylint error * update config.yml * added unittest for date window gaps * resolve unittest failure * resolved review comments * resolved pylint duplicate code error Co-authored-by: harshpatel4crest --- .circleci/config.yml | 4 +- tap_salesforce/salesforce/__init__.py | 11 ++ tap_salesforce/salesforce/bulk.py | 94 +++++++---- tap_salesforce/salesforce/rest.py | 8 +- .../test_salesforce_date_windowing.py | 158 ++++++++++++++++++ 5 files changed, 236 insertions(+), 39 deletions(-) create mode 100644 tests/unittests/test_salesforce_date_windowing.py diff --git a/.circleci/config.yml b/.circleci/config.yml index bed7da97..1ecc1aa7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -44,7 +44,7 @@ jobs: source /usr/local/share/virtualenvs/tap-salesforce/bin/activate pip install pylint echo "pylint will skip the following: $PYLINT_DISABLE_LIST" - pylint tap_salesforce -d "$PYLINT_DISABLE_LIST,stop-iteration-return" + pylint tap_salesforce -d "$PYLINT_DISABLE_LIST,stop-iteration-return,logging-format-interpolation" run_unit_tests: executor: docker-executor steps: - checkout - attach_workspace: at: /usr/local/share/virtualenvs - run: name: 'Run Unit Tests' command: | source /usr/local/share/virtualenvs/tap-salesforce/bin/activate pip install nose coverage nosetests --with-coverage --cover-erase --cover-package=tap_salesforce --cover-html-dir=htmlcov tests/unittests coverage html - store_test_results: path: test_output/report.xml - store_artifacts: path: htmlcov run_integration_tests: executor: docker-executor parallelism: 8 @@ -127,4 +127,4 @@ workflows: filters: branches: only: - - master + - master \ No newline at end of file diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index 0ac961ad..b366388a 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -447,3 +447,14 @@ def get_blacklisted_fields(self): raise TapSalesforceException( "api_type should be REST or BULK was: {}".format( self.api_type)) + + def get_window_end_date(self, start_date, end_date): + # to update end_date, subtract 'half_day_range' (i.e.
half of the days between start_date and end_date) + # when the 'half_day_range' is an odd number, we will round down to the nearest integer because of the '//' + half_day_range = (end_date - start_date) // 2 + + if half_day_range.days == 0: + raise TapSalesforceException( + "Attempting to query by 0 day range, this would cause infinite looping.") + + return end_date - half_day_range diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index e6596bd1..3b99073b 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -5,6 +5,7 @@ import time import tempfile import singer +import singer.utils as singer_utils from singer import metrics import requests from requests.exceptions import RequestException @@ -18,7 +19,7 @@ PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60 ITER_CHUNK_SIZE = 1024 DEFAULT_CHUNK_SIZE = 100000 # Max is 250000 - +MAX_RETRIES = 4 LOGGER = singer.get_logger() # pylint: disable=inconsistent-return-statements @@ -117,25 +118,28 @@ def _bulk_query(self, catalog_entry, state): if batch_status['state'] == 'Failed': if self._can_pk_chunk_job(batch_status['stateMessage']): - batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date) - job_id = batch_status['job_id'] - - # Set pk_chunking to True to indicate that we should write a bookmark differently - self.sf.pk_chunking = True - - # Add the bulk Job ID and its batches to the state so it can be resumed if necessary - tap_stream_id = catalog_entry['tap_stream_id'] - state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) - state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) - - for completed_batch_id in batch_status['completed']: - for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry): - yield result - # Remove the completed batch ID and write state - state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id) - LOGGER.info("Finished syncing batch %s. Removing batch from state.", completed_batch_id) - LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"])) - singer.write_state(state) + # Get list of batch_status with pk_chunking or date_windowing + status_list = self._bulk_with_window([], catalog_entry, start_date) + + for batch_status in status_list: + job_id = batch_status['job_id'] + + # Set pk_chunking to True to indicate that we should write a bookmark differently + self.sf.pk_chunking = True + + # Add the bulk Job ID and its batches to the state so it can be resumed if necessary + tap_stream_id = catalog_entry['tap_stream_id'] + state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id) + state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:]) + + for completed_batch_id in batch_status['completed']: + for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry): + yield result + # Remove the completed batch ID and write state + state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id) + LOGGER.info("Finished syncing batch %s. 
Removed batch from state.", completed_batch_id) + LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"])) + singer.write_state(state) else: raise TapSalesforceException(batch_status['stateMessage']) else: @@ -148,18 +152,11 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date): # Create a new job job_id = self._create_job(catalog_entry, True) - self._add_batch(catalog_entry, job_id, start_date, False) + self._add_batch(catalog_entry, job_id, start_date, order_by_clause=False) batch_status = self._poll_on_pk_chunked_batch_status(job_id) batch_status['job_id'] = job_id - if batch_status['failed']: - raise TapSalesforceException( - "One or more batches failed during PK chunked job. {} failed out of {} total batches. First 20 failed batches: {}".format( - len(batch_status['failed']), - len(batch_status['completed']) + len(batch_status['failed']), - list(batch_status['failed'].items())[:20])) - # Close the job after all the batches are complete self._close_job(job_id) @@ -194,11 +191,11 @@ def _create_job(self, catalog_entry, pk_chunking=False): return job['id'] - def _add_batch(self, catalog_entry, job_id, start_date, order_by_clause=True): + def _add_batch(self, catalog_entry, job_id, start_date, end_date=None, order_by_clause=True): endpoint = "job/{}/batch".format(job_id) url = self.bulk_url.format(self.sf.instance_url, endpoint) - body = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=order_by_clause) + body = self.sf._build_query_string(catalog_entry, start_date, end_date, order_by_clause=order_by_clause) headers = self._get_bulk_headers() headers['Content-Type'] = 'text/csv' @@ -357,3 +354,40 @@ def _iter_lines(self, response): if pending is not None: yield pending + + def _bulk_with_window(self, status_list, catalog_entry, start_date_str, end_date=None, retries=MAX_RETRIES): + """Bulk api call with date windowing""" + sync_start = singer_utils.now() + if end_date is None: + end_date = sync_start + LOGGER.info("Retrying Bulk Query with PK Chunking") + else: + LOGGER.info("Retrying Bulk Query with window of date {} to {}".format(start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ'))) + + if retries == 0: + raise TapSalesforceException("Ran out of retries attempting to query Salesforce Object {}".format(catalog_entry['stream'])) + + job_id = self._create_job(catalog_entry, True) + self._add_batch(catalog_entry, job_id, start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ'), False) + batch_status = self._poll_on_pk_chunked_batch_status(job_id) + batch_status['job_id'] = job_id + # Close the job after all the batches are complete + self._close_job(job_id) + + if batch_status['failed']: + LOGGER.info("Failed Bulk Query with window of date {} to {}".format(start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ'))) + # If batch_status is failed then reduce date window by half by updating end_date + end_date = self.sf.get_window_end_date(singer_utils.strptime_with_tz(start_date_str), end_date) + + return self._bulk_with_window(status_list, catalog_entry, start_date_str, end_date, retries - 1) + + else: + status_list.append(batch_status) + + # If the date range was chunked (an end_date was passed), sync + # from the end_date -> now + if end_date < sync_start: + next_start_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%SZ') + return self._bulk_with_window(status_list, catalog_entry, next_start_date_str, retries=retries) + + return status_list diff --git a/tap_salesforce/salesforce/rest.py 
b/tap_salesforce/salesforce/rest.py index 92c120bf..408cd4d0 100644 --- a/tap_salesforce/salesforce/rest.py +++ b/tap_salesforce/salesforce/rest.py @@ -71,13 +71,7 @@ def _query_recur( raise ex if retryable: - start_date = singer_utils.strptime_with_tz(start_date_str) - half_day_range = (end_date - start_date) // 2 - end_date = end_date - half_day_range - - if half_day_range.days == 0: - raise TapSalesforceException( - "Attempting to query by 0 day range, this would cause infinite looping.") + end_date = self.sf.get_window_end_date(singer_utils.strptime_with_tz(start_date_str), end_date) query = self.sf._build_query_string(catalog_entry, singer_utils.strftime(start_date), singer_utils.strftime(end_date)) diff --git a/tests/unittests/test_salesforce_date_windowing.py b/tests/unittests/test_salesforce_date_windowing.py new file mode 100644 index 00000000..6f0e937b --- /dev/null +++ b/tests/unittests/test_salesforce_date_windowing.py @@ -0,0 +1,158 @@ +import datetime +import unittest +from unittest import mock +from tap_salesforce import Salesforce +from tap_salesforce.salesforce import Bulk +from dateutil import tz +import singer + +# function to return batch status as 'failed' to force date windowing +def mocked_batch_status(count): + if count < 2: + return {"failed": "test"} + else: + return {"failed": {}} + +@mock.patch('tap_salesforce.salesforce.Bulk._close_job') +@mock.patch('tap_salesforce.salesforce.Bulk._poll_on_pk_chunked_batch_status', side_effect = mocked_batch_status) +@mock.patch('tap_salesforce.salesforce.Bulk._create_job', side_effect=[1, 2, 3, 4, 5]) +@mock.patch('tap_salesforce.salesforce.bulk.singer_utils.now') +class TestBulkDateWindow(unittest.TestCase): + + # start date + start_date = '2019-02-04T12:15:00Z' + # 'Salesforce' object + sf = Salesforce( + default_start_date=start_date, + api_type="BULK") + # dummy catalog entry + catalog_entry = { + 'stream': 'User', + 'tap_stream_id': 'User', + 'schema': { + "properties": { + "Id": { + "type": "string" + }, + "SystemModstamp": { + "anyOf": [{ + "type": "string", + "format": "date-time" + }, + { + "type": [ + "string", + "null" + ] + } + ] + } + } + }, + 'metadata': [ + { + "breadcrumb": [], + "metadata": { + "selected": True, + "replication-key": "SystemModstamp", + "table-key-properties": [ + "Id" + ] + } + }, + { + "breadcrumb": [ + "properties", + "SystemModstamp" + ], + "metadata": { + "inclusion": "automatic" + } + }, + { + "breadcrumb": [ + "properties", + "Id" + ], + "metadata": { + "inclusion": "automatic" + } + } + ] + } + + # mocked now date + now_date_1 = datetime.datetime(2022, 5, 2, 12, 15, 00, tzinfo=tz.UTC) + now_date_1_str = now_date_1.strftime("%Y-%m-%dT%H:%M:%SZ") + + # mocked now date + now_date_2 = datetime.datetime(2019, 2, 5, 12, 15, 00, tzinfo=tz.UTC) + + @mock.patch('tap_salesforce.salesforce.Bulk._add_batch') + def test_bulk_date_windowing_with_max_retries_0(self, mocked_add_batch, mocked_singer_util_now, mocked_create_job, mocked_batch_status, mocked_close_job): + """ + To verify that if data is too large then date windowing mechanism execute, + but after retrying upto MAX_RETRIES still not get data then raise proper exception + """ + + mocked_singer_util_now.return_value = self.now_date_1 + + with self.assertRaises(Exception) as e: + Bulk(self.sf)._bulk_with_window([], self.catalog_entry, self.start_date, retries=0) + + self.assertEqual(str(e.exception), 'Ran out of retries attempting to query Salesforce Object User', "Not get expected Exception") + + 
@mock.patch('tap_salesforce.salesforce.Bulk._add_batch') + def test_bulk_date_windowing_with_half_day_range_0(self, mocked_add_batch, mocked_singer_util_now, mocked_create_job, mocked_batch_status, mocked_close_job): + """ + To verify that if data is too large then date windowing mechanism execute, + but after retrying window goes to 0 days, still not get data then raise proper exception + """ + + mocked_singer_util_now.return_value = self.now_date_2 + + with self.assertRaises(Exception) as e: + Bulk(self.sf)._bulk_with_window([], self.catalog_entry, self.start_date) + + self.assertEqual(str(e.exception), 'Attempting to query by 0 day range, this would cause infinite looping.', "Not get expected Exception") + + @mock.patch('xmltodict.parse') + @mock.patch('tap_salesforce.salesforce.Salesforce._make_request') + def test_bulk_date_window_gaps(self, mocked_make_request, mocked_xmltodict_parse, mocked_singer_util_now, mocked_create_job, mocked_batch_status, mocked_close_job): + """ + Test case to verify there are no gaps in the date window + """ + + # mock singer.now + mocked_singer_util_now.return_value = self.now_date_1 + # mock xmltodict.parse + mocked_xmltodict_parse.return_value = { + "batchInfo": { + "id": 1234 + } + } + + # function call with start date + Bulk(self.sf)._bulk_with_window([], self.catalog_entry, self.start_date) + + # collect 'body' (query) from '_make_request' function arguments + actual_queries = [kwargs.get("body") for args, kwargs in mocked_make_request.call_args_list] + + # calculate half window date for assertion + half_day = (self.now_date_1 - singer.utils.strptime_with_tz(self.start_date)) // 2 + half_window_date = (self.now_date_1 - half_day).strftime('%Y-%m-%dT%H:%M:%SZ') + + # create expected queries + expected_queries = [ + # failed call of whole date window ie. 
start date to now date + f'SELECT Id,SystemModstamp FROM User WHERE SystemModstamp >= {self.start_date} AND SystemModstamp < {self.now_date_1_str}', + # date window divided into half, query from start date to half window + f'SELECT Id,SystemModstamp FROM User WHERE SystemModstamp >= {self.start_date} AND SystemModstamp < {half_window_date}', + # query from half window to now date + f'SELECT Id,SystemModstamp FROM User WHERE SystemModstamp >= {half_window_date} AND SystemModstamp < {self.now_date_1_str}' + ] + + # verify we called '_make_request' 3 times + self.assertEqual(mocked_make_request.call_count, 3, "Function is not called the expected number of times") + # verify the queries are called as expected + self.assertEqual(actual_queries, expected_queries) From 1cc8d0378dff0f600960b3ede336135537f9f12c Mon Sep 17 00:00:00 2001 From: karanpanchal-crest Date: Wed, 1 Jun 2022 15:43:59 +0530 Subject: [PATCH 3/4] TDL-16006: Fix interruptible full table bookmarking strategy for bulk API (#138) * Fix Full Table Bookmarking Strategy for bulk API * add detailed comments * update config.yml * remove unused package * updated unittest Co-authored-by: harshpatel4crest --- tap_salesforce/sync.py | 4 ++-- .../test_salesforce_null_bookmark.py | 24 +++++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 tests/unittests/test_salesforce_null_bookmark.py diff --git a/tap_salesforce/sync.py b/tap_salesforce/sync.py index a9de4f14..65ee3024 100644 --- a/tap_salesforce/sync.py +++ b/tap_salesforce/sync.py @@ -173,8 +173,8 @@ def sync_records(sf, catalog_entry, state, counter): state = singer.write_bookmark( state, catalog_entry['tap_stream_id'], 'version', None) - # If pk_chunking is set, only write a bookmark at the end - if sf.pk_chunking: + # If pk_chunking is set and the selected stream has a replication key, only write a bookmark at the end + if sf.pk_chunking and replication_key: # Write a bookmark with the highest value we've seen state = singer.write_bookmark( state, diff --git a/tests/unittests/test_salesforce_null_bookmark.py b/tests/unittests/test_salesforce_null_bookmark.py new file mode 100644 index 00000000..a477933a --- /dev/null +++ b/tests/unittests/test_salesforce_null_bookmark.py @@ -0,0 +1,24 @@ +import unittest +from unittest import mock +from tap_salesforce import Salesforce, metrics +from tap_salesforce.sync import sync_records +import json + +class TestNullBookmarkTesting(unittest.TestCase): + @mock.patch('tap_salesforce.salesforce.Salesforce.query', side_effect=lambda test1, test2: []) + def test_not_null_bookmark_for_incremental_stream(self, mocked_query): + """ + Ensure that, after fixing the bookmark-writing logic, the state file does not end up with a + null replication-key bookmark when an incremental stream is synced as a full table stream + + """ + sf = Salesforce(default_start_date='2019-02-04T12:15:00Z', api_type="BULK") + + sf.pk_chunking = True + catalog_entry = {"stream": "OpportunityLineItem", "schema": {}, "metadata":[], "tap_stream_id": "OpportunityLineItem"} + state = {} + counter = metrics.record_counter('OpportunityLineItem') + sync_records(sf, catalog_entry, state, counter) + # the write state function converts the python dictionary to a json string + state = json.dumps(state) + self.assertEqual(state, '{"bookmarks": {"OpportunityLineItem": {"version": null}}}', "Did not get expected state value") \ No newline at end of file From 87035fa423a5ac19a43fac422e792a833369cc34 Mon Sep 17 00:00:00 2001 From: Harsh <80324346+harshpatel4crest@users.noreply.github.com> Date: Fri, 3 Jun 2022 10:17:08 +0530 Subject: [PATCH 4/4] TDL-16115: Add lookback window advanced config option for Incremental Syncs (#135) * added lookback window * resolved pylint and unittest error * resolved unittest error * updated lookback window code * updated lookback code to work for incremental syncs * added some comments * updated comments * added function comments * resolved review comments * addressed review comments * updated lookback window integration test and unittests * Update unit tests to match expected implementation (#141) We do not want to apply the lookback window to the start date * updated the code to use lookback if present * resolved CCi failure and updated README file * set Account as INCREMENTAL before sync * updated state date and fixed pylint error Co-authored-by: Andy Lu --- README.md | 3 + tap_salesforce/__init__.py | 7 +- tap_salesforce/salesforce/__init__.py | 24 +++-- tap_salesforce/salesforce/bulk.py | 3 +- tests/test_salesforce_lookback_window.py | 107 +++++++++++++++++++++++ tests/unittests/test_lookback_window.py | 97 ++++++++++++++++++++ 6 files changed, 232 insertions(+), 9 deletions(-) create mode 100644 tests/test_salesforce_lookback_window.py create mode 100644 tests/unittests/test_lookback_window.py diff --git a/README.md b/README.md index ef66e3fc..be556e11 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ $ tap-salesforce --config config.json --properties properties.json --state state "start_date": "2017-11-02T00:00:00Z", "api_type": "BULK", "select_fields_by_default": true + "lookback_window": 10 } ``` @@ -40,6 +41,8 @@ The `start_date` is used by the tap as a bound on SOQL queries when searching fo The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default. +The `lookback_window` (in seconds) subtracts the specified number of seconds from the bookmark so that records modified shortly before the bookmark are synced again. Recommended value: 10 seconds. + ## Run Discovery To run discovery mode, execute the tap with the config file.
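For illustration, the lookback behaviour documented in the README change above can be sketched in a few lines of Python. This is a simplified, hypothetical helper rather than the tap's own code — the tap performs the equivalent arithmetic inside `Salesforce.get_start_date()` using singer-python's `strptime_with_tz`/`strftime`, and real bookmarks carry microsecond precision — so the `apply_lookback` name and the timestamp format below are assumptions made only for this sketch:

```python
from datetime import datetime, timedelta, timezone

def apply_lookback(bookmark: str, lookback_window: int) -> str:
    """Subtract `lookback_window` seconds from an ISO-8601 bookmark string.

    Hypothetical helper for illustration only; the tap does the same thing
    inside Salesforce.get_start_date() when both a bookmark and a
    lookback_window are present.
    """
    parsed = datetime.strptime(bookmark, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return (parsed - timedelta(seconds=lookback_window)).strftime("%Y-%m-%dT%H:%M:%SZ")

# With "lookback_window": 10 as in the sample config above, a stored bookmark
# of 2017-11-02T00:00:10Z is widened to a query lower bound of 2017-11-02T00:00:00Z.
print(apply_lookback("2017-11-02T00:00:10Z", 10))
```

Keeping the window small (the recommended 10 seconds) limits how many already-synced records are re-emitted while still covering records committed around the bookmark timestamp.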
diff --git a/tap_salesforce/__init__.py b/tap_salesforce/__init__.py index 07aba2c9..a0fb6874 100644 --- a/tap_salesforce/__init__.py +++ b/tap_salesforce/__init__.py @@ -375,6 +375,10 @@ def main_impl(): sf = None try: + # get lookback window from config + lookback_window = CONFIG.get('lookback_window') + lookback_window = int(lookback_window) if lookback_window else None + sf = Salesforce( refresh_token=CONFIG['refresh_token'], sf_client_id=CONFIG['client_id'], @@ -384,7 +388,8 @@ def main_impl(): is_sandbox=CONFIG.get('is_sandbox'), select_fields_by_default=CONFIG.get('select_fields_by_default'), default_start_date=CONFIG.get('start_date'), - api_type=CONFIG.get('api_type')) + api_type=CONFIG.get('api_type'), + lookback_window=lookback_window) sf.login() if args.discover: diff --git a/tap_salesforce/salesforce/__init__.py b/tap_salesforce/salesforce/__init__.py index b366388a..493f1ba6 100644 --- a/tap_salesforce/salesforce/__init__.py +++ b/tap_salesforce/salesforce/__init__.py @@ -1,3 +1,4 @@ +import datetime import re import threading import time @@ -210,7 +211,8 @@ def __init__(self, is_sandbox=None, select_fields_by_default=None, default_start_date=None, - api_type=None): + api_type=None, + lookback_window=None): self.api_type = api_type.upper() if api_type else None self.refresh_token = refresh_token self.token = token @@ -235,9 +237,10 @@ def __init__(self, self.login_timer = None self.data_url = "{}/services/data/v52.0/{}" self.pk_chunking = False + self.lookback_window = lookback_window # validate start_date - singer_utils.strptime(default_start_date) + singer_utils.strptime_to_utc(default_start_date) def _get_standard_headers(self): return {"Authorization": "Bearer {}".format(self.access_token)} @@ -370,7 +373,6 @@ def describe(self, sobject=None): return resp.json() - # pylint: disable=no-self-use def _get_selected_properties(self, catalog_entry): mdata = metadata.to_map(catalog_entry['metadata']) properties = catalog_entry['schema'].get('properties', {}) @@ -382,12 +384,22 @@ def _get_selected_properties(self, catalog_entry): def get_start_date(self, state, catalog_entry): + """ + return start date if state is not provided + else return bookmark from the state by subtracting lookback if provided + """ catalog_metadata = metadata.to_map(catalog_entry['metadata']) replication_key = catalog_metadata.get((), {}).get('replication-key') - return (singer.get_bookmark(state, - catalog_entry['tap_stream_id'], - replication_key) or self.default_start_date) + # get bookmark value from the state + bookmark_value = singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key) + sync_start_date = bookmark_value or self.default_start_date + + # if the state contains a bookmark, subtract the lookback window from the bookmark + if bookmark_value and self.lookback_window: + sync_start_date = singer_utils.strftime(singer_utils.strptime_with_tz(sync_start_date) - datetime.timedelta(seconds=self.lookback_window)) + + return sync_start_date def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by_clause=True): selected_properties = self._get_selected_properties(catalog_entry) diff --git a/tap_salesforce/salesforce/bulk.py b/tap_salesforce/salesforce/bulk.py index 3b99073b..081db803 100644 --- a/tap_salesforce/salesforce/bulk.py +++ b/tap_salesforce/salesforce/bulk.py @@ -101,7 +101,7 @@ def _get_bulk_headers(self): return {"X-SFDC-Session": self.sf.access_token, "Content-Type": "application/json"} - def _can_pk_chunk_job(self, failure_message): # pylint: 
disable=no-self-use + def _can_pk_chunk_job(self, failure_message): return "QUERY_TIMEOUT" in failure_message or \ "Retried more than 15 times" in failure_message or \ "Failed to write query result" in failure_message @@ -331,7 +331,6 @@ def _close_job(self, job_id): headers=self._get_bulk_headers(), body=json.dumps(body)) - # pylint: disable=no-self-use def _iter_lines(self, response): """Clone of the iter_lines function from the requests library with the change to pass keepends=True in order to ensure that we do not strip the line breaks diff --git a/tests/test_salesforce_lookback_window.py b/tests/test_salesforce_lookback_window.py new file mode 100644 index 00000000..915a2b06 --- /dev/null +++ b/tests/test_salesforce_lookback_window.py @@ -0,0 +1,107 @@ +from datetime import datetime, timedelta +from tap_tester import connections, runner, menagerie +from base import SalesforceBaseTest + +class SalesforceLookbackWindow(SalesforceBaseTest): + + # subtract the desired amount of seconds form the date and return + def get_simulated_date(self, dtime, format, seconds=0): + date_stripped = datetime.strptime(dtime, format) + return_date = date_stripped - timedelta(seconds=seconds) + + return datetime.strftime(return_date, format) + + @staticmethod + def name(): + return 'tap_tester_salesforce_lookback_window' + + def get_properties(self): # pylint: disable=arguments-differ + return { + 'start_date' : '2021-11-10T00:00:00Z', + 'instance_url': 'https://singer2-dev-ed.my.salesforce.com', + 'select_fields_by_default': 'true', + 'api_type': self.salesforce_api, + 'is_sandbox': 'false', + 'lookback_window': 86400 + } + + def expected_sync_streams(self): + return { + 'Account' + } + + def run_test(self): + # create connection + conn_id = connections.ensure_connection(self) + # create state file + state = { + 'bookmarks':{ + 'Account': { + 'SystemModstamp': '2021-11-12T00:00:00.000000Z' + } + } + } + # set state file to run in sync mode + menagerie.set_state(conn_id, state) + + # run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # select certain catalogs + expected_streams = self.expected_sync_streams() + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + # stream and field selection + self.select_all_streams_and_fields(conn_id, catalog_entries) + + # make 'Account' stream as INCREMENTAL to use lookback window + self.set_replication_methods(conn_id, catalog_entries, {'Account': self.INCREMENTAL}) + + # run sync + self.run_and_verify_sync(conn_id) + + # get synced records + sync_records = runner.get_records_from_target_output() + + # get replication keys + expected_replication_keys = self.expected_replication_keys() + + # get bookmark ie. 
date from which the sync started + bookmark = state.get('bookmarks').get('Account').get('SystemModstamp') + # calculate the simulated bookmark by subtracting lookback window seconds + bookmark_with_lookback_window = self.get_simulated_date(bookmark, format=self.BOOKMARK_COMPARISON_FORMAT, seconds=self.get_properties()['lookback_window']) + + for stream in expected_streams: + with self.subTest(stream=stream): + + # get replication key for stream + replication_key = list(expected_replication_keys[stream])[0] + + # get records + records = [record.get('data') for record in sync_records.get(stream).get('messages') + if record.get('action') == 'upsert'] + + # verify if we get records in ASCENDING order: + # every record's date should be lesser than the next record's date + for i in range(len(records) - 1): + self.assertLessEqual(self.parse_date(records[i].get(replication_key)), self.parse_date(records[i+1].get(replication_key))) + + # Verify the sync records respect the (simulated) bookmark value + for record in records: + self.assertGreaterEqual(self.parse_date(record.get(replication_key)), self.parse_date(bookmark_with_lookback_window), + msg='The record does not respect the lookback window.') + + # check if the 1st record is between lookback date and bookmark: + # lookback_date <= record < bookmark (state file date when sync started) + self.assertLessEqual(self.parse_date(bookmark_with_lookback_window), self.parse_date(records[0].get(replication_key))) + self.assertLess(self.parse_date(records[0].get(replication_key)), self.parse_date(bookmark)) + + def test_run(self): + # run with REST API + self.salesforce_api = 'REST' + self.run_test() + + # run with BULK API + self.salesforce_api = 'BULK' + self.run_test() diff --git a/tests/unittests/test_lookback_window.py b/tests/unittests/test_lookback_window.py new file mode 100644 index 00000000..2dd55850 --- /dev/null +++ b/tests/unittests/test_lookback_window.py @@ -0,0 +1,97 @@ +from tap_salesforce.salesforce import Salesforce +import unittest + +start_date = "2022-05-02T00:00:00.000000Z" +bookmark = "2022-05-23T00:00:00.000000Z" + +catalog_entry = { + "tap_stream_id": "Test", + "metadata": [ + { + "breadcrumb": [], + "metadata": { + "replication-method": "INCREMENTAL", + "replication-key": "SystemModstamp" + } + } + ] +} + +TEST_STATE = { + "bookmarks": { + "Test": { + "SystemModstamp": bookmark + } + } +} + +TEST_LOOKBACK_WINDOW = 60 + + +class SalesforceGetStartDateTests(unittest.TestCase): + """Test all of the combinations of having a bookmark or not and a + lookback_window or not + + These tests set up a minimal `Salesforce` object and call + `Salesforce.get_start_date()` for each of the following scenarios + + | Has Lookback | Has bookmark | Expectation | + |--------------+--------------+-------------------| + | No | No | start date | + | No | Yes | bookmark | + | Yes | No | start date | + | Yes | Yes | adjusted bookmark | + """ + def test_no_lookback_no_bookmark_returns_start_date(self): + sf_obj = Salesforce( + default_start_date=start_date + ) + + expected = start_date + actual = sf_obj.get_start_date( + {}, + catalog_entry + ) + + self.assertEqual(expected, actual) + + def test_no_lookback_yes_bookmark_returns_bookmark(self): + sf_obj = Salesforce( + default_start_date=start_date + ) + + expected = bookmark + actual = sf_obj.get_start_date( + TEST_STATE, + catalog_entry + ) + + self.assertEqual(expected, actual) + + def test_yes_lookback_no_bookmark_returns_start_date(self): + sf_obj = Salesforce( + default_start_date=start_date, + 
lookback_window=TEST_LOOKBACK_WINDOW, + ) + + expected = start_date + actual = sf_obj.get_start_date( + {}, + catalog_entry + ) + + self.assertEqual(expected, actual) + + def test_yes_lookback_yes_bookmark_returns_adjusted_bookmark(self): + sf_obj = Salesforce( + default_start_date=start_date, + lookback_window=TEST_LOOKBACK_WINDOW + ) + + expected = "2022-05-22T23:59:00.000000Z" + actual = sf_obj.get_start_date( + TEST_STATE, + catalog_entry + ) + + self.assertEqual(expected, actual)
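As a closing illustration, the date-windowing retry pattern that PATCH 2/4 adds to the BULK path (and that `test_bulk_date_window_gaps` above exercises through the generated SOQL queries) can be sketched as a small standalone routine. This is an assumed simplification, not the tap's implementation: the real logic lives in `Salesforce.get_window_end_date` and `Bulk._bulk_with_window` and drives Salesforce bulk jobs, whereas the `sync_with_windows`, `halve_window`, and `fake_query` names below are invented for the example:

```python
from datetime import datetime, timezone

MAX_RETRIES = 4  # mirrors the constant the patch adds to bulk.py

def halve_window(start: datetime, end: datetime) -> datetime:
    """Move the window's end halfway back toward its start (floor division),
    refusing to shrink to a zero-day window -- the same guard used by
    Salesforce.get_window_end_date to avoid infinite looping."""
    half = (end - start) // 2
    if half.days == 0:
        raise RuntimeError("Attempting to query by 0 day range")
    return end - half

def sync_with_windows(start, end, query, retries=MAX_RETRIES):
    """Hypothetical driver for the halve-and-retry pattern.  `query(start, end)`
    returns True on success; a failed window is halved and retried, and after a
    success the remaining range (window end -> overall end) is synced next."""
    windows = []
    window_end = end
    while start < end:
        if retries == 0:
            raise RuntimeError("Ran out of retries attempting to query")
        if query(start, window_end):
            windows.append((start, window_end))
            start, window_end = window_end, end  # continue from the window boundary
        else:
            window_end = halve_window(start, window_end)
            retries -= 1
    return windows

# Example: the first full-range query fails, so the range is halved and then
# synced in two back-to-back windows (three queries in total).
start = datetime(2019, 2, 4, 12, 15, tzinfo=timezone.utc)
end = datetime(2022, 5, 2, 12, 15, tzinfo=timezone.utc)
calls = []
def fake_query(window_start, window_end):
    calls.append((window_start, window_end))
    return len(calls) > 1  # only the first (full-range) attempt fails
print(sync_with_windows(start, end, fake_query))
```

Because each successful window's end date becomes the next window's start date, consecutive windows share a boundary, which is the no-gaps property the unit test asserts on the generated queries.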