Skip to content

Commit

Permalink
refactor to generate and upload batches of resources, rather than one…
Browse files Browse the repository at this point in the history
… monster bundle.
  • Loading branch information
pbugni committed Oct 12, 2023
1 parent 3746bfc commit 2b20681
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 44 deletions.
43 changes: 43 additions & 0 deletions hydrant/models/bundle.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import logging
from datetime import datetime
import requests

from hydrant.audit import audit_entry


class Bundle(object):
Expand Down Expand Up @@ -26,6 +30,9 @@ def __init__(self, id='searchset', bundle_type='searchset', link=None):
self.bundle_type = bundle_type
self.link = link or []

def __len__(self):
return len(self.entries)

def add_entry(self, entry_or_resource):
"""Add entry to bundle
Expand All @@ -52,3 +59,39 @@ def as_fhir(self):
results.update({'entry': self.entries})
results.update({'total': len(self.entries)})
return results


class BatchUpload(object):
    """Upload a series of Bundles

    Entries accumulate in an internal ``Bundle``.  Once it holds
    ``batch_size`` entries it is POSTed to ``target_system`` and replaced
    with an empty one, so the input goes out as a sequence of small bundles.
    """

    def __init__(self, target_system, batch_size=20):
        """Prepare an empty batch

        :param target_system: URL each bundle is POSTed to
        :param batch_size: number of queued entries that triggers an upload
        """
        self.bundle = Bundle()
        self.target_system = target_system
        self.batch_size = batch_size
        # running count of entries in bundles that were POSTed without error
        self.total_sent = 0

    def add_entry(self, item):
        """Queue ``item``; transmit once the bundle reaches ``batch_size``"""
        self.bundle.add_entry(item)
        if len(self.bundle) >= self.batch_size:
            self.transmit_bundle()

    def process(self, resources):
        """Queue and upload every resource from the ``resources`` iterable

        Each resource is converted with its ``as_upsert_entry()`` method.
        """
        for r in resources:
            self.add_entry(r.as_upsert_entry())
        # catch the last bundle not yet sent
        self.transmit_bundle()

    def transmit_bundle(self):
        """POST the pending bundle (if any) and start a fresh one

        Does nothing when no entries are queued.  If the response carries an
        error status, ``raise_for_status()`` raises and the pending bundle
        is left untouched.
        """
        if len(self.bundle) == 0:
            return

        fhir_bundle = self.bundle.as_fhir()
        logging.info(f" - uploading next bundle to {self.target_system}")
        response = requests.post(self.target_system, json=fhir_bundle)
        response.raise_for_status()

        # The POST did not fail: settle the bookkeeping *before* auditing,
        # so an error while decoding or recording the response cannot leave
        # already-uploaded entries queued for a second transmission.
        self.total_sent += len(self.bundle)
        self.bundle = Bundle()

        extra = {'tags': ['upload'], 'system': self.target_system, 'user': 'system'}
        audit_entry(f"uploaded: {response.json()}", extra)
35 changes: 19 additions & 16 deletions hydrant/models/resource_list.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,39 @@
import logging


class ResourceList(object):
    """Generates (ordered) FHIR Resources from the rows of a parser

    Resources are built lazily during iteration rather than held in memory;
    the number of unique resources is available via ``len()`` only after a
    complete pass.
    """

    def __init__(self, parser, adapter):
        """Bind the row source and the adapter used to build resources

        :param parser: object whose ``rows()`` yields the raw rows
        :param adapter: adapter class; provides ``RESOURCE_CLASS`` and,
          optionally, ``unique_key()`` on its instances
        """
        self.parser = parser
        self.adapter = adapter
        # NOTE(review): never populated by the generator implementation;
        # presumably retained for older callers - confirm before removing.
        self.items = None
        self.item_count = 0
        self._iteration_complete = False

    def __iter__(self):
        """Use parser and adapter, yield each unique resource"""
        # Every pass starts counting from scratch; otherwise iterating a
        # second time would add to the previous total and len() would
        # report a multiple of the real number of resources.
        self.item_count = 0
        self._iteration_complete = False

        keys_seen = set()
        for row in self.parser.rows():
            # Adapter may define unique_key() - if defined and a previous
            # entry matches, skip over this "duplicate"
            if hasattr(self.adapter, 'unique_key'):
                key = self.adapter(row).unique_key()
                if key in keys_seen:
                    logging.info(f"skipping duplicate: {key}")
                    continue
                keys_seen.add(key)

            self.item_count += 1
            yield self.adapter.RESOURCE_CLASS.factory(row, self.adapter)
        self._iteration_complete = True

    def __len__(self):
        """Return length (count) of unique resources discovered in generator

        NB: as a generator class, the full length is only known after
        iteration has been exhausted.

        :raises RuntimeError: if a complete iteration hasn't yet occurred
        """
        if not self._iteration_complete:
            raise RuntimeError("request for generator length before complete iteration")
        return self.item_count
29 changes: 6 additions & 23 deletions hydrant/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import requests
import sys

from hydrant.audit import audit_entry
from hydrant.models.bundle import Bundle
from hydrant.models.bundle import BatchUpload

base_blueprint = Blueprint('base', __name__, cli_group=None)

Expand Down Expand Up @@ -167,29 +166,13 @@ def upload_file(filename):
if not adapter:
raise click.BadParameter("column headers not found in any available adapters")

# With parser and adapter at hand, process the data
target_system = current_app.config['FHIR_SERVER_URL']
bundle = Bundle()
# With parser and adapter at hand, process & upload the data
resources = ResourceList(parser, adapter)
batcher = BatchUpload(target_system=current_app.config['FHIR_SERVER_URL'])
batcher.process(resources)

for r in resources:
bundle.add_entry(r.as_upsert_entry())

fhir_bundle = bundle.as_fhir()
click.echo(f" - parsed {fhir_bundle['total']}")
click.echo(f" - uploading bundle to {target_system}")
extra = {'tags': [adapter.RESOURCE_CLASS.RESOURCE_TYPE, 'upload'], 'user': 'system'}
current_app.logger.info(
f"upload {fhir_bundle['total']} from {filename}",
extra=extra)

response = requests.post(target_system, json=fhir_bundle)
click.echo(f" - response status {response.status_code}")
audit_entry(f"uploaded: {response.json()}", extra=extra)

if response.status_code != 200:
raise click.BadParameter(response.text)

# ResourceList exposes its size through __len__ (there is no .len() method)
click.echo(f" - parsed {len(resources)}")
click.echo(f" - uploaded {batcher.total_sent}")
click.echo("upload complete")


Expand Down
10 changes: 5 additions & 5 deletions tests/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ def test_csv_headers(parser_skagit1_csv):

def test_csv_patients(parser_skagit1_csv):
    """Skagit CSV rows become Patient resources; two are expected"""
    pl = ResourceList(parser_skagit1_csv, SkagitPatientAdapter)
    for pat in pl:
        assert pat.as_fhir()['resourceType'] == 'Patient'
        assert pat.as_fhir()['name']['given'] in (['Barney'], ['Fred'])
        assert isinstance(pat.as_fhir()['birthDate'], str)
    # len() raises until the generator has been fully consumed, so the
    # count can only be checked after the loop
    assert len(pl) == 2


def test_service_request_headers(skagit_service_requests):
Expand All @@ -64,13 +64,13 @@ def test_service_request_headers(skagit_service_requests):

def test_dawg_patients(parser_dawg_csv):
    """Dawg CSV rows become Patient resources; two are expected"""
    pl = ResourceList(parser_dawg_csv, DawgPatientAdapter)
    for pat in pl:
        f = pat.as_fhir()
        assert f['resourceType'] == 'Patient'
        assert f['name']['family'] in ("Shy", "Rod-Pod")
        assert f['name']['given'] in (['Guy'], ['Marmar'])
        assert isinstance(pat.as_fhir()['birthDate'], str)
    # len() raises until the generator has been fully consumed, so the
    # count can only be checked after the loop
    assert len(pl) == 2


def test_kent_headers(parser_kent1_csv):
Expand All @@ -79,30 +79,30 @@ def test_kent_headers(parser_kent1_csv):

def test_kent_patients(parser_kent1_csv):
    """Kent CSV rows become Patient resources; one is expected"""
    pl = ResourceList(parser_kent1_csv, KentPatientAdapter)
    for pat in pl:
        f = pat.as_fhir()
        assert f['resourceType'] == 'Patient'
        assert f['name']['family'] == 'Aabb'
        assert f['name']['given'] == ['Cccddee']
        assert isinstance(pat.as_fhir()['birthDate'], str)
    # len() raises until the generator has been fully consumed, so the
    # count can only be checked after the loop
    assert len(pl) == 1


def test_example_patients(example_csv):
    """Example CSV yields ten patients, each with a single identifier"""
    pl = ResourceList(example_csv, SkagitPatientAdapter)
    for patient in pl:
        fp = patient.as_fhir()
        assert len(fp['identifier']) == 1
    # len() raises until the generator has been fully consumed, so the
    # count can only be checked after the loop
    assert len(pl) == 10


def test_dups_example(parser_dups_csv):
    """CSV containing duplicates yields only the two unique patients"""
    pl = ResourceList(parser_dups_csv, SkagitPatientAdapter)
    for patient in pl:
        fp = patient.as_fhir()
        assert fp['name']['family'] in ("Potter", "Granger")
        assert fp['birthDate'] in ('1966-01-01', '1972-11-25')
    # len() raises until the generator has been fully consumed, so the
    # count can only be checked after the loop
    assert len(pl) == 2


@pytest.mark.skip(reason="lack ability to mock patients in HAPI")
Expand Down

0 comments on commit 2b20681

Please sign in to comment.