Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rc/test epic integration #35125

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
19ba9a6
add epic secret vars
Robert-Costello Aug 22, 2024
30fe0d0
generate json web token to epic specs
Robert-Costello Aug 22, 2024
234bc5c
add method to request epic access token
Robert-Costello Aug 22, 2024
25db683
raise exception for failed token request
Robert-Costello Aug 28, 2024
01281d5
add method to get appointment data for patient
Robert-Costello Aug 28, 2024
b764511
change response format to json
Robert-Costello Aug 30, 2024
c6be7af
add FF toggle
Robert-Costello Aug 30, 2024
0ed677b
Merge branch 'master' into rc/test-epic-integration
Robert-Costello Sep 5, 2024
de2bac9
use patient fhir id for appointment search
Robert-Costello Sep 13, 2024
173c017
add time util methods
Robert-Costello Sep 13, 2024
9dd9c75
sync patient and appointment data
Robert-Costello Sep 13, 2024
f148de9
add periodic task
Robert-Costello Sep 13, 2024
9d0c563
catch access token failure
Robert-Costello Sep 13, 2024
442e112
Merge branch 'master' into rc/test-epic-integration
Robert-Costello Sep 13, 2024
9fe01a6
lint
Robert-Costello Sep 16, 2024
461a163
add reason and practitioner properties to new appointment cases
Robert-Costello Sep 16, 2024
cd705bc
Merge branch 'master' into rc/test-epic-integration
Robert-Costello Sep 16, 2024
e132919
lint
Robert-Costello Sep 16, 2024
4b57d2c
use urlencode for query params
Robert-Costello Sep 17, 2024
4a30572
lint
Robert-Costello Sep 17, 2024
6651007
use f-string
Robert-Costello Sep 17, 2024
2409a2e
ensure patient entry list is not empty
Robert-Costello Sep 17, 2024
1d51e47
continue past patient if no fhir id
Robert-Costello Sep 17, 2024
92ba265
name formatting
Robert-Costello Sep 17, 2024
5a90a7f
map appointment fhir ids to cases
Robert-Costello Sep 17, 2024
7d14e11
add response handler method
Robert-Costello Sep 17, 2024
7955b1c
account for secs and milli added by commcare
Robert-Costello Sep 17, 2024
36a1884
no need to get keys
Robert-Costello Sep 18, 2024
ccbf366
ensure required params are present
Robert-Costello Sep 29, 2024
b7e0699
fix param names
Robert-Costello Sep 29, 2024
5001b16
rename case property
Robert-Costello Sep 30, 2024
829cbdc
default properties to empty strings for case helper
Robert-Costello Sep 30, 2024
1f7bf89
check for correct fhir id property
Robert-Costello Sep 30, 2024
ac65877
handle absence of "reference"
Robert-Costello Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 252 additions & 1 deletion corehq/motech/fhir/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import dateutil, requests, jwt, time, uuid

Check failure on line 1 in corehq/motech/fhir/tasks.py

View workflow job for this annotation

GitHub Actions / Flake8

corehq/motech/fhir/tasks.py#L1

Multiple imports on one line (E401)
from datetime import datetime

from collections import namedtuple
from typing import Generator, List
from uuid import uuid4
Expand All @@ -12,8 +15,9 @@
from corehq import toggles
from corehq.apps.celery import periodic_task, task
from corehq.apps.hqcase.utils import submit_case_blocks
from corehq.apps.hqcase.case_helper import CaseHelper
from corehq.form_processor.exceptions import CaseNotFound
from corehq.form_processor.models import CommCareCase
from corehq.form_processor.models import CommCareCase, CommCareCaseIndex
from corehq.motech.const import (
IMPORT_FREQUENCY_DAILY,
IMPORT_FREQUENCY_MONTHLY,
Expand Down Expand Up @@ -394,5 +398,252 @@
)


def generate_epic_jwt():
key = settings.EPIC_PRIVATE_KEY
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider using ConnectionSettings for storing (and encrypting) API auth secrets?

# token will expire in 4 mins
exp = int(time.time()) + 240
jti = str(uuid.uuid4())
header = {
"alg": "RS256",
"typ": "JWT",
}
payload = {
"iss": settings.EPIC_CLIENT_ID,
"sub": settings.EPIC_CLIENT_ID,
"aud": "https://fhir.epic.com/interconnect-fhir-oauth/oauth2/token",
"jti": jti,
"exp": exp
}
token = jwt.encode(payload, key, algorithm="RS256", headers=header)
return token


