Documentation | Release Notes | Examples
DLT-META
is a metadata-driven framework, based on Databricks Delta Live Tables (aka DLT), that lets you automate your bronze and silver data pipelines.
With this framework you record the source and target metadata in an onboarding JSON file, which acts as the data flow specification (aka Dataflowspec). A single generic DLT pipeline takes the Dataflowspec and runs your workloads.
- Capture input/output metadata in the onboarding file
- Capture data quality rules
- Capture processing logic as SQL in the silver transformation file
- Apply appropriate readers based on input metadata
- Apply data quality rules with DLT expectations
- Apply CDC using DLT apply changes if specified in the metadata
- Build the DLT graph based on input/output metadata
- Launch the DLT pipeline
Refer to the FAQ and DLT-META documentation
- Create an `onboarding.json` metadata file and save it to s3/adls/dbfs (e.g. onboarding file)
- Create `silver_transformations.json` and save it to s3/adls/dbfs (e.g. Silver transformation file)
- Create a data quality rules JSON and store it to s3/adls/dbfs (e.g. Data Quality Rules)
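For orientation, a single onboarding entry might look like the sketch below. The field names shown (`data_flow_id`, `source_details`, `bronze_table`, and so on) are illustrative assumptions based on the project's examples, not an authoritative schema; consult the DLT-META documentation and examples for the exact fields your version expects.

```json
[
  {
    "data_flow_id": "100",
    "data_flow_group": "G1",
    "source_format": "cloudFiles",
    "source_details": {
      "source_path_dev": "dbfs:/demo/resources/data/customers"
    },
    "bronze_database_dev": "dlt_demo",
    "bronze_table": "customers_cdc",
    "silver_transformation_json_dev": "dbfs:/demo/silver_transformations.json"
  }
]
```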
1. Go to your Databricks landing page and do one of the following:
   - In the sidebar, click Workflows and click Create Job.
   - In the sidebar, click New and select Job from the menu.
2. In the task dialog box that appears on the Tasks tab, replace "Add a name for your job…" with your job name, for example, "Python wheel example".
3. In Task name, enter a name for the task, for example, `dlt_meta_onboarding_pythonwheel_task`.
4. In Type, select Python wheel.
5. In Package name, enter `dlt_meta`.
6. In Entry point, enter `run`.
7. Click Add under Dependent Libraries. In the Add dependent library dialog, under Library Type, click PyPI. Enter Package: `dlt-meta`.
8. Click Add.
9. In Parameters, select keyword argument, then select JSON. Paste the JSON parameters below:

       {
         "database": "dlt_demo",
         "onboarding_file_path": "dbfs:/onboarding_files/users_onboarding.json",
         "silver_dataflowspec_table": "silver_dataflowspec_table",
         "silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
         "bronze_dataflowspec_table": "bronze_dataflowspec_table",
         "import_author": "Ravi",
         "version": "v1",
         "bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
         "overwrite": "True",
         "env": "dev"
       }

   Alternatively, enter keyword arguments individually: click + Add and enter a key and value, then click + Add again to enter more arguments.
10. Click Save task.
11. Click Run now.
12. Make sure the job runs successfully. Verify the metadata in the dataflow spec tables entered in step 9, e.g. `dlt_demo.bronze_dataflowspec_table`, `dlt_demo.silver_dataflowspec_table`.
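The JSON parameters from step 9 can be sanity-checked locally before pasting them into the job UI. A minimal sketch, assuming a hand-picked set of required keys for illustration (the wheel's own validation is authoritative):

```python
import json

# Keyword arguments for the dlt_meta onboarding job (copied from step 9).
params = {
    "database": "dlt_demo",
    "onboarding_file_path": "dbfs:/onboarding_files/users_onboarding.json",
    "silver_dataflowspec_table": "silver_dataflowspec_table",
    "silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
    "import_author": "Ravi",
    "version": "v1",
    "bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
    "overwrite": "True",
    "env": "dev",
}

# Assumed-required keys, for illustration only.
required = {"database", "onboarding_file_path", "overwrite", "env"}
missing = required - params.keys()
assert not missing, f"missing keys: {missing}"

# The job UI expects valid JSON, so round-trip it once before pasting.
print(json.dumps(params, indent=2))
```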
1. Go to your Databricks landing page and select Create a notebook, or click New in the sidebar and select Notebook. The Create Notebook dialog appears.
2. In the Create Notebook dialog, give your notebook a name, e.g. `dlt_meta_pipeline`, and select Python from the Default Language dropdown menu. You can leave Cluster set to the default value; the Delta Live Tables runtime creates a cluster before it runs your pipeline.
3. Click Create.
4. Add the example DLT pipeline code, or import the iPython notebook as-is.
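The launch notebook body is small. The sketch below follows the entry point shipped with dlt-meta (a `DataflowPipeline.invoke_dlt_pipeline` call driven by the `layer` configuration), but treat the exact import path as an assumption and compare against the project's example notebook. It only does real work inside a DLT pipeline, where `spark` is predefined.

```python
# %pip install dlt-meta   # first cell of the notebook (Databricks magic command)

# Sketch of the dlt_meta_pipeline launch notebook. Inside a DLT pipeline
# `spark` is predefined; outside Databricks this falls through harmlessly.
invoked = False
try:
    layer = spark.conf.get("layer", None)  # "bronze" or "silver", from pipeline config
    from src.dataflow_pipeline import DataflowPipeline  # import path is an assumption
    DataflowPipeline.invoke_dlt_pipeline(spark, layer)
    invoked = True
except NameError:
    # `spark` does not exist outside a Databricks runtime, so nothing runs here.
    pass
```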
1. Click Workflows in the sidebar, click the Delta Live Tables tab, and click Create Pipeline.
2. Give the pipeline a name, e.g. DLT_META_BRONZE, and click the file picker to select the `dlt_meta_pipeline` notebook created in the step "Create a dlt launch notebook".
3. Optionally enter a storage location for output data from the pipeline. The system uses a default location if you leave Storage location empty.
4. Select Triggered for Pipeline Mode.
5. Enter Configuration parameters, e.g.:

       "layer": "bronze",
       "bronze.dataflowspecTable": "dataflowspec table name",
       "bronze.group": "enter group name from metadata e.g. G1"

6. Enter the target schema where you want your bronze/silver tables to be created.
7. Click Create.
8. Start the pipeline: click the Start button in the top panel. The system returns a message confirming that your pipeline is starting.
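The UI steps above correspond roughly to the pipeline settings payload sketched below. The `configuration` keys come from step 5; the surrounding fields and placeholder values are assumptions added to show where those keys land, so verify them against your pipeline's JSON settings view.

```json
{
  "name": "DLT_META_BRONZE",
  "continuous": false,
  "configuration": {
    "layer": "bronze",
    "bronze.dataflowspecTable": "dlt_demo.bronze_dataflowspec_table",
    "bronze.group": "G1"
  },
  "target": "dlt_demo_bronze"
}
```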
You can run integration tests from your local machine with dlt-meta.
1. Clone DLT-META.
2. Open a terminal and go to the DLT-META root folder.
3. Create environment variables:

       export DATABRICKS_HOST=<DATABRICKS HOST>
       export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # The account needs permission to create clusters/DLT pipelines.

4. Run integration tests for the different supported input sources: cloudfiles, eventhub, kafka.
4a. Run the command for cloudfiles
python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/
4b. Run the command for eventhub
python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer
For eventhub integration tests, the following are the prerequisites:
1. An eventhub instance must be running.
2. Using the Databricks CLI, create a Databricks secrets scope for the eventhub keys.
3. Using the Databricks CLI, create Databricks secrets to store the producer and consumer keys using the scope created in step 2.

The following arguments are mandatory for running the eventhub integration test:
1. Provide your eventhub topic name: ```--eventhub_name```
2. Provide the eventhub namespace: ```--eventhub_namespace```
3. Provide the eventhub port: ```--eventhub_port```
4. Provide the Databricks secret scope name: ```--eventhub_secrets_scope_name```
5. Provide the eventhub producer access key name: ```--eventhub_producer_accesskey_name```
6. Provide the eventhub consumer access key name: ```--eventhub_consumer_accesskey_name```
4c. Run the command for kafka
python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092
For kafka integration tests, the following is the prerequisite:
1. A kafka instance must be running.

The following arguments are mandatory for running the kafka integration test:
1. Provide your kafka topic name: ```--kafka_topic_name```
2. Provide the kafka broker: ```--kafka_broker```
Once finished, the integration output file will be copied locally to
`integration-test-output_<run_id>.csv`
- The output of a successful run should contain the following:

      ,0
      0,Completed Bronze DLT Pipeline.
      1,Completed Silver DLT Pipeline.
      2,Validating DLT Bronze and Silver Table Counts...
      3,Validating Counts for Table bronze_7b866603ab184c70a66805ac8043a03d.transactions_cdc.
      4,Expected: 10002 Actual: 10002. Passed!
      5,Validating Counts for Table bronze_7b866603ab184c70a66805ac8043a03d.transactions_cdc_quarantine.
      6,Expected: 9842 Actual: 9842. Passed!
      7,Validating Counts for Table bronze_7b866603ab184c70a66805ac8043a03d.customers_cdc.
      8,Expected: 98928 Actual: 98928. Passed!
      9,Validating Counts for Table silver_7b866603ab184c70a66805ac8043a03d.transactions.
      10,Expected: 8759 Actual: 8759. Passed!
      11,Validating Counts for Table silver_7b866603ab184c70a66805ac8043a03d.customers.
      12,Expected: 87256 Actual: 87256. Passed!
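If you want to check the output file programmatically rather than by eye, a small sketch (the helper function and sample rows below are illustrative, mirroring the CSV format shown above, and are not part of dlt-meta):

```python
import csv
import io

def all_checks_passed(text: str) -> bool:
    """Return True when every 'Expected: ... Actual: ...' row ends in 'Passed!'."""
    messages = [row[1] for row in csv.reader(io.StringIO(text)) if len(row) > 1]
    checks = [m for m in messages if m.startswith("Expected:")]
    return bool(checks) and all(m.endswith("Passed!") for m in checks)

# Sample rows in the integration-test-output_<run_id>.csv format shown above.
sample = """0,Completed Bronze DLT Pipeline.
1,Completed Silver DLT Pipeline.
4,Expected: 10002 Actual: 10002. Passed!
6,Expected: 9842 Actual: 9842. Passed!
"""
print(all_checks_passed(sample))  # True
```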
Please note that all projects released under Databricks Labs
are provided for your exploration only, and are not formally supported by Databricks with Service Level Agreements
(SLAs). They are provided AS-IS and we do not make any guarantees of any kind. Please do not submit a support ticket
relating to any issues arising from the use of these projects.
Any issues discovered through the use of this project should be filed as issues on the GitHub repo.
They will be reviewed as time permits, but there are no formal SLAs for support.