feat: add synapse support (#128)
* feat: add synapse support

* chore: add examples tests for synapse

* docs: add docs for synapse support
jayachithra authored Aug 30, 2024
1 parent 9c3b637 commit 10ab4c6
Showing 29 changed files with 581 additions and 288 deletions.
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -22,7 +22,8 @@ repos:
exclude: |
(?x)^(
tests/functional/utf_16_encoding/pipeline-content.json|
examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json
examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json|
tests/functional/test_framework/data/fabric/pipeline-content.json
)$
- id: trailing-whitespace
- id: mixed-line-ending
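The `exclude` value above is a Python verbose regex: the `(?x)` flag lets the alternatives be spread across indented lines, with insignificant whitespace ignored. A minimal sketch of how the alternation matches (assuming pre-commit's documented behavior of testing the pattern against the file's repo-relative path):

```python
import re

# Same verbose pattern as in .pre-commit-config.yaml; (?x) ignores the
# whitespace and line breaks used for readability.
EXCLUDE = re.compile(
    r"""(?x)^(
        tests/functional/utf_16_encoding/pipeline-content.json|
        examples/fabric/simple_web_hook/fabric/ExamplePipeline.DataPipeline/pipeline-content.json|
        tests/functional/test_framework/data/fabric/pipeline-content.json
    )$"""
)

# The newly added fabric test fixture is now skipped by the hook:
assert EXCLUDE.match("tests/functional/test_framework/data/fabric/pipeline-content.json")
assert not EXCLUDE.match("src/other/pipeline-content.json")
```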
6 changes: 5 additions & 1 deletion README.md
@@ -1,6 +1,6 @@
# Data Factory - Testing Framework :hammer_and_wrench:

A stand-alone test framework that allows to write unit tests for Data Factory pipelines on [Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/data-factory/) and [Azure Data Factory](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory).
A stand-alone test framework that allows you to write unit tests for Data Factory pipelines on [Microsoft Fabric](https://learn.microsoft.com/en-us/fabric/data-factory/), [Azure Data Factory](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory) and [Azure Synapse Analytics](https://learn.microsoft.com/en-us/azure/data-factory/concepts-pipelines-activities?tabs=data-factory).

> The framework is currently in _Public Preview_ and is not officially supported by Microsoft.
@@ -84,6 +84,10 @@ Azure Data Factory:
1. [Copy blobs example](examples/data_factory/copy_blobs/README.md)
2. [Batch job example](examples/data_factory/batch_job/README.md)

Azure Synapse Analytics:

1. [Copy blobs example](examples/synapse/copy_blobs/README.md)

## Contributing :handshake:

This project welcomes contributions and suggestions. Most contributions require you to agree to a
4 changes: 2 additions & 2 deletions docs/advanced/development_workflow.md
@@ -1,6 +1,6 @@
# Recommended development workflow for Azure Data Factory v2
# Recommended development workflow for Azure Data Factory (ADF) v2 and Azure Synapse Analytics

* Use ADF Git integration
* Use ADF / Azure Synapse Analytics Git integration
* Use UI to create a feature branch, build the initial pipeline, and save it to the feature branch
* Pull feature branch locally
* Start writing unit and functional tests, run them locally for immediate feedback, and fix bugs
5 changes: 3 additions & 2 deletions docs/basic/repository_setup.md
@@ -6,12 +6,13 @@ To be able to write tests for data factory, the pipeline and activity definition

1. [Fabric - Git integration process](https://learn.microsoft.com/fabric/cicd/git-integration/git-integration-process)
2. [Azure Data Factory - Git integration process](https://learn.microsoft.com/azure/data-factory/source-control)
3. [Azure Synapse Analytics - Git integration process](https://learn.microsoft.com/en-us/azure/synapse-analytics/cicd/source-control)

### Alternative for Azure Data Factory
### Alternative for Azure Data Factory and Azure Synapse Analytics

To download a single JSON file for testing purposes, follow these steps:

1. Open the Data Factory instance, and open the pipeline to be tested.
1. Open the Data Factory or Synapse Analytics instance, and open the pipeline to be tested.
2. Click on the action ellipses
3. Click "Download support files"
4. Extract the zip file containing the pipeline definition in a folder of choice.
@@ -4,7 +4,6 @@
from data_factory_testing_framework.models.activities import Activity, ForEachActivity
from data_factory_testing_framework.state import (
PipelineRunState,
PipelineRunVariable,
RunParameter,
RunParameterType,
)
@@ -27,9 +26,6 @@ def test_list_blobs(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("List Folders")
state = PipelineRunState(
variables=[
PipelineRunVariable(name="SourceContainerName", default_value="source"),
],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
@@ -55,9 +51,6 @@ def test_for_each(pipeline: Pipeline) -> None:
# Arrange
activity = pipeline.get_activity_by_name("For Each SourceFolder")
state = PipelineRunState(
variables=[
PipelineRunVariable(name="SourceContainerName", default_value="source"),
],
parameters=[
RunParameter(RunParameterType.Global, "SourceStorageAccountName", "sourcestorage"),
RunParameter(
12 changes: 12 additions & 0 deletions examples/synapse/copy_blobs/README.md
@@ -0,0 +1,12 @@
# Copy Blobs

This is an example pipeline that lists all the blobs in a given container and copies them to another container.

![image](copy_blobs.png)

The pipeline has two activities:

1. **List folders**: Web activity to list all blobs in a container that has a given prefix
2. **For each activity**: Iterates over each item in the list returned above and executes the sub-activity on each item.

2.1. **Copy files to destination**: Copy activity which copies the blobs to a given destination.
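The List Folders step calls the Blob service's List Blobs REST operation by building a URL with the pipeline's `@concat(...)` expression. A rough Python equivalent of that expression (parameter values are placeholders; a real request would also need authentication, which the pipeline obtains via MSI):

```python
def list_blobs_url(account: str, container: str, prefix: str) -> str:
    """Mirror the @concat(...) expression in the Web activity's "url" property."""
    return (
        f"https://{account}.blob.core.windows.net/{container}"
        f"?restype=container&comp=list&prefix={prefix}"
        f"&delimiter=$SourceBlobDelimiter"
    )

# Example with the placeholder values used by the functional test:
url = list_blobs_url("sourcestorageaccount", "sourcecontainer", "sourcefolder")
```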
Binary file added examples/synapse/copy_blobs/copy_blobs.png
172 changes: 172 additions & 0 deletions examples/synapse/copy_blobs/pipeline/copy_blobs.json
@@ -0,0 +1,172 @@
{
"name": "copy_blobs",
"properties": {
"activities": [
{
"name": "List Folders",
"type": "WebActivity",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"method": "GET",
"headers": {
"x-ms-version": "2023-01-03"
},
"url": {
"value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net/',pipeline().parameters.SourceContainerName,'?restype=container&comp=list&prefix=',pipeline().parameters.SourceFolderPrefix,'&delimiter=$SourceBlobDelimiter')",
"type": "Expression"
},
"connectVia": {
"referenceName": "AutoResolveIntegrationRuntime",
"type": "IntegrationRuntimeReference"
},
"authentication": {
"type": "MSI",
"resource": "https://storage.azure.com"
}
}
},
{
"name": "For Each SourceFolder",
"type": "ForEach",
"dependsOn": [
{
"activity": "List Folders",
"dependencyConditions": [
"Succeeded"
]
}
],
"userProperties": [],
"typeProperties": {
"items": {
"value": "@xpath(xml(activity('List Folders').output.Response),'/EnumerationResults/Blobs/BlobPrefix/Name/text()')",
"type": "Expression"
},
"activities": [
{
"name": "Copy files to Destination",
"type": "Copy",
"dependsOn": [],
"policy": {
"timeout": "0.12:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"userProperties": [],
"typeProperties": {
"source": {
"type": "ParquetSource",
"storeSettings": {
"type": "AzureBlobStorageReadSettings",
"recursive": true,
"wildcardFolderPath": {
"value": "@item()",
"type": "Expression"
},
"wildcardFileName": "*.parquet"
},
"formatSettings": {
"type": "ParquetReadSettings"
}
},
"sink": {
"type": "ParquetSink",
"storeSettings": {
"type": "AzureBlobStorageWriteSettings",
"copyBehavior": "FlattenHierarchy"
},
"formatSettings": {
"type": "ParquetWriteSettings"
}
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"typeConversion": true,
"typeConversionSettings": {
"allowDataTruncation": true,
"treatBooleanAsNumber": false
}
}
},
"inputs": [
{
"referenceName": "Binary",
"type": "DatasetReference",
"parameters": {
"ServiceURI": {
"value": "@concat('https://',pipeline().parameters.SourceStorageAccountName,'.blob.core.windows.net')",
"type": "Expression"
},
"ContainerName": {
"value": "@pipeline().parameters.SourceContainerName",
"type": "Expression"
},
"FolderName": {
"value": "@pipeline().parameters.SourceFolderPrefix",
"type": "Expression"
}
}
}
],
"outputs": [
{
"referenceName": "Binary",
"type": "DatasetReference",
"parameters": {
"ServiceURI": {
"value": "@concat('https://',pipeline().parameters.SinkStorageAccountName,'.blob.core.windows.net')",
"type": "Expression"
},
"ContainerName": {
"value": "@pipeline().parameters.SinkContainerName",
"type": "Expression"
},
"FolderName": {
"value": "@pipeline().parameters.SinkFolderName",
"type": "Expression"
}
}
}
]
}
]
}
}
],
"parameters": {
"SourceContainerName": {
"type": "string"
},
"SourceFolderPrefix": {
"type": "string"
},
"SinkStorageAccountName": {
"type": "string"
},
"SinkContainerName": {
"type": "string"
},
"SinkFolderName": {
"type": "string"
},
"SourceStorageAccountName": {
"type": "string"
}
},
"folder": {
"name": "batch"
},
"annotations": []
}
}
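The ForEach `items` expression applies `@xpath(xml(...), '/EnumerationResults/Blobs/BlobPrefix/Name/text()')` to the Web activity's XML response to pull out the folder names. A rough standard-library equivalent (the sample XML follows the shape of the Blob service's List Blobs response, trimmed down for illustration):

```python
import xml.etree.ElementTree as ET

response = """
<EnumerationResults>
  <Blobs>
    <BlobPrefix><Name>testfolder_1/$SourceBlobDelimiter</Name></BlobPrefix>
    <BlobPrefix><Name>testfolder_2/$SourceBlobDelimiter</Name></BlobPrefix>
  </Blobs>
</EnumerationResults>
"""

# Same selection as the pipeline's xpath:
# /EnumerationResults/Blobs/BlobPrefix/Name/text()
root = ET.fromstring(response)  # root element is <EnumerationResults>
names = [name.text for name in root.findall("./Blobs/BlobPrefix/Name")]
# names -> ['testfolder_1/$SourceBlobDelimiter', 'testfolder_2/$SourceBlobDelimiter']
```

Each extracted name then becomes `@item()` inside the ForEach, feeding the copy activity's `wildcardFolderPath`.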
75 changes: 75 additions & 0 deletions examples/synapse/copy_blobs/test_synapse_copy_blobs_functional.py
@@ -0,0 +1,75 @@
import pytest
from data_factory_testing_framework import TestFramework, TestFrameworkType
from data_factory_testing_framework.state import (
DependencyCondition,
RunParameter,
RunParameterType,
)


def test_copy_blobs_pipeline(request: pytest.FixtureRequest) -> None:
# Arrange
test_framework = TestFramework(
framework_type=TestFrameworkType.DataFactory, root_folder_path=request.fspath.dirname
)
pipeline = test_framework.get_pipeline_by_name("copy_blobs")

# Act
activities = test_framework.evaluate_pipeline(
pipeline=pipeline,
parameters=[
RunParameter(RunParameterType.Pipeline, "SourceStorageAccountName", "sourcestorageaccount"),
RunParameter(RunParameterType.Pipeline, "SourceContainerName", "sourcecontainer"),
RunParameter(RunParameterType.Pipeline, "SourceFolderPrefix", "sourcefolder"),
RunParameter(RunParameterType.Pipeline, "SinkStorageAccountName", "sinkstorageaccount"),
RunParameter(RunParameterType.Pipeline, "SinkContainerName", "sinkcontainer"),
RunParameter(RunParameterType.Pipeline, "SinkFolderName", "sinkfolder"),
],
)

# Assert
list_folder_activity = next(activities)
assert list_folder_activity.name == "List Folders"
assert (
list_folder_activity.type_properties["url"].result
== "https://sourcestorageaccount.blob.core.windows.net/sourcecontainer?restype=container&comp=list&prefix=sourcefolder&delimiter=$SourceBlobDelimiter"
)
assert list_folder_activity.type_properties["method"] == "GET"
list_folder_activity.set_result(
result=DependencyCondition.SUCCEEDED,
output={
"Response": """
<EnumerationResults ServiceEndpoint="http://myaccount.blob.core.windows.net/" ContainerName="mycontainer">
<Prefix>testfolder</Prefix>
<Delimiter>$SourceBlobDelimiter</Delimiter>
<Blobs>
<BlobPrefix>
<Name>testfolder_1/$SourceBlobDelimiter</Name>
</BlobPrefix>
<BlobPrefix>
<Name>testfolder_2/$SourceBlobDelimiter</Name>
</BlobPrefix>
</Blobs>
</EnumerationResults>
"""
},
)

copy_activity = next(activities)

assert copy_activity.name == "Copy files to Destination"
assert copy_activity.type == "Copy"
assert (
copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
== "testfolder_1/$SourceBlobDelimiter"
)

copy_activity = next(activities)
assert copy_activity.name == "Copy files to Destination"
assert copy_activity.type == "Copy"
assert (
copy_activity.type_properties["source"]["storeSettings"]["wildcardFolderPath"].result
== "testfolder_2/$SourceBlobDelimiter"
)

pytest.raises(StopIteration, lambda: next(activities))
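The closing `pytest.raises(StopIteration, lambda: next(activities))` asserts that the pipeline produced no further activities: `evaluate_pipeline` returns an iterator, and exhausting it raises `StopIteration`. A minimal sketch of that pattern with a plain generator (the names are illustrative stand-ins, not the framework API):

```python
def fake_activities():
    # Stand-in for the lazily evaluated activity stream the framework yields.
    yield "List Folders"
    yield "Copy files to Destination"
    yield "Copy files to Destination"

activities = fake_activities()
consumed = [next(activities) for _ in range(3)]

# Once the stream is exhausted, next() raises StopIteration -- which is
# exactly what the test's final line asserts.
try:
    next(activities)
    raised = False
except StopIteration:
    raised = True
```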