Crest Work #142

Merged 4 commits on Jun 10, 2022
27 changes: 25 additions & 2 deletions .circleci/config.yml
@@ -44,7 +44,24 @@ jobs:
source /usr/local/share/virtualenvs/tap-salesforce/bin/activate
pip install pylint
echo "pylint will skip the following: $PYLINT_DISABLE_LIST"
pylint tap_salesforce -d "$PYLINT_DISABLE_LIST,stop-iteration-return"
pylint tap_salesforce -d "$PYLINT_DISABLE_LIST,stop-iteration-return,logging-format-interpolation"
run_unit_tests:
executor: docker-executor
steps:
- checkout
- attach_workspace:
at: /usr/local/share/virtualenvs
- run:
name: 'Run Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-salesforce/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_salesforce --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
run_integration_tests:
executor: docker-executor
parallelism: 8
@@ -83,6 +100,12 @@ workflows:
- tier-1-tap-user
requires:
- ensure_env
- run_unit_tests:
context:
- circleci-user
- tier-1-tap-user
requires:
- ensure_env
- run_integration_tests:
context:
- circleci-user
@@ -104,4 +127,4 @@ workflows:
filters:
branches:
only:
- master
- master
3 changes: 3 additions & 0 deletions README.md
@@ -31,6 +31,7 @@ $ tap-salesforce --config config.json --properties properties.json --state state
"start_date": "2017-11-02T00:00:00Z",
"api_type": "BULK",
"select_fields_by_default": true
"lookback_window": 10
}
```

@@ -40,6 +41,8 @@ The `start_date` is used by the tap as a bound on SOQL queries when searching fo

The `api_type` is used to switch the behavior of the tap between using Salesforce's "REST" and "BULK" APIs. When new fields are discovered in Salesforce objects, the `select_fields_by_default` key describes whether or not the tap will select those fields by default.

The `lookback_window` (in seconds) subtracts the specified number of seconds from the bookmark so that the tap re-syncs records modified just before the bookmark. Recommended value: 10 seconds.

## Run Discovery

To run discovery mode, execute the tap with the config file.
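For context, a minimal sketch of how a lookback window like the one documented in this README change is typically applied to a stored bookmark; the timestamp and window value below are illustrative, not taken from this PR:

```python
# Illustrative only: applying a 10-second lookback to a bookmark before querying.
from datetime import datetime, timedelta

bookmark = datetime(2022, 6, 1, 12, 0, 0)   # bookmark read from state (UTC)
lookback_window = 10                        # seconds, from config

# Start the sync slightly before the bookmark so late-arriving records are re-synced.
sync_start = bookmark - timedelta(seconds=lookback_window)
print(sync_start.strftime("%Y-%m-%dT%H:%M:%SZ"))  # 2022-06-01T11:59:50Z
```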
10 changes: 8 additions & 2 deletions tap_salesforce/__init__.py
@@ -375,6 +375,10 @@ def main_impl():

sf = None
try:
# get lookback window from config
lookback_window = CONFIG.get('lookback_window')
lookback_window = int(lookback_window) if lookback_window else None

sf = Salesforce(
refresh_token=CONFIG['refresh_token'],
sf_client_id=CONFIG['client_id'],
@@ -384,7 +388,8 @@
is_sandbox=CONFIG.get('is_sandbox'),
select_fields_by_default=CONFIG.get('select_fields_by_default'),
default_start_date=CONFIG.get('start_date'),
api_type=CONFIG.get('api_type'))
api_type=CONFIG.get('api_type'),
lookback_window=lookback_window)
sf.login()

if args.discover:
@@ -417,5 +422,6 @@ def main():
LOGGER.critical(e)
sys.exit(1)
except Exception as e:
LOGGER.critical(e)
for error_line in str(e).splitlines():
LOGGER.critical(error_line)
raise e
35 changes: 29 additions & 6 deletions tap_salesforce/salesforce/__init__.py
@@ -1,3 +1,4 @@
import datetime
import re
import threading
import time
@@ -210,7 +211,8 @@ def __init__(self,
is_sandbox=None,
select_fields_by_default=None,
default_start_date=None,
api_type=None):
api_type=None,
lookback_window=None):
self.api_type = api_type.upper() if api_type else None
self.refresh_token = refresh_token
self.token = token
@@ -235,9 +237,10 @@ def __init__(self,
self.login_timer = None
self.data_url = "{}/services/data/v52.0/{}"
self.pk_chunking = False
self.lookback_window = lookback_window

# validate start_date
singer_utils.strptime(default_start_date)
singer_utils.strptime_to_utc(default_start_date)

def _get_standard_headers(self):
return {"Authorization": "Bearer {}".format(self.access_token)}
@@ -370,7 +373,6 @@ def describe(self, sobject=None):

return resp.json()

# pylint: disable=no-self-use
def _get_selected_properties(self, catalog_entry):
mdata = metadata.to_map(catalog_entry['metadata'])
properties = catalog_entry['schema'].get('properties', {})
@@ -382,12 +384,22 @@ def _get_selected_properties(self, catalog_entry):


def get_start_date(self, state, catalog_entry):
"""
return start date if state is not provided
else return bookmark from the state by subtracting lookback if provided
"""
catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')

return (singer.get_bookmark(state,
catalog_entry['tap_stream_id'],
replication_key) or self.default_start_date)
# get bookmark value from the state
bookmark_value = singer.get_bookmark(state, catalog_entry['tap_stream_id'], replication_key)
sync_start_date = bookmark_value or self.default_start_date

# if the state contains a bookmark, subtract the lookback window from the bookmark
if bookmark_value and self.lookback_window:
sync_start_date = singer_utils.strftime(singer_utils.strptime_with_tz(sync_start_date) - datetime.timedelta(seconds=self.lookback_window))

return sync_start_date

def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by_clause=True):
selected_properties = self._get_selected_properties(catalog_entry)
@@ -447,3 +459,14 @@ def get_blacklisted_fields(self):
raise TapSalesforceException(
"api_type should be REST or BULK was: {}".format(
self.api_type))

def get_window_end_date(self, start_date, end_date):
# To update end_date, subtract 'half_day_range' (i.e. half the number of days between start_date and end_date)
# When the number of days is odd, '//' rounds 'half_day_range' down to the nearest whole day
half_day_range = (end_date - start_date) // 2

if half_day_range.days == 0:
raise TapSalesforceException(
"Attempting to query by 0 day range, this would cause infinite looping.")

return end_date - half_day_range
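A standalone sketch of the window-halving rule that `get_window_end_date` introduces, using plain datetimes; the dates below are illustrative:

```python
# Illustrative only: halving a query window the way get_window_end_date does.
from datetime import datetime

start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 1, 9)

# Floor division on the timedelta rounds an odd number of days down.
half_day_range = (end_date - start_date) // 2    # timedelta(days=4)

if half_day_range.days == 0:
    raise Exception("Attempting to query by 0 day range, this would cause infinite looping.")

print(end_date - half_day_range)                 # 2022-01-05 00:00:00
```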
97 changes: 65 additions & 32 deletions tap_salesforce/salesforce/bulk.py
@@ -5,6 +5,7 @@
import time
import tempfile
import singer
import singer.utils as singer_utils
from singer import metrics
import requests
from requests.exceptions import RequestException
@@ -18,7 +19,7 @@
PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60
ITER_CHUNK_SIZE = 1024
DEFAULT_CHUNK_SIZE = 100000 # Max is 250000

MAX_RETRIES = 4
LOGGER = singer.get_logger()

# pylint: disable=inconsistent-return-statements
@@ -100,7 +101,7 @@ def _get_bulk_headers(self):
return {"X-SFDC-Session": self.sf.access_token,
"Content-Type": "application/json"}

def _can_pk_chunk_job(self, failure_message): # pylint: disable=no-self-use
def _can_pk_chunk_job(self, failure_message):
return "QUERY_TIMEOUT" in failure_message or \
"Retried more than 15 times" in failure_message or \
"Failed to write query result" in failure_message
@@ -117,25 +118,28 @@ def _bulk_query(self, catalog_entry, state):

if batch_status['state'] == 'Failed':
if self._can_pk_chunk_job(batch_status['stateMessage']):
batch_status = self._bulk_query_with_pk_chunking(catalog_entry, start_date)
job_id = batch_status['job_id']

# Set pk_chunking to True to indicate that we should write a bookmark differently
self.sf.pk_chunking = True

# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

for completed_batch_id in batch_status['completed']:
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
yield result
# Remove the completed batch ID and write state
state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id)
LOGGER.info("Finished syncing batch %s. Removing batch from state.", completed_batch_id)
LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
singer.write_state(state)
# Get the list of batch statuses, using PK chunking (and date windowing on failure)
status_list = self._bulk_with_window([], catalog_entry, start_date)

for batch_status in status_list:
job_id = batch_status['job_id']

# Set pk_chunking to True to indicate that we should write a bookmark differently
self.sf.pk_chunking = True

# Add the bulk Job ID and its batches to the state so it can be resumed if necessary
tap_stream_id = catalog_entry['tap_stream_id']
state = singer.write_bookmark(state, tap_stream_id, 'JobID', job_id)
state = singer.write_bookmark(state, tap_stream_id, 'BatchIDs', batch_status['completed'][:])

for completed_batch_id in batch_status['completed']:
for result in self.get_batch_results(job_id, completed_batch_id, catalog_entry):
yield result
# Remove the completed batch ID and write state
state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"].remove(completed_batch_id)
LOGGER.info("Finished syncing batch %s. Removed batch from state.", completed_batch_id)
LOGGER.info("Batches to go: %d", len(state['bookmarks'][catalog_entry['tap_stream_id']]["BatchIDs"]))
singer.write_state(state)
else:
raise TapSalesforceException(batch_status['stateMessage'])
else:
@@ -148,18 +152,11 @@ def _bulk_query_with_pk_chunking(self, catalog_entry, start_date):
# Create a new job
job_id = self._create_job(catalog_entry, True)

self._add_batch(catalog_entry, job_id, start_date, False)
self._add_batch(catalog_entry, job_id, start_date, order_by_clause=False)

batch_status = self._poll_on_pk_chunked_batch_status(job_id)
batch_status['job_id'] = job_id

if batch_status['failed']:
raise TapSalesforceException(
"One or more batches failed during PK chunked job. {} failed out of {} total batches. First 20 failed batches: {}".format(
len(batch_status['failed']),
len(batch_status['completed']) + len(batch_status['failed']),
list(batch_status['failed'].items())[:20]))

# Close the job after all the batches are complete
self._close_job(job_id)

@@ -194,11 +191,11 @@ def _create_job(self, catalog_entry, pk_chunking=False):

return job['id']

def _add_batch(self, catalog_entry, job_id, start_date, order_by_clause=True):
def _add_batch(self, catalog_entry, job_id, start_date, end_date=None, order_by_clause=True):
endpoint = "job/{}/batch".format(job_id)
url = self.bulk_url.format(self.sf.instance_url, endpoint)

body = self.sf._build_query_string(catalog_entry, start_date, order_by_clause=order_by_clause)
body = self.sf._build_query_string(catalog_entry, start_date, end_date, order_by_clause=order_by_clause)

headers = self._get_bulk_headers()
headers['Content-Type'] = 'text/csv'
@@ -334,7 +331,6 @@ def _close_job(self, job_id):
headers=self._get_bulk_headers(),
body=json.dumps(body))

# pylint: disable=no-self-use
def _iter_lines(self, response):
"""Clone of the iter_lines function from the requests library with the change
to pass keepends=True in order to ensure that we do not strip the line breaks
@@ -357,3 +353,40 @@

if pending is not None:
yield pending

def _bulk_with_window(self, status_list, catalog_entry, start_date_str, end_date=None, retries=MAX_RETRIES):
"""Bulk api call with date windowing"""
sync_start = singer_utils.now()
if end_date is None:
end_date = sync_start
LOGGER.info("Retrying Bulk Query with PK Chunking")
else:
LOGGER.info("Retrying Bulk Query with window of date {} to {}".format(start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ')))

if retries == 0:
raise TapSalesforceException("Ran out of retries attempting to query Salesforce Object {}".format(catalog_entry['stream']))

job_id = self._create_job(catalog_entry, True)
self._add_batch(catalog_entry, job_id, start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ'), False)
batch_status = self._poll_on_pk_chunked_batch_status(job_id)
batch_status['job_id'] = job_id
# Close the job after all the batches are complete
self._close_job(job_id)

if batch_status['failed']:
LOGGER.info("Failed Bulk Query with window of date {} to {}".format(start_date_str, end_date.strftime('%Y-%m-%dT%H:%M:%SZ')))
# If batch_status is failed then reduce date window by half by updating end_date
end_date = self.sf.get_window_end_date(singer_utils.strptime_with_tz(start_date_str), end_date)

return self._bulk_with_window(status_list, catalog_entry, start_date_str, end_date, retries - 1)

else:
status_list.append(batch_status)

# If the date range was chunked (an end_date was passed), sync
# from the end_date -> now
if end_date < sync_start:
next_start_date_str = end_date.strftime('%Y-%m-%dT%H:%M:%SZ')
return self._bulk_with_window(status_list, catalog_entry, next_start_date_str, retries=retries)

return status_list
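The control flow of `_bulk_with_window` is easier to see outside the diff. Below is a simplified, standalone sketch of the same pattern (halve the window on failure, then sync forward from the successful window's end); `query_window`, the four-day failure threshold, and the dates are hypothetical stand-ins, not the tap's real API:

```python
# Illustrative only: the halve-on-failure, then sync-forward pattern used by _bulk_with_window.
from datetime import datetime, timedelta

MAX_RETRIES = 4
NOW = datetime(2022, 6, 10)  # stand-in for singer_utils.now()

def query_window(start, end):
    """Hypothetical stand-in for a bulk query; pretend windows longer than 4 days fail."""
    return (end - start) <= timedelta(days=4)

def sync_with_windowing(windows, start, end=None, retries=MAX_RETRIES):
    if end is None:
        end = NOW
    if retries == 0:
        raise Exception("Ran out of retries")

    if not query_window(start, end):
        # Failure: halve the window and retry with one fewer attempt left.
        return sync_with_windowing(windows, start, end - (end - start) // 2, retries - 1)

    windows.append((start, end))
    if end < NOW:
        # Success on a partial window: continue from its end, carrying the remaining retries forward.
        return sync_with_windowing(windows, end, retries=retries)
    return windows

print(sync_with_windowing([], datetime(2022, 6, 1)))
# Three windows: 2022-06-01 to 06-04, 06-04 to 06-07, 06-07 to 06-10
```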
8 changes: 1 addition & 7 deletions tap_salesforce/salesforce/rest.py
@@ -71,13 +71,7 @@ def _query_recur(
raise ex

if retryable:
start_date = singer_utils.strptime_with_tz(start_date_str)
half_day_range = (end_date - start_date) // 2
end_date = end_date - half_day_range

if half_day_range.days == 0:
raise TapSalesforceException(
"Attempting to query by 0 day range, this would cause infinite looping.")
end_date = self.sf.get_window_end_date(singer_utils.strptime_with_tz(start_date_str), end_date)

query = self.sf._build_query_string(catalog_entry, singer_utils.strftime(start_date),
singer_utils.strftime(end_date))
4 changes: 2 additions & 2 deletions tap_salesforce/sync.py
@@ -173,8 +173,8 @@ def sync_records(sf, catalog_entry, state, counter):
state = singer.write_bookmark(
state, catalog_entry['tap_stream_id'], 'version', None)

# If pk_chunking is set, only write a bookmark at the end
if sf.pk_chunking:
# If pk_chunking is set and the selected stream has a replication key, only write a bookmark at the end
if sf.pk_chunking and replication_key:
# Write a bookmark with the highest value we've seen
state = singer.write_bookmark(
state,