def request_epic_access_token():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling authentication and authorization inside the corehq.motech.fhir.tasks module feels wrong. I think this either belongs in corehq.motech.auth, or maybe if this code is specific to only one project, it should go in custom.epic?

headers = {
"Content_Type": "application/x-www-form-urlencoded",
}
data = {
"grant_type": "client_credentials",
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": generate_epic_jwt()
}
url = "https://fhir.epic.com/interconnect-fhir-oauth/oauth2/token"
response = requests.post(url, data=data, headers=headers)
if response.status_code == 200:
return response.json().get('access_token')
elif response.status_code >= 400:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens for 3xx and 4xx status codes?

return response.raise_for_status()


def get_patient_fhir_id(given_name, family_name, birthdate, access_token):
url = f"https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/Patient?birthdate={birthdate}&family={family_name}&given={given_name}&_format=json"

Check failure on line 439 in corehq/motech/fhir/tasks.py

View workflow job for this annotation

GitHub Actions / Flake8

corehq/motech/fhir/tasks.py#L439

Line too long (153 > 115 characters) (E501)
Robert-Costello marked this conversation as resolved.
Show resolved Hide resolved
headers = {
'authorization': 'Bearer %s' % access_token,
Robert-Costello marked this conversation as resolved.
Show resolved Hide resolved
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
fhir_id = None
entry = response_json.get('entry')[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be useful to have clearer docs available for what the response format is. It's hard to tell if this is correct. It seems like you expect it to be a list with at least 1 item in it but that item might be falsy (empty list, empty dict, null etc).

This code will error if entry is not in the response or if its an empty list.

Copy link
Contributor Author

@Robert-Costello Robert-Costello Sep 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the open epic docs entry is a required property, but may be an empty list. I added a check here 2409a2e

Name Description Is Optional Is Array
entry (Entry) An array of entries included in this bundle. false true

Name Description Is Optional Is Array
entry (Entry) An array of entries included in this bundle. false true

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this bundle

Bundles can be paginated. You can find code for iterating resources inside paginated bundles here.

if entry:

Check failure on line 448 in corehq/motech/fhir/tasks.py

View workflow job for this annotation

GitHub Actions / Flake8

corehq/motech/fhir/tasks.py#L448

Trailing whitespace (W291)
resource = entry.get('resource')
if resource:
fhir_id = resource.get('id')
return fhir_id
elif response.status_code >= 400:
response.raise_for_status()


# TODO add time param 12 weeks from study start date
def get_epic_appointments_for_patient(fhir_id, access_token):
appointments = []
headers = {
'authorization': 'Bearer %s' % access_token,
}
url = f"https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/Appointment?&patient={fhir_id}&service-category=appointment&_format=json"

Check failure on line 463 in corehq/motech/fhir/tasks.py

View workflow job for this annotation

GitHub Actions / Flake8

corehq/motech/fhir/tasks.py#L463

Line too long (143 > 115 characters) (E501)
response = requests.get(url, headers=headers)
if response.status_code == 200:
json_response = response.json()
entries = json_response['entry']
for entry in entries:
appointments.append(entry)
elif response.status_code >= 400:
snopoke marked this conversation as resolved.
Show resolved Hide resolved
response.raise_for_status()
return appointments


def convert_date_and_time_to_utc_timestamp(date, time):
date_time = date + "T" + time
utc_zone = dateutil.tz.gettz('UTC')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can use the UTC constant: https://docs.python.org/3/library/datetime.html#datetime.timezone.utc

Suggested change
utc_zone = dateutil.tz.gettz('UTC')
utc_zone = timezone.utc

# Hardcoded for MGH study
local_zone = dateutil.tz.gettz('America/New_York')
local_datetime = datetime.fromisoformat(date_time)
local_datetime = local_datetime.replace(tzinfo=local_zone)
utc_datetime = local_datetime.astimezone(utc_zone)
Comment on lines +503 to +506
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be handled by out timezone utils:

utc_datetime = UserTime(date_time, tzinfo=local_zone).server_time().done()

utc_iso_format = utc_datetime.strftime('%Y-%m-%dT%H:%M:%SZ')

return utc_iso_format


def convert_utc_timestamp_to_date_and_time(utc_timestamp):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar comment to above re TZ conversion using utilities

utc_zone = dateutil.tz.gettz('UTC')
local_zone = dateutil.tz.gettz('America/New_York')
utc_datetime = datetime.fromisoformat(utc_timestamp.replace('Z', ''))
utc_datetime = utc_datetime.replace(tzinfo=utc_zone)
local_datetime = utc_datetime.astimezone(local_zone)
date = local_datetime.strftime('%Y-%m-%d')
time = local_datetime.strftime('%H:%M')

return date, time


def sync_all_appointments_domain(domain):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know the kinds of scale we expect (# patient / appointment cases). I'm just wondering if this should be done in batches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did give some thought to batching. I'm not sure about the number of patients, but I'm estimating that the number of appointments per patient will be relatively low for the timeframe of 12 weeks (still need to add the time constraint to the appointment query). One appointment per week seems like the upper end to me. If you double that it's still only 24 per patient. I'll talk to Chantal and see if she thinks those numbers are low and also get a sense of the number of patients.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from Chantal:
the max number of patients enrolled in the study will be 55, with rolling enrollment over the next year
~5 patients per month and they are enrolled for 12 weeks so maybe ~15 open cases at a time in a given month

try:
access_token = request_epic_access_token()
except Exception:
return None
# get all patient case ids for domain
patient_case_ids = CommCareCase.objects.get_open_case_ids_in_domain_by_type(domain, 'patient')
# get all patient cases
patient_cases = CommCareCase.objects.get_cases(patient_case_ids)

# get extension (appointment) cases ids, for all patients
appointment_case_ids = CommCareCaseIndex.objects.get_extension_case_ids(
domain, patient_case_ids, False, 'appointment')
# get appointment cases in commcare
appointment_cases = CommCareCase.objects.get_cases(appointment_case_ids)

# get fhir ids for appointments currently in commcare
appointment_fhir_ids = [
appointment_case.get_case_property('fhir_id') for appointment_case in appointment_cases
]

for patient in patient_cases:
patient_helper = CaseHelper(case=patient, domain=domain)
patient_fhir_id = patient.get_case_property('patient_fhir_id')
if not patient_fhir_id:
given = patient.get_case_property('given_name')
family = patient.get_case_property('family_name')
birthdate = patient.get_case_property('birthdate')
patient_fhir_id = get_patient_fhir_id(given, family, birthdate, access_token)

if patient_fhir_id is not None:
patient_helper.update({'properties': {
'patient_fhir_id': patient_fhir_id,
}})
Robert-Costello marked this conversation as resolved.
Show resolved Hide resolved

epic_appointments_to_add = []
epic_appointments_to_update = []
# Get all appointments for patient from epic
epic_appointment_records = get_epic_appointments_for_patient(patient_fhir_id, access_token)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you look at using FHIRImportConfig (documentation)?

Would it be a lot of effort to extend it to allow syncing appointments for this project?

  • We might need to make some changes to allow for extension cases instead of child cases.
  • We would probably need to be smarter regarding how we set case ownership.
  • We would either need all cases to store their corresponding FHIR ID in their external_id case property, or use case search to fetch them.
  • We probably need to add a (custom?) serializer to handle timestamps for this project.

... and maybe it's a lot more complicated than that.

for appointment in epic_appointment_records:
appointment_resource = appointment.get('resource')
appointment_id = appointment_resource.get('id')
if appointment_id and appointment_id not in appointment_fhir_ids:
epic_appointments_to_add.append(appointment)
elif appointment_id:
epic_appointments_to_update.append(appointment)

# Add new appointments to commcare
for appointment in epic_appointments_to_add:
appointment_create_helper = CaseHelper(domain=domain)
appointment_resource = appointment.get('resource')
if appointment_resource is not None:
appointment_description = appointment_resource.get('description') or 'NO DESCRIPTION LISTED'
appointment_fhir_timestamp = appointment_resource.get('start')
appointment_date, appointment_time = convert_utc_timestamp_to_date_and_time(
appointment_fhir_timestamp)
appointment_fhir_id = appointment_resource.get('id')
reason = None
practitioner = None
for p in appointment_resource.get('participant'):
actor = p.get('actor')
if actor and 'Practitioner' in actor.get('reference'):
practitioner = actor.get('display')
break
reason_code = appointment_resource.get('reasonCode')
if reason_code and reason_code[0] is not None:
reason = reason_code[0].get('text')
host_case_id = patient.get_case_property('case_id')
appointment_case_data = {
'case_name': appointment_fhir_timestamp + appointment_description,
Robert-Costello marked this conversation as resolved.
Show resolved Hide resolved
'case_type': 'appointment',
'indices': {
'patient': {
'case_id': host_case_id,
'case_type': 'patient',
'relationship': 'extension',
}
},
'properties': {
'appointment_description': appointment_description,
'appointment_fhir_timestamp': appointment_fhir_timestamp,
'appointment_date': appointment_date,
'appointment_time': appointment_time,
'patient_fhir_id': patient_fhir_id,
'fhir_id': appointment_fhir_id,
'reason': reason,
'practitioner': practitioner
}
}
appointment_create_helper.create_case(appointment_case_data)

# Update existing appointments in commcare if properties have changed in epic
for appointment in epic_appointments_to_update:
epic_properties_map = {}
appointment_resource = appointment.get('resource')
if appointment_resource is not None:
appointment_description = appointment_resource.get('description') or 'NO DESCRIPTION LISTED'
appointment_fhir_timestamp = appointment_resource.get('start')
appointment_date, appointment_time = convert_utc_timestamp_to_date_and_time(
appointment_fhir_timestamp)
appointment_fhir_id = appointment_resource.get('id')
reason = None
practitioner = None
for p in appointment_resource.get('participant'):
actor = p.get('actor')
if actor and 'Practitioner' in actor.get('reference'):
practitioner = actor.get('display')
break
reason_code = appointment_resource.get('reasonCode')
if reason_code and reason_code[0] is not None:
reason = reason_code[0].get('text')
epic_properties_map.update({
'appointment_description': appointment_description,
'appointment_fhir_timestamp': appointment_fhir_timestamp,
'practitioner': practitioner,
'reason': reason
})
appointment_case = None
for case in appointment_cases:
Robert-Costello marked this conversation as resolved.
Show resolved Hide resolved
if case.get_case_property('fhir_id') == appointment_fhir_id:
appointment_case = case
break
appointment_update_helper = CaseHelper(case=appointment_case, domain=domain)
case_properties_to_update = {}
changes = False
# check for changes and add to case_properties_to_update
for k, v in epic_properties_map.items():
current_value = appointment_case.get_case_property(k)
if current_value != v:
changes = True
case_properties_to_update.update({k: v})
if k == 'appointment_fhir_timestamp':
appointment_date, appointment_time = convert_utc_timestamp_to_date_and_time(v)
case_properties_to_update.update({
'appointment_date': appointment_date,
'appointment_time': appointment_time,
})

if changes:
appointment_update_helper.update({'properties': case_properties_to_update})


@periodic_task(run_every=crontab(hour="*", minute=1), queue=settings.CELERY_PERIODIC_QUEUE)
def sync_all_epic_appointments():
for domain in toggles.MGH_EPIC_STUDY.get_enabled_domains():
sync_all_appointments_domain(domain)


class ServiceRequestNotActive(Exception):
pass
7 changes: 7 additions & 0 deletions corehq/toggles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,6 +2240,13 @@ def _commtrackify(domain_name, toggle_is_enabled):
help_link="https://confluence.dimagi.com/display/GS/FHIR+API+Documentation",
)

MGH_EPIC_STUDY = StaticToggle(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Note to self: Read all the code before you start to comment.)

Instead of using a feature flag, I'd advocate moving this code into custom.mgh_epic, and enable the custom module in settings.py for the project spaces you want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for taking a look, Norman. This has been in a holding pattern for a while, but I'm coming back to it to do some more work now. I can move this to a custom module.

'mgh_epic_study',
'Enable updating/creating extension cases through open epic API',
TAG_CUSTOM,
namespaces=[NAMESPACE_DOMAIN],
)

AUTO_DEACTIVATE_MOBILE_WORKERS = StaticToggle(
'auto_deactivate_mobile_workers',
'Development flag for auto-deactivation of mobile workers. To be replaced '
Expand Down
3 changes: 3 additions & 0 deletions settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,9 @@ def _pkce_required(client_id):
# encryption or signing workflows.
HQ_PRIVATE_KEY = None

EPIC_PRIVATE_KEY = None
EPIC_CLIENT_ID = None

KAFKA_BROKERS = ['localhost:9092']
KAFKA_API_VERSION = None

Expand Down
Loading