Airlock processor handles request submission (#1978)
* Handle the submitted/approved/rejected statusChangedEvent
  Terraform support for the Azure Function

* Consolidate the blob-created queues into a single Service Bus topic with two subscriptions
  Add blob-created event handling to the airlock processor
  The airlock processor now uses its managed identity to fetch storage account keys (role assignments added to the storage accounts)
eladiw authored Jun 14, 2022
1 parent a85e52c commit 11a3962
Showing 24 changed files with 435 additions and 97 deletions.
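
The first change means the processor now answers the API with an 'Airlock.StepResult' Event Grid event. A minimal sketch of that payload, with field names taken from the BlobCreatedTrigger below; the ids are invented:

import uuid

# Hypothetical example of the StepResult event the processor publishes;
# field names match BlobCreatedTrigger below, the ids are made up.
request_id = str(uuid.uuid4())
step_result = {
    "id": str(uuid.uuid4()),
    "subject": request_id,
    "event_type": "Airlock.StepResult",
    "data": {"completed_step": "submitted", "new_status": "in-progress", "request_id": request_id},
    "data_version": "1.0",
}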
3 changes: 2 additions & 1 deletion .devcontainer/devcontainer.json
@@ -237,7 +237,8 @@
"mikestead.dotenv",
"humao.rest-client",
"timonwong.shellcheck",
"ms-azuretools.vscode-bicep"
"ms-azuretools.vscode-bicep",
"ms-azuretools.vscode-azurefunctions"
],
"forwardPorts": [
8000
1 change: 1 addition & 0 deletions airlock_processor/.funcignore
@@ -0,0 +1 @@
.venv
35 changes: 35 additions & 0 deletions airlock_processor/BlobCreatedTrigger/__init__.py
@@ -0,0 +1,35 @@
import logging

import azure.functions as func
import datetime
import uuid
import json
import re


def main(msg: func.ServiceBusMessage,
         outputEvent: func.Out[func.EventGridOutputEvent]):

    logging.info("Python ServiceBus topic trigger processed a message - a new blob was created.")
    body = msg.get_body().decode('utf-8')
    logging.info('Python ServiceBus topic trigger processed message: %s', body)

    json_body = json.loads(body)
    # the message is due to blob creation in an 'in-progress' storage account
    if "stalimip" in json_body["topic"]:
        completed_step = "submitted"
        new_status = "in-progress"
        request_id = re.search(r'/blobServices/default/containers/(.*?)/blobs', json_body["subject"]).group(1)

        # TODO: delete the old container here
        # https://github.com/microsoft/AzureTRE/issues/1963

        # reply with a step-completed event
        outputEvent.set(
            func.EventGridOutputEvent(
                id=str(uuid.uuid4()),
                data={"completed_step": completed_step, "new_status": new_status, "request_id": request_id},
                subject=request_id,
                event_type="Airlock.StepResult",
                event_time=datetime.datetime.utcnow(),
                data_version="1.0"))
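
For illustration, a standalone sketch of how the request id is recovered from the Event Grid subject; the subject string here is a made-up example:

import re

# Hypothetical Event Grid subject for a blob created in a request's container
subject = "/blobServices/default/containers/8c2d9e11-aaaa-bbbb-cccc-1234567890ab/blobs/data.csv"
request_id = re.search(r'/blobServices/default/containers/(.*?)/blobs', subject).group(1)
print(request_id)  # 8c2d9e11-aaaa-bbbb-cccc-1234567890ab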
21 changes: 21 additions & 0 deletions airlock_processor/BlobCreatedTrigger/function.json
@@ -0,0 +1,21 @@
{
  "scriptFile": "__init__.py",
  "entryPoint": "main",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "topicName": "%BLOB_CREATED_TOPIC_NAME%",
      "subscriptionName": "%TOPIC_SUBSCRIPTION_NAME%",
      "connection": "SB_CONNECTION_STRING"
    },
    {
      "type": "eventGrid",
      "name": "outputEvent",
      "topicEndpointUri": "EVENT_GRID_TOPIC_URI_SETTING",
      "topicKeySetting": "EVENT_GRID_TOPIC_KEY_SETTING",
      "direction": "out"
    }
  ]
}
147 changes: 135 additions & 12 deletions airlock_processor/StatusChangedQueueTrigger/__init__.py
@@ -1,18 +1,141 @@
import logging

import azure.functions as func
import datetime
import os
import json
from shared_code import blob_operations, constants
from azure.identity import DefaultAzureCredential
from azure.mgmt.storage import StorageManagementClient
from pydantic import BaseModel, parse_obj_as


def main(msg: func.ServiceBusMessage,
         outputEvent: func.Out[func.EventGridOutputEvent]):
class RequestProperties(BaseModel):
    request_id: str
    status: str
    type: str
    workspace_id: str

    logging.info('Python ServiceBus queue trigger processed message: %s', msg.get_body().decode('utf-8'))
    outputEvent.set(
        func.EventGridOutputEvent(
            id="step-result-id",
            data={"tag1": "value1", "tag2": "value2"},
            subject="test-subject",
            event_type="test-event-1",
            event_time=datetime.datetime.utcnow(),
            data_version="1.0"))

class ContainersCopyMetadata:
    source_account_name: str
    source_account_key: str
    sa_source_connection_string: str
    sa_dest_connection_string: str

    def __init__(self, source_account_name: str, source_account_key: str, sa_source_connection_string: str, sa_dest_connection_string: str):
        self.source_account_name = source_account_name
        self.source_account_key = source_account_key
        self.sa_source_connection_string = sa_source_connection_string
        self.sa_dest_connection_string = sa_dest_connection_string


def main(msg: func.ServiceBusMessage):

    body = msg.get_body().decode('utf-8')
    logging.info('Python ServiceBus queue trigger processed message: %s', body)

    try:
        request_properties = extract_properties(body)

        new_status = request_properties.status
        req_id = request_properties.request_id
        ws_id = request_properties.workspace_id
        request_type = request_properties.type
    except Exception as e:
        logging.error(f'Failed processing request - invalid message: {body}, exc: {e}')
        raise

    logging.info('Processing request with id %s. new status is "%s", type is "%s"', req_id, new_status, request_type)

    if is_require_data_copy(new_status):
        logging.info('Request with id %s requires data copy between storage accounts', req_id)
        containers_metadata = get_source_dest_env_vars(new_status, request_type, ws_id)
        blob_operations.copy_data(containers_metadata.source_account_name, containers_metadata.source_account_key, containers_metadata.sa_source_connection_string, containers_metadata.sa_dest_connection_string, req_id)
        return

    # TODO: handle other cases...


def extract_properties(body: str) -> RequestProperties:
    try:
        json_body = json.loads(body)
        result = parse_obj_as(RequestProperties, json_body["data"])
        if not result:
            raise Exception("Failed parsing request properties")
    except json.decoder.JSONDecodeError:
        logging.error(f'Error decoding object: {body}')
        raise
    except Exception as e:
        logging.error(f'Error extracting properties: {e}')
        raise

    return result


def is_require_data_copy(new_status: str):
    if new_status.lower() in [constants.STAGE_SUBMITTED, constants.STAGE_APPROVED, constants.STAGE_REJECTED, constants.STAGE_BLOCKED]:
        return True
    return False


def get_source_dest_env_vars(new_status: str, request_type: str, short_workspace_id: str) -> ContainersCopyMetadata:

    # sanity
    if is_require_data_copy(new_status) is False:
        raise Exception("Given new status is not supported")

    try:
        tre_id = os.environ["TRE_ID"]
        subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
    except KeyError as e:
        logging.error(f'Missing environment variable: {e}')
        raise

    request_type = request_type.lower()
    if request_type != "import" and request_type != "export":
        raise Exception("Request type must be either import or export")

    if new_status == 'submitted' and request_type == 'import':
        source_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL.format(tre_id)
        dest_account_name = constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS.format(tre_id)
        source_account_rg = constants.CORE_RG_NAME.format(tre_id)
        dest_account_rg = source_account_rg
        logging.info("source account [%s rg: %s]. dest account [%s rg: %s]", source_account_name, source_account_rg, dest_account_name, dest_account_rg)
    elif new_status == 'submitted' and request_type == 'export':
        source_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL.format(short_workspace_id)
        dest_account_name = constants.STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS.format(short_workspace_id)
        source_account_rg = constants.CORE_RG_NAME.format(tre_id, short_workspace_id)
        dest_account_rg = source_account_rg
        logging.info("source account [%s rg: %s]. dest account [%s rg: %s]", source_account_name, source_account_rg, dest_account_name, dest_account_rg)
    elif new_status == 'approved' and request_type == 'import':
        # https://github.com/microsoft/AzureTRE/issues/1841
        pass
    elif new_status == 'approved' and request_type == 'export':
        # https://github.com/microsoft/AzureTRE/issues/1841
        pass
    elif new_status == 'rejected' and request_type == 'import':
        # https://github.com/microsoft/AzureTRE/issues/1842
        pass
    elif new_status == 'rejected' and request_type == 'export':
        # https://github.com/microsoft/AzureTRE/issues/1842
        pass

    managed_identity = os.environ.get("MANAGED_IDENTITY_CLIENT_ID")
    if managed_identity:
        logging.info("using the Airlock processor's managed identity to build the storage management client")
    credential = DefaultAzureCredential(managed_identity_client_id=managed_identity, exclude_shared_token_cache_credential=True) if managed_identity else DefaultAzureCredential()

    storage_client = StorageManagementClient(credential, subscription_id)
    source_storage_keys = storage_client.storage_accounts.list_keys(source_account_rg, source_account_name)
    source_storage_keys = {v.key_name: v.value for v in source_storage_keys.keys}

    dest_storage_keys = storage_client.storage_accounts.list_keys(dest_account_rg, dest_account_name)
    dest_storage_keys = {v.key_name: v.value for v in dest_storage_keys.keys}

    conn_string_base = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName={};AccountKey={}"
    source_account_key = source_storage_keys['key1']
    sa_source_connection_string = conn_string_base.format(source_account_name, source_account_key)
    dest_account_key = dest_storage_keys['key1']
    sa_dest_connection_string = conn_string_base.format(dest_account_name, dest_account_key)

    return ContainersCopyMetadata(source_account_name, source_account_key, sa_source_connection_string, sa_dest_connection_string)
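
A quick sketch of the parsing path above, assuming the function-app root is on the import path as in the tests below; the message body is a made-up example:

from StatusChangedQueueTrigger import extract_properties, is_require_data_copy

body = '{"data": {"request_id": "123", "status": "submitted", "type": "import", "workspace_id": "ws1"}}'
props = extract_properties(body)
assert props.request_id == "123"
assert is_require_data_copy(props.status)  # "submitted" is one of the copy-triggering stages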
7 changes: 0 additions & 7 deletions airlock_processor/StatusChangedQueueTrigger/function.json
@@ -7,13 +7,6 @@
"direction": "in",
"queueName": "%AIRLOCK_STATUS_CHANGED_QUEUE_NAME%",
"connection": "SB_CONNECTION_STRING"
},
{
"type": "eventGrid",
"name": "outputEvent",
"topicEndpointUri": "EVENT_GRID_TOPIC_URI_SETTING",
"topicKeySetting": "EVENT_GRID_TOPIC_KEY_SETTING",
"direction": "out"
}
]
}
2 changes: 1 addition & 1 deletion airlock_processor/_version.py
@@ -1 +1 @@
__version__ = "0.0.2"
__version__ = "0.0.3"
2 changes: 2 additions & 0 deletions airlock_processor/exceptions/AirlockInvalidContainerException.py
@@ -0,0 +1,2 @@
class AirlockInvalidContainerException(Exception):
    pass
12 changes: 12 additions & 0 deletions airlock_processor/local.settings.json-sample
@@ -0,0 +1,12 @@
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AIRLOCK_STATUS_CHANGED_QUEUE_NAME": "status_changed",
    "SB_CONNECTION_STRING": "Endpoint=sb://XXXX.servicebus.windows.net/;SharedAccessKeyName=.....",
    "AZURE_SUBSCRIPTION_ID": "",
    "BLOB_CREATED_TOPIC_NAME": "",
    "TOPIC_SUBSCRIPTION_NAME": "",
    "TRE_ID": ""
  }
}
7 changes: 6 additions & 1 deletion airlock_processor/requirements.txt
@@ -1,3 +1,8 @@
# Do not include azure-functions-worker as it may conflict with the Azure Functions platform

azure-functions
azure-functions
azure-storage-blob
azure-identity
azure-mgmt-storage
azure-mgmt-resource
pydantic
55 changes: 55 additions & 0 deletions airlock_processor/shared_code/blob_operations.py
@@ -0,0 +1,55 @@
import logging

import datetime
from azure.storage.blob import ContainerSasPermissions, generate_container_sas, BlobServiceClient

from exceptions.AirlockInvalidContainerException import AirlockInvalidContainerException


def copy_data(source_account_name: str, source_account_key: str, sa_source_connection_string: str, sa_dest_connection_string: str, request_id: str):
    container_name = request_id

    # token generation with an expiry of 1 hour. Since it isn't shared, we can leave it to expire (no need to track/delete)
    # Remove the SAS token if it turns out not to be needed: https://github.com/microsoft/AzureTRE/issues/2034
    sas_token = generate_container_sas(account_name=source_account_name,
                                       container_name=container_name,
                                       account_key=source_account_key,
                                       permission=ContainerSasPermissions(read=True),
                                       expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1))

    # Copy files
    source_blob_service_client = BlobServiceClient.from_connection_string(sa_source_connection_string)
    dest_blob_service_client = BlobServiceClient.from_connection_string(sa_dest_connection_string)

    source_container_client = source_blob_service_client.get_container_client(container_name)

    try:
        found_blobs = 0
        blob_name = ""
        for blob in source_container_client.list_blobs():
            if found_blobs > 0:
                msg = "Request with id {} contains more than 1 file. Flow aborted.".format(request_id)
                logging.error(msg)
                raise AirlockInvalidContainerException(msg)
            blob_name = blob.name
            found_blobs += 1

        if found_blobs == 0:
            logging.info('Request with id %s did not contain any files. Flow aborted.', request_id)

    except Exception:
        logging.error('Request with id %s failed.', request_id)
        raise

    source_blob = source_container_client.get_blob_client(blob_name)

    source_url = f'{source_blob.url}?{sas_token}'
    # source_url = source_blob.url

    copied_blob = dest_blob_service_client.get_blob_client(container_name, source_blob.blob_name)
    copy = copied_blob.start_copy_from_url(source_url)

    try:
        logging.info("Copy operation returned 'copy_id': '%s', 'copy_status': '%s'", copy["copy_id"], copy["copy_status"])
    except KeyError as e:
        logging.error(f"Failed getting operation id and status: {e}")
17 changes: 17 additions & 0 deletions airlock_processor/shared_code/constants.py
@@ -0,0 +1,17 @@
# RG
CORE_RG_NAME = "rg-{}"
WS_RG_NAME = "rg-{}-ws-{}"

# Import
STORAGE_ACCOUNT_NAME_IMPORT_EXTERNAL = "stalimex{}"
STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS = "stalimip{}"

# Export
STORAGE_ACCOUNT_NAME_EXPORT_INTERNAL = "stalexintws{}"
STORAGE_ACCOUNT_NAME_EXPORT_INPROGRESS = "stalexipws{}"

# Stages
STAGE_SUBMITTED = "submitted"
STAGE_APPROVED = "approved"
STAGE_REJECTED = "rejected"
STAGE_BLOCKED = "blocked"
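
For example, the name templates resolve like this ('mytre' and 'ab12' are made-up ids):

from shared_code import constants

print(constants.STORAGE_ACCOUNT_NAME_IMPORT_INPROGRESS.format("mytre"))  # stalimipmytre
print(constants.WS_RG_NAME.format("mytre", "ab12"))                      # rg-mytre-ws-ab12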
Empty file.
51 changes: 51 additions & 0 deletions airlock_processor/tests/test_copy_data.py
@@ -0,0 +1,51 @@
from json import JSONDecodeError
import unittest

from StatusChangedQueueTrigger import extract_properties, get_source_dest_env_vars, is_require_data_copy


class TestPropertiesExtraction(unittest.TestCase):
    def test_extract_prop_valid_body_return_all_values(self):
        msg = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        req_prop = extract_properties(msg)
        self.assertEqual(req_prop.request_id, "123")
        self.assertEqual(req_prop.status, "456")
        self.assertEqual(req_prop.type, "789")
        self.assertEqual(req_prop.workspace_id, "ws1")

    def test_extract_prop_missing_arg_throws(self):
        msg = "{ \"data\": { \"status\":\"456\" , \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\", \"type\":\"789\", \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"workspace_id\":\"ws1\" }}"
        self.assertRaises(Exception, extract_properties, msg)

        msg = "{ \"data\": { \"request_id\":\"123\",\"status\":\"456\" , \"type\":\"789\" }}"
        self.assertRaises(Exception, extract_properties, msg)

    def test_extract_prop_invalid_json_throws(self):
        msg = "Hi"
        self.assertRaises(JSONDecodeError, extract_properties, msg)


class TestDataCopyProperties(unittest.TestCase):
    def test_only_specific_status_are_triggering_copy(self):
        self.assertEqual(is_require_data_copy("Mitzi"), False)
        self.assertEqual(is_require_data_copy(""), False)
        self.assertEqual(is_require_data_copy("submit"), False)

        # Testing all values that should return true
        self.assertEqual(is_require_data_copy("submITted"), True)
        self.assertEqual(is_require_data_copy("submitted"), True)
        self.assertEqual(is_require_data_copy("approved"), True)
        self.assertEqual(is_require_data_copy("REJected"), True)
        self.assertEqual(is_require_data_copy("blocked"), True)

    def test_wrong_status_raises_when_getting_storage_account_properties(self):
        self.assertRaises(Exception, get_source_dest_env_vars, "Miaow", "import")

    def test_wrong_type_raises_when_getting_storage_account_properties(self):
        self.assertRaises(Exception, get_source_dest_env_vars, "accepted", "somethingelse")