Amundsen Databuilder is a data ingestion library inspired by Apache Gobblin. It can be used in an orchestration framework (e.g. Apache Airflow) to build data for Amundsen. You can use the library either with an ad hoc Python script (example) or inside an Apache Airflow DAG (example).
For information about Amundsen and our other services, visit the main repository. Please also see our instructions for a quick start setup of Amundsen with dummy data, and an overview of the architecture.
- Python = 2.7.x (Python >= 3.x also works if you don't use the column usage transformer, which depends on the ANTLR Python 2 runtime)
An ETL job consists of extracting records from the source, transforming them if necessary, and loading them into the sink. Amundsen Databuilder is an ETL framework for Amundsen, with corresponding components called Extractor, Transformer, and Loader that handle record-level operations. A component called Task controls these three components. Job is the highest-level component in Databuilder; it controls the task and the publisher, and it is what the client uses to launch the ETL job.
In Databuilder, each component is highly modularized and uses namespace-based HOCON config, which makes components highly reusable and pluggable (e.g. a transformer can be reused within an extractor, or an extractor can be reused within another extractor). Note that the component concepts are heavily inspired by Apache Gobblin.
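To illustrate the idea of namespace-based configuration, here is a minimal sketch that uses plain dictionaries instead of HOCON. The `scoped` helper and the key names are hypothetical and not part of Databuilder; the sketch only shows how a component could see just the keys under its own scope, and how one extractor's scope can nest another's.

```python
def scoped(conf, scope):
    """Return the entries under `scope`, with the scope prefix stripped."""
    prefix = scope + '.'
    return {key[len(prefix):]: value
            for key, value in conf.items()
            if key.startswith(prefix)}


# Hypothetical job-level config; key names are for illustration only.
job_conf = {
    'extractor.hive_table_metadata.where_clause_suffix': 'WHERE 1 = 1',
    'extractor.hive_table_metadata.extractor.sqlalchemy.conn_string': 'hive://metastore',
    'loader.filesystem_csv_neo4j.node_dir_path': '/tmp/nodes',
}

# The outer extractor sees only its own namespace ...
outer_conf = scoped(job_conf, 'extractor.hive_table_metadata')
# ... and hands the nested namespace to the extractor it reuses.
inner_conf = scoped(outer_conf, 'extractor.sqlalchemy')
loader_conf = scoped(job_conf, 'loader.filesystem_csv_neo4j')
```

Because each component reads only its own scope, the same inner extractor can be configured independently wherever it is reused.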
An extractor extracts records from the source. This does not necessarily mean that it only supports the pull pattern in ETL. For example, extracting records from a messaging bus makes it a push pattern in ETL.
A transformer takes a record from either an extractor or another transformer (via ChainedTransformer) and transforms it.
A loader takes a record from a transformer, or directly from an extractor, and loads it into a sink or a staging area. As a loader operates at the record level, it is not capable of supporting atomicity.
A task orchestrates an extractor, a transformer, and a loader to perform record-level operations.
A record is represented by one of the models.
A publisher is an optional component. Its common usage is to support atomicity at the job level and/or to easily support bulk loading into the sink.
Job is the highest-level component in Databuilder, and it orchestrates the task and the publisher.
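The relationship between these components can be sketched with a few toy classes. This is not Databuilder's implementation; every class and function name below is hypothetical, and the sketch only assumes that an extractor signals exhaustion by returning None.

```python
class ListExtractor:
    """Hypothetical extractor: hands out records from an in-memory list."""

    def __init__(self, records):
        self._iter = iter(records)

    def extract(self):
        # Return the next record, or None when the source is exhausted.
        return next(self._iter, None)


class UpperCaseTransformer:
    """Hypothetical transformer: upper-cases each record."""

    def transform(self, record):
        return record.upper()


class StagingLoader:
    """Hypothetical loader: writes records one by one to a staging area."""

    def __init__(self):
        self.staged = []

    def load(self, record):
        self.staged.append(record)


class ListPublisher:
    """Hypothetical publisher: moves all staged records to the sink at once."""

    def __init__(self, loader, sink):
        self._loader = loader
        self._sink = sink

    def publish(self):
        self._sink.extend(self._loader.staged)


def run_task(extractor, transformer, loader):
    # The task drives the record-level loop.
    record = extractor.extract()
    while record is not None:
        loader.load(transformer.transform(record))
        record = extractor.extract()


def run_job(extractor, transformer, loader, publisher):
    # The job runs the task first, then the publisher.
    run_task(extractor, transformer, loader)
    publisher.publish()


sink = []
loader = StagingLoader()
run_job(ListExtractor(['a', 'b']), UpperCaseTransformer(), loader,
        ListPublisher(loader, sink))
```

Nothing reaches `sink` until the whole task has finished, which is how a publisher can provide job-level atomicity that a record-level loader cannot.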
An extractor that uses the Python Database API (DBAPI) interface. It requires three pieces of information: a connection object that conforms to the DBAPI spec, a SELECT SQL statement, and a model class that corresponds to each row returned by the SQL statement.
job_config = ConfigFactory.from_dict({
'extractor.dbapi.{}'.format(DBAPIExtractor.CONNECTION_CONFIG_KEY): db_api_conn,
'extractor.dbapi.{}'.format(DBAPIExtractor.SQL_CONFIG_KEY): select_sql_stmt,
'extractor.dbapi.model_class': 'package.module_name.class_name'
})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=DBAPIExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that takes a list of dicts from the user through config.
An extractor that extracts the last updated time from the Hive metastore and the underlying file system. Although the Hive metastore has a parameter called "last_modified_time", it cannot be used because it provides the DDL timestamp, not the DML timestamp. For this reason, HiveTableLastUpdatedExtractor uses the underlying files of Hive tables to fetch the latest updated date. However, it is not efficient to poke all files in Hive, so it only pokes the underlying storage for non-partitioned tables. For partitioned tables, it fetches the partition created timestamp, which is close enough to the last updated timestamp.
As getting metadata from files can be time consuming, there are several features to increase performance.
- Support for multithreading to parallelize metadata fetching. Although CPython's multithreading is not true multithreading, as it is bound to a single core, getting file metadata is mostly an IO-bound operation. Note that the number of threads should be less than or equal to the number of connections.
- Users can pass a where clause to include only certain schemas and to exclude certain tables. For example, adding something like
TBL_NAME NOT REGEXP '(tmp|temp)'
would eliminate unnecessary computation.
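The multithreading point above can be sketched with the standard library alone. This is an illustration of parallelizing IO-bound metadata lookups, not the extractor's actual code; the function names and the use of local temporary files in place of a remote file system are assumptions, and the sketch requires Python 3.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor


def last_modified(path):
    """Return the modification time of one file (an IO-bound call)."""
    return os.path.getmtime(path)


def latest_update(paths, pool_size=4):
    """Fetch modification times in parallel and return the most recent one."""
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        return max(pool.map(last_modified, paths))


# Usage: create a few temporary files and find the newest timestamp.
with tempfile.TemporaryDirectory() as tmp_dir:
    paths = []
    for i in range(3):
        path = os.path.join(tmp_dir, 'part-{}'.format(i))
        with open(path, 'w') as f:
            f.write('data')
        paths.append(path)
    newest = latest_update(paths, pool_size=2)
    expected = max(os.path.getmtime(p) for p in paths)
```

Threads help here because each worker spends most of its time waiting on the file system rather than holding the interpreter lock.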
job_config = ConfigFactory.from_dict({
'extractor.hive_table_last_updated.partitioned_table_where_clause_suffix': partitioned_table_where_clause,
'extractor.hive_table_last_updated.non_partitioned_table_where_clause_suffix': non_partitioned_table_where_clause,
'extractor.hive_table_last_updated.extractor.sqlalchemy.{}'.format(
SQLAlchemyExtractor.CONN_STRING): connection_string,
'extractor.hive_table_last_updated.extractor.fs_worker_pool_size': pool_size,
'extractor.hive_table_last_updated.filesystem.{}'.format(FileSystem.DASK_FILE_SYSTEM): s3fs.S3FileSystem(
anon=False,
config_kwargs={'max_pool_connections': pool_size})})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=HiveTableLastUpdatedExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from Hive metastore database.
job_config = ConfigFactory.from_dict({
'extractor.hive_table_metadata.{}'.format(HiveTableMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.hive_table_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=HiveTableMetadataExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Postgres or Redshift database.
By default, the Postgres/Redshift database name is used as the cluster name. To override this, set USE_CATALOG_AS_CLUSTER_NAME to False, and CLUSTER_KEY to what you wish to use as the cluster name.
The where_clause_suffix below should define which schemas you'd like to query (see the sample dag for an example).
The SQL query driving the extraction is defined here
job_config = ConfigFactory.from_dict({
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.postgres_metadata.{}'.format(PostgresMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.postgres_metadata.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=PostgresMetadataExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that extracts table and column metadata including database, schema, table name, table description, column name and column description from a Snowflake database.
By default, the Snowflake database name is used as the cluster name. To override this, set USE_CATALOG_AS_CLUSTER_NAME to False, and CLUSTER_KEY to what you wish to use as the cluster name.
By default, the Snowflake database is set to PROD. To override this, set DATABASE_KEY to WhateverNameOfYourDb.
The where_clause_suffix below should define which schemas you'd like to query (see the sample dag for an example).
The SQL query driving the extraction is defined here
job_config = ConfigFactory.from_dict({
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.DATABASE_KEY): 'YourDbName',
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY): where_clause_suffix,
'extractor.snowflake.{}'.format(SnowflakeMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME): True,
'extractor.snowflake.extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string()})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=SnowflakeMetadataExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that gets the current timestamp and passes it to GenericExtractor. This extractor is used to create the timestamp for "Amundsen was last indexed on ..." in the footer of the Amundsen web page.
An extractor that extracts records from Neo4j based on a provided Cypher query. One use case is extracting data from Neo4j so that it can be transformed and published to Elasticsearch.
job_config = ConfigFactory.from_dict({
'extractor.neo4j.{}'.format(Neo4jExtractor.CYPHER_QUERY_CONFIG_KEY): cypher_query,
'extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'package.module.class_name',
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=Neo4jExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that extracts data from Neo4j using Neo4jExtractor, with the Cypher query already embedded in it.
job_config = ConfigFactory.from_dict({
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.GRAPH_URL_CONFIG_KEY): neo4j_endpoint,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.MODEL_CLASS_CONFIG_KEY): 'databuilder.models.neo4j_data.Neo4jDataResult',
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_USER): neo4j_user,
'extractor.search_data.extractor.neo4j.{}'.format(Neo4jExtractor.NEO4J_AUTH_PW): neo4j_password})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=Neo4jSearchDataExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that uses SQLAlchemy to extract records from any database that SQLAlchemy supports.
job_config = ConfigFactory.from_dict({
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.CONN_STRING): connection_string(),
'extractor.sqlalchemy.{}'.format(SQLAlchemyExtractor.EXTRACT_SQL): sql,
'extractor.sqlalchemy.model_class': 'package.module.class_name'})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=SQLAlchemyExtractor(),
loader=AnyLoader()))
job.launch()
An extractor that extracts table usage from SQL statements. It accepts any extractor that extracts rows from a source containing a SQL audit log. Once a SQL statement is extracted, it uses ANTLR to parse the statement and get the tables and columns that it reads from. It also aggregates usage by table and user. (Column-level aggregation is not there yet.)
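The aggregation step can be illustrated in isolation. The sketch below assumes the SQL has already been parsed into (table, user) pairs, so it leaves ANTLR out entirely; `aggregate_usage` and the sample data are hypothetical.

```python
from collections import Counter


def aggregate_usage(reads):
    """Count how many times each (table, user) pair appears."""
    return Counter((table, user) for table, user in reads)


# Hypothetical reads, one entry per table referenced by a user's statement.
reads = [
    ('schema.orders', 'alice'),
    ('schema.orders', 'alice'),
    ('schema.orders', 'bob'),
    ('schema.users', 'alice'),
]
usage = aggregate_usage(reads)
```

Each key in `usage` is a (table, user) pair and each value is the read count for that pair.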
A chained transformer that can take multiple transformers.
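As a rough illustration of chaining, the sketch below applies a list of transformers in order, feeding each one's output to the next. The class names are hypothetical, and treating a None result as "record filtered out" is an assumption of this sketch rather than a statement about Databuilder's behavior.

```python
class StripTransformer:
    """Hypothetical transformer: trims surrounding whitespace."""

    def transform(self, record):
        return record.strip()


class LowerCaseTransformer:
    """Hypothetical transformer: lower-cases the record."""

    def transform(self, record):
        return record.lower()


class ChainSketch:
    """Hypothetical stand-in for a chained transformer."""

    def __init__(self, transformers):
        self._transformers = list(transformers)

    def transform(self, record):
        for transformer in self._transformers:
            record = transformer.transform(record)
            if record is None:
                # Assumption: None means the record was filtered out.
                return None
        return record


chain = ChainSketch([StripTransformer(), LowerCaseTransformer()])
result = chain.transform('  Hello ')
```

Because the chain exposes the same `transform` method as its members, it can be passed anywhere a single transformer is expected.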
A generic string replacement transformer using regular expressions. The user can pass a list of tuples, where each tuple contains a regex and its replacement.
job_config = ConfigFactory.from_dict({
'transformer.regex_str_replace.{}'.format(REGEX_REPLACE_TUPLE_LIST): [(',', ' '), ('"', '')],
'transformer.regex_str_replace.{}'.format(ATTRIBUTE_NAME): 'instance_field_name',})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=AnyExtractor(),
transformer=RegexStrReplaceTransformer(),
loader=AnyLoader()))
job.launch()
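What the tuple list does to a single field can be sketched with the standard `re` module. This is an illustration, not the transformer's source: `apply_replacements` and the `Row` class are hypothetical, and applying the pairs in list order with `re.sub` is an assumption.

```python
import re


def apply_replacements(value, regex_replace_tuples):
    """Apply each (regex, replacement) pair to the value, in list order."""
    for pattern, replacement in regex_replace_tuples:
        value = re.sub(pattern, replacement, value)
    return value


class Row:
    """Hypothetical record with one string attribute to clean up."""

    def __init__(self, instance_field_name):
        self.instance_field_name = instance_field_name


row = Row('a,b"c"')
# Same tuple list as the config example: commas become spaces, quotes are dropped.
row.instance_field_name = apply_replacements(
    row.instance_field_name, [(',', ' '), ('"', '')])
```

Since the first element of each tuple is treated as a regular expression, characters with special meaning in regex syntax would need escaping.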
A SQL-to-usage transformer that transforms a record into a ColumnReader, which has column, user, and count. Currently it collects usage at the table level, so columns on the same table are de-duped. In many cases the "from" clause does not contain the schema; it is then looked up via a table name -> schema name mapping obtained from the metadata extractor.
Writes node and relationship CSV file(s) that can be consumed by Neo4jCsvPublisher. It assumes that the record it consumes is an instance of Neo4jCsvSerializable.
job_config = ConfigFactory.from_dict({
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=AnyExtractor(),
loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
job.launch()
Writes Elasticsearch documents in JSON format, which can be consumed by ElasticsearchPublisher. It assumes that the record it consumes is an instance of ElasticsearchDocument.
tmp_folder = '/var/tmp/amundsen/dummy_metadata'
node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)
job_config = ConfigFactory.from_dict({
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=AnyExtractor(),
loader=FSElasticsearchJSONLoader()),
publisher=ElasticsearchPublisher())
job.launch()
A publisher that takes two folders as input and publishes to Neo4j. One folder contains CSV file(s) for nodes, and the other contains CSV file(s) for relationships. Neo4j follows the labeled property graph model; refer to here for more information.
job_config = ConfigFactory.from_dict({
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.NODE_DIR_PATH): node_files_folder,
'loader.filesystem_csv_neo4j.{}'.format(FsNeo4jCSVLoader.RELATION_DIR_PATH): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NODE_FILES_DIR): node_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.RELATION_FILES_DIR): relationship_files_folder,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_END_POINT_KEY): neo4j_endpoint,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_USER): neo4j_user,
'publisher.neo4j.{}'.format(neo4j_csv_publisher.NEO4J_PASSWORD): neo4j_password,})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=AnyExtractor(),
loader=FsNeo4jCSVLoader()),
publisher=Neo4jCsvPublisher())
job.launch()
Elasticsearch Publisher uses the Bulk API to load data from a JSON file. It supports atomic operation by utilizing aliases in Elasticsearch. A new index is created and data is uploaded into it. After the upload is complete, the index alias is swapped to point from the old index to the new index, and traffic is routed to the new index.
tmp_folder = '/var/tmp/amundsen/dummy_metadata'
node_files_folder = '{tmp_folder}/nodes/'.format(tmp_folder=tmp_folder)
relationship_files_folder = '{tmp_folder}/relationships/'.format(tmp_folder=tmp_folder)
job_config = ConfigFactory.from_dict({
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): data_file_path,
'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): 'w',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): data_file_path,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): 'r',
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): elasticsearch_client,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): elasticsearch_new_index,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): elasticsearch_doc_type,
'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): elasticsearch_index_alias})
job = DefaultJob(
conf=job_config,
task=DefaultTask(
extractor=AnyExtractor(),
loader=FSElasticsearchJSONLoader()),
publisher=ElasticsearchPublisher())
job.launch()
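The alias swap described above can be illustrated with an in-memory stand-in. This sketch does not use the Elasticsearch client or its API; `FakeSearchCluster`, `publish_with_alias_swap`, and the index names are all hypothetical, and cleanup of the old index is left out.

```python
class FakeSearchCluster:
    """In-memory stand-in for a search cluster; not the Elasticsearch client."""

    def __init__(self):
        self.indices = {}  # index name -> list of documents
        self.aliases = {}  # alias name -> index name

    def search(self, alias):
        # Readers always go through the alias, never an index name.
        return self.indices[self.aliases[alias]]


def publish_with_alias_swap(cluster, alias, new_index, documents):
    """Load a fresh index, then repoint the alias to it in a single step."""
    # 1. Build the new index while readers still see the old one.
    cluster.indices[new_index] = list(documents)
    # 2. Swap the alias; readers see either the old or the new index,
    #    never a partially loaded one.
    old_index = cluster.aliases.get(alias)
    cluster.aliases[alias] = new_index
    return old_index


cluster = FakeSearchCluster()
publish_with_alias_swap(cluster, 'table_search', 'tables_v1',
                        [{'name': 'orders'}])
previous = publish_with_alias_swap(cluster, 'table_search', 'tables_v2',
                                   [{'name': 'orders'}, {'name': 'users'}])
results = cluster.search('table_search')
```

The point of the design is that the only change visible to readers is the alias update, so a failed or partial upload never affects live traffic.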
The Callback interface is built upon the Observer pattern, where a participant wants to take some action when the target's state changes.
Publisher is the first component to adopt Callback: a registered Callback is called either when publish succeeds or when publish fails. To register a callback, Publisher provides the register_call_back method.
One use case is an Extractor that needs to commit when the job is finished (e.g. Kafka). By having the Extractor register a callback with the Publisher, the extractor can safely commit by implementing the commit logic in the on_success method.
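The use case above can be sketched with toy classes. Only the method names register_call_back and on_success come from the text; the class names, the on_failure method name, and the publisher internals are assumptions made for illustration.

```python
class CallbackSketch:
    """Hypothetical callback interface with success and failure hooks."""

    def on_success(self):
        pass

    def on_failure(self):
        pass


class PublisherSketch:
    """Hypothetical publisher that notifies registered callbacks."""

    def __init__(self):
        self._callbacks = []
        self.published = []

    def register_call_back(self, callback):
        self._callbacks.append(callback)

    def publish(self, records):
        try:
            self.published.extend(records)
        except Exception:
            # Tell every observer that publishing failed, then re-raise.
            for callback in self._callbacks:
                callback.on_failure()
            raise
        # Publishing succeeded: let observers finish their own work.
        for callback in self._callbacks:
            callback.on_success()


class CommittingExtractor(CallbackSketch):
    """Hypothetical extractor that commits only after a successful publish."""

    def __init__(self):
        self.committed = False

    def on_success(self):
        # e.g. commit consumer offsets here.
        self.committed = True


extractor = CommittingExtractor()
publisher = PublisherSketch()
publisher.register_call_back(extractor)
publisher.publish(['record-1', 'record-2'])
```

Deferring the commit to `on_success` means a failed publish leaves the source position untouched, so the same records can be read again on the next run.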