TDL-16115: Add lookback window advanced config option for Incremental Syncs #135
tap_salesforce/__init__.py
Outdated
@@ -73,9 +75,10 @@ def build_state(raw_state, catalog):
 job_id = singer.get_bookmark(raw_state, tap_stream_id, 'JobID')
 batches = singer.get_bookmark(raw_state, tap_stream_id, 'BatchIDs')
 current_bookmark = singer.get_bookmark(raw_state, tap_stream_id, 'JobHighestBookmarkSeen')
+adjusted_current_bookmark = singer_utils.strftime(singer_utils.strptime_with_tz(current_bookmark) - datetime.timedelta(seconds=int(CONFIG.get('lookback_window', 10))))
Can we make a small function of it? It will increase the readability.
Updated the code.
start_date = singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key) or self.default_start_date
if not with_lookback:
    return start_date

adjusted_start_date = singer_utils.strftime(singer_utils.strptime_with_tz(start_date) - datetime.timedelta(seconds=self.lookback_window))
return adjusted_start_date
Add code comment here.
Added comments.
@@ -381,13 +384,16 @@ def _get_selected_properties(self, catalog_entry):
 self.select_fields_by_default)]

-def get_start_date(self, state, catalog_entry):
+def get_start_date(self, state, catalog_entry, with_lookback=True):
Change the default value of with_lookback to False and explicitly pass True as an argument wherever required. It will make it clearer where the lookback is used rather than where it is not.
Updated.
README.md
Outdated
@@ -40,6 +41,8 @@ The `start_date` is used by the tap as a bound on SOQL queries when searching fo

 The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default.

+The `lookback_window` (in seconds) subtracts the desired amount of seconds from the start date to sync past data. Default value: 10 seconds.
-The `lookback_window` (in seconds) subtracts the desired amount of seconds from the start date to sync past data. Default value: 10 seconds.
+The `lookback_window` (in seconds) subtracts the desired amount of seconds from the bookmark to sync past data. Default value: 10 seconds.
Updated the file.
catalog_entry['tap_stream_id'],
replication_key) or self.default_start_date)
sync_start_date = singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key) or self.default_start_date
# return date from the state file or start date if, 'without_lookback' is True
-# return date from the state file or start date if, 'without_lookback' is True
+# return bookmark from the state or start date if 'without_lookback' is True
Updated.
if without_lookback:
    return sync_start_date

# if state file contains bookmark, subtract lookback window from the 'sync_start_date'
-# if state file contains bookmark, subtract lookback window from the 'sync_start_date'
+# if the state contains a bookmark, subtract the lookback window from the bookmark
Updated.
…force into TDL-16115-add-lookback-window
@@ -381,13 +384,20 @@ def _get_selected_properties(self, catalog_entry):
 self.select_fields_by_default)]

-def get_start_date(self, state, catalog_entry):
+def get_start_date(self, state, catalog_entry, without_lookback=True):
Add code comment and explain the effect of the without_lookback variable.
Added function comment
The Integration tests are failing because we need to add |
class SalesforceLookbackWindow(SalesforceBaseTest):

    # subtract the desired amount of seconds from the date and return
    def timedelta_formatted(self, dtime, seconds=0):
It would be better if this method took in the format as a parameter so that its usage is explicit. We should never manipulate results in the tests without being explicit, else we may cover up a bug.
Added format param in the function.
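A minimal sketch of what a helper with an explicit format parameter could look like. This is not the code from the PR: the parameter name `dt_format` is hypothetical, and the helper is assumed to add `seconds` to the timestamp (so a negative value moves it back, matching the `seconds=-...` call in the test).

```python
from datetime import datetime, timedelta


def timedelta_formatted(dtime, dt_format, seconds=0):
    """Shift a formatted timestamp string by `seconds` and return it in the same format.

    `dt_format` is a hypothetical parameter name; passing it explicitly keeps
    the parsing and formatting visible at every call site.
    """
    parsed = datetime.strptime(dtime, dt_format)
    return (parsed + timedelta(seconds=seconds)).strftime(dt_format)


# A negative value moves the timestamp back by the lookback window.
bookmark = "2021-11-10T00:00:10Z"
bookmark_with_lookback = timedelta_formatted(bookmark, "%Y-%m-%dT%H:%M:%SZ", seconds=-10)
```

Because the format is an argument, a mismatch between the record format and the expected format fails loudly in `strptime` instead of being silently normalized.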
# calculate the simulated start date by subtracting lookback window seconds
start_date_with_lookback_window = self.timedelta_formatted(start_date, seconds=-self.get_properties()['lookback_window'])
Can you change the wording so that we do not mention start date? That term should be reserved for historical syncs. The point at which the sync starts on an incremental sync is based on the state you have set above, not the start date. Maybe I'm just being picky but I think it could be confusing to a tester in the future.
Updated the variable name.
'start_date' : '2021-11-10T00:00:00Z',
'instance_url': 'https://singer2-dev-ed.my.salesforce.com',
'select_fields_by_default': 'true',
'api_type': 'REST',
Can this test be executed against the BULK API as well?
Updated the code to run for BULK API too.
Just to reiterate: I think the current implementation is confusing because of the way things are named and it is inconsistent with other taps where we've implemented a lookback window.
I would like to hear some reasons to defend this implementation before I am comfortable approving this PR.
For the tests, I think we should be careful with adding too much extra code.
For unit tests especially - if we are only trying to focus on
sf.get_start_date({}, mock_catalog_entry, without_lookback=False)
versus
sf.get_start_date(mock_state, mock_catalog_entry, without_lookback=False)
Then we should write the bare minimum amount of code to allow us to call get_start_date(). In this case in particular, we don't need readers to spend time looking at how a Salesforce object is created. We need them to focus on
- A Salesforce object is created with a start date
- This is what the catalog entry looks like
- This is what the state looks like
- Here is what get_start_date returns
# verify if the record's bookmark value is between bookmark and (simulated) bookmark value
if self.parse_date(bookmark_with_lookback_window) <= self.parse_date(replication_key_value) < self.parse_date(bookmark):
    is_between = True

self.assertTrue(is_between, msg='No record found between bookmark and lookback date.')
I think is_between will always be true after the first record. So I think this assert is pointless after the first record. Do you agree?
Ideally, if we are not using any lookback, we will get all data greater than the bookmark. Here we are using a lookback, so the above assertion verifies that we get at least some data between bookmark - lookback and bookmark.
I think Andy's point is that we could assert on just the first record. But for that assertion to be valid, we'd need to also assert that records are returned in order of ascending replication key value. Which I think is a better approach, but both are valid.
Also it's always preferable to avoid assertTrue and assertFalse. They give less information than the comparison methods, even with the additional error msg.
@kspeer825 @luandy64 Added assertion to verify we got records in ascending order and also updated the existing is_between assertion to verify that the 1st record is between the lookback date and the initial bookmark.
class MockSalesforce:
    rest_requests_attempted = 0
    jobs_completed = 0
    login_timer = None
    def __init__(self, *args, **kwargs):
        return None
-class MockSalesforce:
-    rest_requests_attempted = 0
-    jobs_completed = 0
-    login_timer = None
-    def __init__(self, *args, **kwargs):
-        return None
+class MockSalesforce:
+    def __init__(self, *args, **kwargs):
+        self.rest_requests_attempted = 0
+        self.jobs_completed = 0
+        self.login_timer = None
Updated.
@@ -381,13 +384,24 @@ def _get_selected_properties(self, catalog_entry):
 self.select_fields_by_default)]

-def get_start_date(self, state, catalog_entry):
+def get_start_date(self, state, catalog_entry, without_lookback=True):
I think adding this as without_lookback=True complicates the code.
The only two uses of this function in rest.py and bulk.py call this function with without_lookback=False. Why do we set the default value to the opposite of what all use cases call the function with?
On top of that, the test code could be slightly simplified. 2 out of 5 of the test cases don't pass without_lookback=False.
The other issue with this implementation is that this forces all users to run syncs with the lookback enabled.
If we use this boolean flag approach, then I think we need to set the default to 0 seconds so that the feature is implemented in the tap, but not in effect for everyone.
The other approach would be to remove the boolean flag and only use a lookback if it is present in the config. You can see this pattern in a lot of our taps.
The tap uses get_start_date in sync.py for bookmarking, and at that point we do not want our bookmark to go backward. There we need the date without the lookback window, which is why we added the without_lookback argument. To avoid confusion, we have renamed that argument to with_lookback.
The DoD of the card TDL-16115 mentions that we need to add a default lookback of 10 seconds, so we kept the default value of 10 rather than 0.
We do not want to apply the lookback window to the start date
TDL-16115 Updated Requirements
Definition of Done:
- Implement an optional lookback_window parameter
- The tap should not use a lookback window unless it was configured to do so
- The lookback window should never be applied to the start date
- Unit tests are passing
- Integration tests are passing
README.md
Outdated
@@ -40,6 +41,8 @@ The `start_date` is used by the tap as a bound on SOQL queries when searching fo

 The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default.

+The `lookback_window` (in seconds) subtracts the desired amount of seconds from the bookmark to sync past data. Default value: 10 seconds.
Remove this mention of a default value
We should recommend a small value like 10 instead
Got it. Removed default lookback from README file and added recommended value.
tap_salesforce/__init__.py
Outdated
@@ -6,7 +6,7 @@
 from singer import metadata, metrics
 import tap_salesforce.salesforce
 from tap_salesforce.sync import (sync_stream, resume_syncing_bulk_query, get_stream_version)
-from tap_salesforce.salesforce import Salesforce
+from tap_salesforce.salesforce import Salesforce, DEFAULT_LOOKBACK_WINDOW
Remove this import
Removed the import as it will not be required now.
tap_salesforce/__init__.py
Outdated
@@ -384,7 +384,8 @@ def main_impl():
 is_sandbox=CONFIG.get('is_sandbox'),
 select_fields_by_default=CONFIG.get('select_fields_by_default'),
 default_start_date=CONFIG.get('start_date'),
-api_type=CONFIG.get('api_type'))
+api_type=CONFIG.get('api_type'),
+lookback_window=int(CONFIG.get('lookback_window', DEFAULT_LOOKBACK_WINDOW)))
We should not call int() here.
Above this call to the Salesforce constructor, we should check if lookback_window is in the CONFIG
- If it is, then call int() on the value and save the value to a variable called lookback_window
- If it is not, then set the variable lookback_window to None
Then here we just pass lookback_window to the constructor
Got it. Updated lookback calculation as per the suggestion.
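The suggestion above can be sketched roughly as follows. The helper name `read_lookback_window` is hypothetical and a plain dict stands in for the tap's CONFIG; the merged code may differ.

```python
def read_lookback_window(config):
    """Return the configured lookback window as an int, or None when it is absent.

    Hypothetical helper: `config` is a plain dict standing in for CONFIG.
    """
    if 'lookback_window' in config:
        # Config values may arrive as strings, so convert explicitly.
        return int(config['lookback_window'])
    return None


# The result would then be passed straight to the constructor, e.g.
# Salesforce(..., lookback_window=lookback_window)
lookback_window = read_lookback_window({'lookback_window': '10'})
```

Keeping the "not configured" case as None, instead of a numeric default, is what lets the tap skip the lookback entirely unless a user opts in.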
# default lookback window of 10 seconds
DEFAULT_LOOKBACK_WINDOW = 10
Remove these lines
Removed the lines as it will not be required now.
@@ -210,7 +214,8 @@ def __init__(self,
 is_sandbox=None,
 select_fields_by_default=None,
 default_start_date=None,
-api_type=None):
+api_type=None,
+lookback_window=DEFAULT_LOOKBACK_WINDOW):
This parameter should default to None
Added default value of None for lookback_window argument.
sync_start_date = singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key) or self.default_start_date
# return bookmark from the state or start date if 'with_lookback' is False
if not with_lookback:
    return sync_start_date

# if the state contains a bookmark, subtract the lookback window from the bookmark
if singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key):
    sync_start_date = singer_utils.strftime(singer_utils.strptime_with_tz(sync_start_date) - datetime.timedelta(seconds=self.lookback_window))

return sync_start_date
Update this implementation to get the tests in tests/unittests/test_lookback_window.py passing
One thing that might be helpful is that self.lookback_window is an integer or None, so we can use its value or fall back to zero
Thanks. Updated the code to only subtract lookback if the bookmark is present in the state and lookback is not None.
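A simplified sketch of the behavior agreed on in this thread, under stated assumptions: it takes the bookmark value directly instead of `state` and `catalog_entry`, and uses the standard library with a fixed timestamp format rather than the singer utilities, so it is an illustration and not the merged implementation.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%dT%H:%M:%SZ"  # assumed bookmark format for this sketch


def get_start_date(bookmark, default_start_date, lookback_window=None):
    """Return the point at which a sync should start.

    The lookback is subtracted only from an existing bookmark and never
    from the configured start date.
    """
    if not bookmark:
        # No bookmark in the state: historical sync from the start date, unchanged.
        return default_start_date

    # lookback_window is an int or None, so fall back to zero seconds.
    shifted = datetime.strptime(bookmark, FMT) - timedelta(seconds=lookback_window or 0)
    return shifted.strftime(FMT)
```

With this shape there is no boolean flag to pass around: callers that must not move the bookmark backward can read the bookmark from the state directly.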
""" | ||
if 'with_lookback' is False, then return start date or state file date
else subtract lookback window from the state file date and return
if no state file bookmark is found, do not subtract lookback window from the start date
""" |
Update this comment to match the new implementation
Updated the comment as per the suggested implementation.
@@ -235,6 +240,7 @@ def __init__(self,
 self.login_timer = None
 self.data_url = "{}/services/data/v52.0/{}"
 self.pk_chunking = False
+self.lookback_window = lookback_window

 # validate start_date
 singer_utils.strptime(default_start_date)
Update this to singer_utils.strptime_to_utc to get unit tests working again
singer_utils.strptime is deprecated
The unit tests didn't catch this because we mocked away this function. My test removed the Mock and allowed this to fail properly.
The difference between singer_utils.strptime and singer_utils.strptime_to_utc is the parsing function used. strptime needs a datetime format to function correctly - which results in the error that you see in Circle
This is also backwards compatible assuming we only write bookmarks with singer-python functions. This ensures that bookmarks are all converted to UTC first. And singer_utils.strptime_to_utc would just work because it reads a variety of date time formats (with and without fractional seconds) and then converts it to UTC.
Example to show that we can parse fractional seconds:
>>> import dateutil.parser
>>> dateutil.parser.parse("2022-05-02T00:00:00.000000Z")
datetime.datetime(2022, 5, 2, 0, 0, tzinfo=tzlocal())
>>> dateutil.parser.parse("2022-05-02T00:00:00Z")
datetime.datetime(2022, 5, 2, 0, 0, tzinfo=tzlocal())
Thanks for the information. Updated strptime to strptime_to_utc, and we will use strptime_to_utc from now on when needed.
tap_salesforce/salesforce/bulk.py
Outdated
@@ -107,7 +107,7 @@ def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use

 def _bulk_query(self, catalog_entry, state):
     job_id = self._create_job(catalog_entry)
-    start_date = self.sf.get_start_date(state, catalog_entry)
+    start_date = self.sf.get_start_date(state, catalog_entry, with_lookback=True)
Remove with_lookback because it should not exist as a parameter in the new implementation
Removed with_lookback param from all places.
tap_salesforce/salesforce/rest.py
Outdated
@@ -14,7 +14,7 @@ def __init__(self, sf):
 self.sf = sf

 def query(self, catalog_entry, state):
-    start_date = self.sf.get_start_date(state, catalog_entry)
+    start_date = self.sf.get_start_date(state, catalog_entry, with_lookback=True)
Remove with_lookback because it should not exist as a parameter in the new implementation
Removed with_lookback param from all places.
The changes look good to me. I will approve when Circle is passing
* TDL-7301: Handle multiline `CRITICAL` error messages (#137)
* updated code to add CRITICAL for multiline error messages
* run unittest in CCi
* updated config.yml file
* TDL-15349 Add Date Windowing (#140)
* add date windowing
* resolved pylint error
* resolved pylint error
* resolved pylint error
* add unittest
* added commnets for logic explanation
* resolved pylint error
* update config.yml
* add remaining unittest
* add change in bulk
* resolved pylint error
* update config.yml
* added unittest for date window gaps
* resolve unittest failure
* resolved review comments
* resolved pylint duplicate code error
Co-authored-by: harshpatel4crest <harsh.patel4@crestdatasys.com>
* TDL-16006: Fix interruptible full table bookmarking strategy for bulk API (#138)
* Fix Full Table Bookmarking Stratergy for bulk Api
* add detialed comments
* update config.yml
* remove unused pakage
* updated unittest
Co-authored-by: harshpatel4crest <harsh.patel4@crestdatasys.com>
* TDL-16115: Add lookback window advanced config option for Incremental Syncs (#135)
* added lookback window
* resolved pylint and unittest error
* resolved unittest error
* updated lookback window code
* updated lookback code to work for incremental syncs
* added some comments
* updated comments
* added function comments
* resolved review comments
* addressed review comments
* updated lookback window integration test and unittests
* Update unit tests to match expected implementation (#141)
We do not want to apply the lookback window to the start date
* updated the code to use lookback if present
* resolved CCi failure and updated README file
* set Account as INCREMENTAL before sync
* updated state date and fixed pylint error
Co-authored-by: Andy Lu <andy@stitchdata.com>
Co-authored-by: karanpanchal-crest <karan.panchal@crestdatasys.com>
Co-authored-by: Andy Lu <andy@stitchdata.com>
Description of change
TDL-16115: Add lookback window advanced config option for Incremental Syncs
test_salesforce_lookback_window for validating lookback window
QA steps
Risks
Rollback steps