
Commit

Merge pull request #31 from singer-io/fix-streaming
Fix Streaming
KAllan357 authored Jan 19, 2018
2 parents d1f6909 + a342961 commit f4cd46e
Showing 2 changed files with 28 additions and 18 deletions.
tap_salesforce/__init__.py (6 changes: 2 additions & 4 deletions)
@@ -76,9 +76,7 @@ def build_state(raw_state, catalog):
                 state = singer.write_bookmark(
                     state, tap_stream_id, replication_key, replication_key_value)
-        elif replication_method == 'FULL_TABLE':
-            if version is not None:
-                state = singer.write_bookmark(
-                    state, tap_stream_id, 'version', version)
+        elif replication_method == 'FULL_TABLE' and version is None:
+            state = singer.write_bookmark(state, tap_stream_id, 'version', version)
 
     return state
 
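For context on the change above: singer.write_bookmark nests its value under the stream's entry in the state's 'bookmarks' map and returns the state, so this branch now only writes the 'version' bookmark for a FULL_TABLE stream when no version has been bookmarked yet. A minimal sketch of that layout, with a made-up stream name and version value:

```python
import singer

state = {}

# write_bookmark returns the state with the value nested under
# state['bookmarks'][tap_stream_id][key]
state = singer.write_bookmark(state, 'Account', 'version', 1516320000000)

print(state)
# {'bookmarks': {'Account': {'version': 1516320000000}}}

print(singer.get_bookmark(state, 'Account', 'version'))
# 1516320000000
```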
@@ -300,7 +298,7 @@ def do_sync(sf, catalog, state):
         job_id = singer.get_bookmark(state, catalog_entry['tap_stream_id'], 'JobID')
         if job_id:
             with metrics.record_counter(stream) as counter:
-                LOGGER.info("Resuming sync for stream: %s", stream_name)
+                LOGGER.info("Found JobID from previous Bulk Query. Resuming sync for job: %s", job_id)
                 # Resuming a sync should clear out the remaining state once finished
                 counter = resume_syncing_bulk_query(sf, catalog_entry, job_id, state, counter)
                 LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
tap_salesforce/salesforce/bulk.py (40 changes: 26 additions & 14 deletions)
@@ -2,20 +2,28 @@
 import csv
 import json
 import time
+import tempfile
 import singer
 import singer.metrics as metrics
 
 import xmltodict
 
 from tap_salesforce.salesforce.exceptions import (
     TapSalesforceException, TapSalesforceQuotaExceededException)
 
 BATCH_STATUS_POLLING_SLEEP = 20
 PK_CHUNKED_BATCH_STATUS_POLLING_SLEEP = 60
-ITER_CHUNK_SIZE = 512
+ITER_CHUNK_SIZE = 1024
 DEFAULT_CHUNK_SIZE = 50000
 
 LOGGER = singer.get_logger()
 
+salesforce_object_to_parent_map = {
+    "AccountHistory": "Account",
+    "ContactCleanInfo": "Contact"
+}
+
+
 class Bulk(object):
 
     bulk_url = "{}/services/async/41.0/{}"
@@ -131,7 +139,10 @@ def _create_job(self, catalog_entry, pk_chunking=False):
 
         if pk_chunking:
             LOGGER.info("ADDING PK CHUNKING HEADER")
+
             headers['Sforce-Enable-PKChunking'] = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE)
+            if salesforce_object_to_parent_map.get(catalog_entry['stream']):
+                headers['Sforce-Enable-PKChunking'] = headers['Sforce-Enable-PKChunking'] + "; parent={}".format(salesforce_object_to_parent_map[catalog_entry['stream']])
 
         with metrics.http_request_timer("create_job") as timer:
             timer.tags['sobject'] = catalog_entry['stream']
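For reference, a small sketch of the header value this block produces, reusing the constants and parent map added above (the helper name is made up; _create_job builds the string inline):

```python
DEFAULT_CHUNK_SIZE = 50000
salesforce_object_to_parent_map = {
    "AccountHistory": "Account",
    "ContactCleanInfo": "Contact"
}

def pk_chunking_header_value(stream):
    # Mirror how _create_job assembles the Sforce-Enable-PKChunking header
    value = "true; chunkSize={}".format(DEFAULT_CHUNK_SIZE)
    parent = salesforce_object_to_parent_map.get(stream)
    if parent:
        # The mapped objects are chunked on their parent object's IDs
        value += "; parent={}".format(parent)
    return value

print(pk_chunking_header_value("AccountHistory"))  # true; chunkSize=50000; parent=Account
print(pk_chunking_header_value("Opportunity"))     # true; chunkSize=50000
```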
@@ -237,22 +248,23 @@ def get_batch_results(self, job_id, batch_id, catalog_entry):
         url = self.bulk_url.format(self.sf.instance_url, endpoint)
         headers['Content-Type'] = 'text/csv'
 
-        with metrics.http_request_timer("batch_result") as timer:
-            timer.tags['sobject'] = catalog_entry['stream']
-            # Removed the stream=True param because Salesforce was snapping open connections
-            result_response = self.sf._make_request('GET', url, headers=headers)
+        with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8") as csv_file:
+            resp = self.sf._make_request('GET', url, headers=headers, stream=True)
+            for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True):
+                if chunk:
+                    # Replace any NULL bytes in the chunk so it can be safely given to the CSV reader
+                    csv_file.write(chunk.replace('\0', ''))
 
-        # Starting with a streaming generator, replace any NULL bytes in the line given by the CSV reader
-        streaming_response = self._iter_lines(result_response)
-        csv_stream = csv.reader((line.replace('\0', '') for line in streaming_response),
-                                delimiter=',',
-                                quotechar='"')
+            csv_file.seek(0)
+            csv_reader = csv.reader(csv_file,
+                                    delimiter=',',
+                                    quotechar='"')
 
-        column_name_list = next(csv_stream)
+            column_name_list = next(csv_reader)
 
-        for line in csv_stream:
-            rec = dict(zip(column_name_list, line))
-            yield rec
+            for line in csv_reader:
+                rec = dict(zip(column_name_list, line))
+                yield rec
 
     def _close_job(self, job_id):
         endpoint = "job/{}".format(job_id)
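Pulled out of the class, the new download pattern in get_batch_results is roughly the sketch below: stream the batch-result CSV into a temporary file in ITER_CHUNK_SIZE pieces, strip NUL bytes as they arrive, then rewind and hand the file to csv.reader. The sketch talks to a placeholder URL with requests directly, whereas the tap goes through its own self.sf._make_request wrapper.

```python
import csv
import tempfile

import requests

ITER_CHUNK_SIZE = 1024

def iter_csv_records(url, headers):
    """Yield each CSV row of a streamed HTTP response as a dict (illustrative sketch)."""
    with tempfile.NamedTemporaryFile(mode="w+", encoding="utf8") as csv_file:
        resp = requests.get(url, headers=headers, stream=True)
        for chunk in resp.iter_content(chunk_size=ITER_CHUNK_SIZE, decode_unicode=True):
            if chunk:
                # NUL bytes would make csv.reader raise "line contains NULL byte"
                csv_file.write(chunk.replace('\0', ''))

        # Rewind and parse the buffered CSV
        csv_file.seek(0)
        reader = csv.reader(csv_file, delimiter=',', quotechar='"')
        column_names = next(reader)
        for line in reader:
            yield dict(zip(column_names, line))
```

Buffering to a NamedTemporaryFile keeps memory use bounded for large batch results while the HTTP response itself is still streamed.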
