diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 2bd543e..490c627 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: [3.7, 3.8, 3.9] + python-version: [3.8, 3.9] steps: - uses: actions/checkout@v2 diff --git a/.gitignore b/.gitignore index 35d8f26..44aaf04 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # Pycharm .idea/ +# MacOS - Desktop Services Store file(s) +.DS_Store + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] @@ -12,6 +15,7 @@ __pycache__/ # Distribution / packaging .Python env/ +venv/ build/ develop-eggs/ dist/ diff --git a/README.md b/README.md index 742e6e0..24ea0b7 100644 --- a/README.md +++ b/README.md @@ -3,49 +3,62 @@ # Prism-Python Python client library and command line interface (CLI) for interacting with -Workday’s Prism API V2. +Workday’s Prism API V3. + +Workday provides the Prism Analytics REST API web service to work with +Workday Prism Analytics tables, data change tasks, and datasets. You can develop +a software program that uses the different REST endpoints to +programmatically create Prism Analytics tables and datasets and modify +data in them. + +The Python **client library** is a REST API wrapper managing the HTTP methods, +URL endpoints and the data required by specific Workday Prism Analytics API +REST operations. Using this client library in Python projects simplifies interactions +with the Workday Prism Analytics REST API providing the rich functionality +of Workday Prism Analytics to your Python project. + +The **CLI** is a powerful tool for interacting with a Workday Prism Analytics +REST API client library, allowing you to quickly and easily perform Workday +Prism Analytics tasks from any command line. ## Install -You may install the latest version directly from GitHub with: + +To automatically retrieve and install the latest version of this +package directly GitHub, use the following command: ```bash -pip install git+https://github.com/Workday/prism-python.git +$ pip install git+https://github.com/Workday/prism-python.git ``` It is also possible to install a specific tagged release with: ```bash -pip install git+https://github.com/Workday/prism-python.git@0.2.0 +$ pip install git+https://github.com/Workday/prism-python.git@0.3.0 ``` ## Requirements -1. [Register a Workday Prism Analytics API Client.](https://doc.workday.com/reader/J1YvI9CYZUWl1U7_PSHyHA/qAugF2pRAGtECVLHKdMO_A) +Workday Prism Analytics REST APIs use OAuth authentication and the Workday +configurable security model to authorize Workday Prism Analytics operations +in end-user applications. The Workday Prism REST APIs act on behalf of +a Workday user using the client. The user's security profile affects the +REST API access to Workday resources. -In Workday, register an integrations API client with Prism Analytics as its -scope. Obtain the Client ID, Client Secret, and Refresh Token values that the -Prism class requires as parameters. +The Prism client library, and by extension the CLI, require API Client +credentials setup in the target Workday tenant. The API Client credentials +authorize programmatic access to the Workday tenant and provides the identity +of the Workday user to enforce security for all operations. -2. [Obtain the Workday REST API Endpoint.](https://doc.workday.com/reader/J1YvI9CYZUWl1U7_PSHyHA/L_RKkfJI6bKu1M2~_mfesQ) +1. 
[Register a Workday Prism Analytics API Client.](https://doc.workday.com/admin-guide/en-us/workday-studio/integration-design/common-components/the-prismanalytics-subassembly/tzr1533120600898.html) +2. [Create Refresh Token](https://doc.workday.com/reader/J1YvI9CYZUWl1U7_PSHyHA/L_RKkfJI6bKu1M2~_mfesQ) +3. [Obtain the Workday REST API Endpoint.](https://doc.workday.com/reader/J1YvI9CYZUWl1U7_PSHyHA/L_RKkfJI6bKu1M2~_mfesQ) -In Workday, obtain the Workday REST API endpoint that the Prism class requires -as a parameter. - -3. For ease of use, set the following environment variables using the values obtained above: - -```bash -export workday_base_url= -export workday_tenant_name= -export prism_client_id= -export prism_client_secret= -export prism_refresh_token= -``` ## Python Example -### Create a new table with Prism API Version 2 +### Create a new table with Prism API Version 3 -```python +```{python} import os import prism @@ -55,50 +68,50 @@ p = prism.Prism( os.getenv("workday_tenant_name"), os.getenv("prism_client_id"), os.getenv("prism_client_secret"), - os.getenv("prism_refresh_token"), - version="v2" + os.getenv("prism_refresh_token") ) -# read in your table schema -schema = prism.load_schema("/path/to/schema.json") - -# create an empty API table with your schema -table = prism.create_table(p, "my_new_table", schema=schema["fields"]) +# create a new table based on the schema.json file +table = prism.tables_create( + p, + table_name="my_new_table", + file="/path/to/schema.json" +) -# print details about new table +# print JSON response body describing the new table. print(table) ``` -### Manage data in an existing table with Prism API Version 2 +### Manage data in an existing table with Prism API Version 3 Table Operations Available: `TruncateandInsert`, `Insert`, `Update`, `Upsert`, `Delete`. -To use the `Update`, `Upsert`, or `Delete` operations, you must specify an -external id field within your table schema. - -```python -# upload GZIP CSV file to your table -prism.upload_file(p, "/path/to/file.csv.gz", table["id"], operation="TruncateandInsert") +```{python} +prism.upload_file( + p, + file="/path/to/data.csv.gz", + table_id=table["id"], + operation="Insert" +) ``` ## CLI Example -The command line interface (CLI) provides another way to interact with the Prism API. -The CLI expects your credentials to be stored as environment variables, but they can -also be passed into the CLI manually through the use of optional arguments. - ```bash # get help with the CLI -prism --help +$ prism --help + +# get help for the tables command +$ prism tables --help -# list the Prism API tables that you have access to -prism list +# list Prism tables you have access to. 
+$ prism tables get -# create a new Prism API table -prism create my_new_table /path/to/schema.json +# create a new Prism table +$ prism tables create my_new_table /path/to/schema.json -# upload data to a Prism API table -prism upload /path/to/file.csv.gz bbab30e3018b01a723524ce18010811b +# upload data to the new table +$ prism tables upload 83dd72bd7b911000ca2d790e719a0000 /path/to/file1.csv.gz ``` ## Bugs diff --git a/examples/1_extract_data.py b/examples/1_extract_data.py deleted file mode 100644 index 30abfee..0000000 --- a/examples/1_extract_data.py +++ /dev/null @@ -1,25 +0,0 @@ -import csv -import logging -import os -import requests - -# configure logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") - -# download Workday data from RaaS -r = requests.get( - os.getenv("workday_raas_url"), - auth=(os.getenv("workday_username"), os.getenv("workday_password")), -) - -# f the request was successful, write data to CSV file -if r.status_code == 200: - data = r.json()["Report_Entry"] - fname = "survey_responses.csv" - with open(fname, "w") as f: - writer = csv.DictWriter(f, data[0].keys()) - writer.writeheader() - writer.writerows(data) - logging.info(f"{fname} created") -else: - logging.warning(f"Request not successful ({r.status_code})") diff --git a/examples/2_predict_topics.py b/examples/2_predict_topics.py deleted file mode 100644 index 4b69415..0000000 --- a/examples/2_predict_topics.py +++ /dev/null @@ -1,94 +0,0 @@ -import csv -import en_core_web_sm -import gensim -import gzip -import logging -from nltk.corpus import stopwords - -# configure logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") - - -class clean_document: - def __init__(self, input_string, stopwords, nlp): - self.input_string = input_string - self.string_lower = self.lower_string() - self.tokens = self.tokenizer() - self.tokens_no_stopwords = self.remove_stopwords(stopwords) - self.annotated = self.annotate(nlp) - - def lower_string(self): - string_lower = self.input_string.lower() - return string_lower - - def tokenizer(self): - tokens = gensim.utils.simple_preprocess(self.string_lower, deacc=False) - return tokens - - def remove_stopwords(self, stopwords): - no_stopwords = [line for line in self.tokens if line not in stopwords] - return no_stopwords - - def annotate(self, nlp): - doc = nlp(" ".join(self.tokens_no_stopwords)) - new = [token.lemma_ for token in doc if token.pos_ in ["NOUN", "VERB", "ADJ"]] - return new - - -# load the spaCy model -nlp = en_core_web_sm.load() -logging.info("Trained pipeline loaded") - -# load stopwords -stopwords = set(stopwords.words("english")) -logging.info("Stopwords loaded") - -# load responses into a dict -responses = {} -with open("survey_responses.csv") as f: - for line in list(csv.DictReader(f)): - responses[line["Respondent WID"]] = {"answer": line["Questionnaire Answer"]} -logging.info(f"{len(responses)} survey responses loaded") - -# clean and normalize the survey responses -for wid in responses.keys(): - x = clean_document(responses[wid]["answer"], stopwords, nlp) - responses[wid]["clean"] = x.annotated -logging.info("Survey responses cleaned and normalized") - -# load cleaned comments into a dictionary -id2word = gensim.corpora.Dictionary([responses[wid]["clean"] for wid in responses.keys()]) -logging.info("Cleaned responses converted into a Gensim dictionary") - -# convert the cleaned documents into a bag-of-words -corpus = [id2word.doc2bow(responses[wid]["clean"]) for wid in 
responses.keys()] -logging.info("Gensim dictionary converted into a corpus") - -# fit LDA model to corpus -model = gensim.models.ldamodel.LdaModel( - corpus=corpus, - num_topics=3, - id2word=id2word, - random_state=42, - chunksize=200, - iterations=41, - passes=16, -) -logging.info("LDA topic model fit to corpus") - -# predict topic for each comment -predictions = [] -for wid, text, vec in zip(responses.keys(), [responses[wid]["answer"] for wid in responses.keys()], corpus): - pred = model[vec] - stats = {f"Topic {line[0]+1}": line[1] for line in pred} - row = {"wid": wid, "topic": max(stats, key=stats.get), "topic_score": round(stats[max(stats, key=stats.get)], 4)} - predictions.append(row) -logging.info("Topics predicted for survey resposnes") - -# write predictions to a compressed file -fname = "predictions.csv.gz" -with gzip.open(fname, "wt") as f: - writer = csv.DictWriter(f, predictions[0].keys()) - writer.writeheader() - writer.writerows(predictions) -logging.info(f"{fname} created") diff --git a/examples/3_upload_predictions.py b/examples/3_upload_predictions.py deleted file mode 100644 index 3a8b894..0000000 --- a/examples/3_upload_predictions.py +++ /dev/null @@ -1,20 +0,0 @@ -import os -import prism - -# instantiate the Prism class -p = prism.Prism( - os.getenv("workday_base_url"), - os.getenv("workday_tenant_name"), - os.getenv("prism_client_id"), - os.getenv("prism_client_secret"), - os.getenv("prism_refresh_token"), -) - -# load schema for new table -schema = prism.load_schema("schema.json") - -# create the table in Prism -table = prism.create_table(p, "Topic_Model_Predictions_BDS", schema["fields"]) - -# upload the file to the table -prism.upload_file(p, "predictions.csv.gz", table["id"], operation="TruncateandInsert") diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index 5c19e7f..0000000 --- a/examples/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Workday Data Science Workflow - -The goal of this project is to demonstrate use of the Workday Data Science Workflow. This workflow includes 3 steps: - -1. Extract data from Workday using Report as a Service (RaaS) -2. Enrich your data using your desired Data Science tools -3. Push enriched data back into Workday via Prism API - -## Example Use Case - -This example demonstrates how to obtain survey responses via Workday RaaS, apply an Latent Dirichlet Allocation (LDA) topic model to the open-ended responses, and upload the predicted topics to Prism. This is meant to be a generic example of the workflow and should serve as inspiration of one way to integrate machine learning with Workday. - -## Prism Python Package - -To upload your dataset to Prism, we recommend using the Python package `prism`. This package makes it easy to programatically interact with the Prism API. To learn more about the Prism Python package, refer to the [package repository on GitHub](https://github.com/Workday/prism-python). - -To install the latest version of the Prism package: - -``` -pip install git+git://github.com/Workday/prism-python.git -``` - -> Note: when you install an additional package in Google Colab using this method, you will need to reinstall the package each time you launch a new session. 
diff --git a/examples/environment.yml b/examples/environment.yml deleted file mode 100644 index f87d5a6..0000000 --- a/examples/environment.yml +++ /dev/null @@ -1,13 +0,0 @@ -name: wday-dev -channels: - - defaults - - conda-forge -dependencies: - - python=3.8 - - nltk=3.4.4 - - spacy=3.0.3 - - spacy-model-en_core_web_sm=3.0.0 - - gensim - - pip - - pip: - - git+git://github.com/Workday/prism-python.git@0.2.2 diff --git a/examples/schema.json b/examples/schema.json deleted file mode 100644 index bfab781..0000000 --- a/examples/schema.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "fields": [ - { - "ordinal": 1, - "name": "worker_wid", - "type": { - "id": "Schema_Field_Type=Text" - } - }, - { - "ordinal": 2, - "name": "topic", - "type": { - "id": "Schema_Field_Type=Text" - } - }, - { - "ordinal": 3, - "name": "topic_score", - "precision": 38, - "scale": 4, - "type": { - "id": "Schema_Field_Type=Numeric" - } - } - ], - "parseOptions": { - "charset": { - "id": "Encoding=UTF-8" - }, - "fieldsDelimitedBy": ",", - "fieldsEnclosedBy": "\"", - "headerLinesToIgnore": 1, - "type": { - "id": "Schema_File_Type=Delimited" - } - }, - "schemaVersion": { - "id": "Schema_Version=1.0" - } -} diff --git a/prism/__init__.py b/prism/__init__.py index 41c5de7..9682cd5 100644 --- a/prism/__init__.py +++ b/prism/__init__.py @@ -1,8 +1,24 @@ -from prism.prism import Prism, load_schema, create_table, upload_file +from prism.prism import ( + Prism, + set_logging, + schema_compact, + tables_create, + upload_file, + load_schema, + truncate_table, +) from ._version import get_versions __version__ = get_versions()["version"] del get_versions -__all__ = ["load_schema", "Prism", "create_table", "upload_file"] +__all__ = [ + "Prism", + "set_logging", + "schema_compact", + "tables_create", + "upload_file", + "load_schema", + "truncate_table", +] diff --git a/prism/cli.py b/prism/cli.py index 2145798..e13f102 100644 --- a/prism/cli.py +++ b/prism/cli.py @@ -1,122 +1,268 @@ import click -import json +import configparser +import os +import sys +import logging + import prism +from .commands import tables_commands as t_commands +from .commands import buckets_commands as b_commands +from .commands import dataChanges_commands as d_commands +from .commands import dataExport_commands as e_commands +from .commands import fileContainers_commands as f_commands + + +def param_fixup(value, config, config_name, option): + # If already set by an environment or by a command line option, do nothing. + if value is not None: + return value + + try: + return config.get(config_name, option) + except configparser.Error: + # Always fail silently. 
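+        # (configparser.Error is the base class for NoSectionError and
+        # NoOptionError, so any missing entry simply falls back to None.)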
+ return None -@click.group() -@click.option("--base_url", envvar="workday_base_url", type=str, required=True, help="The base URL for the API client") + +@click.group(help="CLI for interacting with Workday’s Prism API") +# Tenant specific parameters +@click.option( + "--base_url", envvar="workday_base_url", type=str, required=False, help="The base URL for the API client", +) @click.option( - "--tenant_name", envvar="workday_tenant_name", type=str, required=True, help="The name of your Workday tenant" + "--tenant_name", envvar="workday_tenant_name", type=str, required=False, help="The name of your Workday tenant", ) @click.option( "--client_id", envvar="prism_client_id", type=str, - required=True, + required=False, help="The client ID for your registered API client", ) @click.option( "--client_secret", envvar="prism_client_secret", type=str, - required=True, + required=False, help="The client secret for your registered API client", ) @click.option( "--refresh_token", envvar="prism_refresh_token", type=str, - required=True, + required=False, help="The refresh token for your registered API client", ) +# Operational parameters +@click.option( + "--log_level", + envvar="prism_log_level", + type=str, + required=False, + help="Level of debugging to display - default = INFO", +) +@click.option( + "--log_file", + envvar="prism_log_file", + type=str, + required=False, + help="Output file for logging - default = prism.log", +) +@click.option( + "--config_file", + envvar="prism_config_file", + type=click.Path(exists=True), + required=False, + help="The name of a configuration with parameters for connections and logging.", +) +@click.option( + "--config_name", + envvar="prism_config_name", + type=str, + required=False, + default="default", + help="The name of a configuration in the configuration file.", +) @click.pass_context -def main(ctx, base_url, tenant_name, client_id, client_secret, refresh_token): - """CLI for interacting with Workday’s Prism API""" +def cli( + ctx, base_url, tenant_name, client_id, client_secret, refresh_token, log_level, log_file, config_file, config_name, +): + # Attempt to locate a configuration file - this is not required and config + # parameters are only used if the configuration values are not passed on + # the command line or by environment variables. - # initialize the prism class with your credentials - p = prism.Prism(base_url, tenant_name, client_id, client_secret, refresh_token, version="v2") + if config_file is None: + # Assume there might be a configuration file in the current directory + filename = os.path.join(os.getcwd(), "prism.ini") + else: + # Click already ensured this is a valid file - if specified. + filename = config_file - # create the bearer token - p.create_bearer_token() + # If the configuration path exists, then load values - this overrides + # environment variables. + if os.path.isfile(filename): + try: + config = configparser.ConfigParser() + config.read(filename) - # store the prism object in the context - ctx.obj = {"p": p} + # Check to see if a particular configuration [name] was asked for, it must + # exist in the configuration file otherwise exit with an error. + if config.has_section(config_name): + # Do fix-up on command line args. Priority comes from the command + # line, then environment variables, and finally the config file. + # Any value not passed and not in the environment arrives here with + # the value "None" - override these with the configuration values. 
-@main.command() -@click.option("--name", default=None, type=str, help="The name of the table to obtain details about") -@click.pass_context -def list(ctx, name): - """List all tables of type API""" + base_url = param_fixup(base_url, config, config_name, "workday_base_url") + tenant_name = param_fixup(tenant_name, config, config_name, "workday_tenant_name") + client_id = param_fixup(client_id, config, config_name, "prism_client_id") + client_secret = param_fixup(client_secret, config, config_name, "prism_client_secret") + refresh_token = param_fixup(refresh_token, config, config_name, "prism_refresh_token") + log_level = param_fixup(log_level, config, config_name, "prism_log_level") + log_file = param_fixup(log_file, config, config_name, "prism_log_file") + else: + click.echo(f"The specified configuration [{config_name}] does not exist in the configuration file.") + sys.exit(1) + except configparser.Error: + click.echo(f"Error accessing configuration file {filename}.") + # If the configuration is not available or is invalid, exit + sys.exit(1) - # get the initialized prism class - p = ctx.obj["p"] + # Do a quick sanity check - if we don't have connection information + # there is nothing we can do. - # list the tables - status = p.list_table(table_name=name) + if base_url is None or tenant_name is None or client_id is None or client_secret is None or refresh_token is None: + click.echo('No Prism connectivity information found - use "prism --help" for more information.') + sys.exit(1) - # print message - if id is None: - click.echo("There are {} API tables".format(status["total"])) - click.echo(json.dumps(status["data"], indent=2, sort_keys=True)) + if log_level is None: + set_level = logging.INFO else: - click.echo(json.dumps(status, indent=2, sort_keys=True)) + set_level = getattr(logging, log_level) # Translate text level to level value. + + # Setup logging for CLI operations. + logger = logging.getLogger("prismCLI") + logger.setLevel(set_level) + + # Create an explicit console handler to handle just INFO message, i.e., + # script output. + formatter = logging.Formatter("%(message)s") + + ch = logging.StreamHandler(sys.stdout) + ch.setFormatter(formatter) + ch.setLevel(logging.INFO) + logger.addHandler(ch) + + # If the log level is not INFO, create a separate stream + # for logging additional levels. + logging_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + formatter = logging.Formatter(logging_format) + + if set_level != logging.INFO: + other_handler = logging.StreamHandler() + other_handler.setFormatter(formatter) + other_handler.setLevel(set_level) + logger.addHandler(other_handler) + + # Create a handler as specified by the user (or defaults) + if log_file is not None: + # If a log file was specified, log EVERYTHING to the log. + fh = logging.FileHandler(log_file) + fh.setFormatter(formatter) + fh.setLevel(set_level) + logger.addHandler(fh) -@main.command() -@click.argument("table_name", type=str) -@click.argument("schema_path", type=click.Path()) + logger.debug("completed initialization.") + + # initialize the Prism class from our resolved configuration. 
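+    # prism.set_logging() below points the client library's logger at the same
+    # log file and level resolved above, so CLI and library messages stay consistent.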
+ + p = prism.Prism(base_url, tenant_name, client_id, client_secret, refresh_token) + prism.set_logging(log_file, log_level) + + # store the prism object in the Click context + ctx.obj = {"p": p} + + +@cli.command("config") +@click.argument("file") @click.pass_context -def create(ctx, table_name, schema_path): - """Create a new Prism table TABLE_NAME with schema from SCHEMA_PATH +def config(ctx, file): + """ + Configuration operations to list, create, and modify parameters + """ + + # TBD - Example: prism create my_table /home/data/schema.json + +@cli.group("tables") +def tables(): + """ + Table operations (/tables) to list, create, load, update, and truncate Prism tables. """ - # get the initialized prism class - p = ctx.obj["p"] - # read in your table schema - schema = prism.load_schema(schema_path) +tables.add_command(t_commands.tables_get) +tables.add_command(t_commands.tables_create) +tables.add_command(t_commands.tables_edit) +tables.add_command(t_commands.tables_patch) +tables.add_command(t_commands.tables_upload) +tables.add_command(t_commands.tables_truncate) - # clean up the table name - table_name = table_name.replace(" ", "_") - # create an empty API table - table = prism.create_table(p, table_name, schema["fields"]) +@cli.group("buckets") +def buckets(): + """ + Bucket operations (/buckets) to list, create and load buckets. + """ - # print message - click.echo(json.dumps(table, indent=2, sort_keys=True)) +buckets.add_command(b_commands.buckets_get) +buckets.add_command(b_commands.buckets_create) +buckets.add_command(b_commands.buckets_complete) +buckets.add_command(b_commands.buckets_status) +buckets.add_command(b_commands.buckets_files) +buckets.add_command(b_commands.buckets_errorFile) +buckets.add_command(b_commands.buckets_name) -@main.command() -@click.argument("gzip_file", type=click.Path()) -@click.argument("table_id", type=str) -@click.option( - "--operation", - type=click.Choice(["TruncateandInsert", "Insert", "Update", "Upsert", "Delete"]), - default="TruncateandInsert", - help="The Table load operation", -) -@click.pass_context -def upload(ctx, gzip_file, table_id, operation): - """Upload GZIP_FILE to TABLE_ID - Example: prism upload /home/data/file.csv.gz bbab30e3018b01a723524ce18010811b +@cli.group("dataChanges") +def dataChanges(): """ + Data Change Tasks (/dataChanges) operations to list, load, and activate. + """ + - # get the initialized prism class - p = ctx.obj["p"] +dataChanges.add_command(d_commands.dataChanges_get) +dataChanges.add_command(d_commands.dataChanges_validate) +dataChanges.add_command(d_commands.dataChanges_run) +dataChanges.add_command(d_commands.dataChanges_activities) +dataChanges.add_command(d_commands.dataChanges_upload) - # upload file to the table - prism.upload_file(p, gzip_file, table_id, operation) - # check the status of the table you just created - status = p.list_table(table_id) +@cli.group("dataExport") +def dataExport(): + """ + Data Change Tasks (/dataChanges) operations to list, load, and activate. + """ + + +dataExport.add_command(e_commands.dataExport_get) + + +@cli.group("fileContainers") +def fileContainers(): + """ + File container (/fileContainers) operations to create, load, and list. 
+ """ + - # print message - click.echo(json.dumps(status["data"], indent=2, sort_keys=True)) +fileContainers.add_command(f_commands.fileContainers_create) +fileContainers.add_command(f_commands.fileContainers_get) +fileContainers.add_command(f_commands.fileContainers_load) if __name__ == "__main__": - main() + cli() diff --git a/prism/commands/__init__.py b/prism/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/prism/commands/buckets_commands.py b/prism/commands/buckets_commands.py new file mode 100644 index 0000000..b5721a4 --- /dev/null +++ b/prism/commands/buckets_commands.py @@ -0,0 +1,270 @@ +import json +import logging +import sys +import click + +logger = logging.getLogger("prismCLI") + + +@click.command("get") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the bucket or table argument as a name.", +) +@click.option( + "-l", + "--limit", + default=None, + type=int, + help="The maximum number of object data entries included in the response, default=-1 (all).", +) +@click.option( + "-o", + "--offset", + default=None, + type=int, + help="The offset to the first object in a collection to include in the response.", +) +@click.option( + "-t", + "--type", + "type_", + default="summary", + show_default=True, + help="How much information to be returned in response JSON.", +) +@click.option( + "-s", "--search", is_flag=True, show_default=True, default=False, help="Use substring search bucket or table.", +) +@click.option("--table", help="The id or name of a Prism table to list all buckets.") +@click.argument("bucket", required=False) +@click.pass_context +def buckets_get(ctx, bucket, table, isname, limit, offset, type_, search): + """ + View the buckets permitted by the security profile of the current user. + + [BUCKET] ID or name of a Prism bucket. + + NOTE: For table name searching, the Display Name is searched not + the API Name. + """ + p = ctx.obj["p"] + + if isname and bucket is None and table is None: + # It's invalid to add the --isName switch without providing + # a bucket id or table name. + logger.error("To get buckets by name, please provide a bucket name.") + sys.exit(1) + + if not isname and bucket is not None: + # This should be a bucket ID - ignore all other options. + bucket = p.buckets_get(bucket_id=bucket, type_=type_) + logger.info(json.dumps(bucket, indent=2)) + + return + + # We are doing some form of search. + + if isname and bucket is not None: + # This should be a search by bucket name. + buckets = p.buckets_get(bucket_name=bucket, type_=type_, search=search) + else: + # Search by table ID or name. 
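+        # (--table is treated as a display name when --isName is set,
+        # otherwise as a table ID.)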
+ if isname: + buckets = p.buckets_get(table_name=table, search=search, limit=limit, offset=offset, type_=type_) + else: + buckets = p.buckets_get(table_id=table, limit=limit, offset=offset, type_=type_) + + logger.info(json.dumps(buckets, indent=2)) + + +@click.command("create") +@click.option("-n", "--target_name", default=None, help="Table name to associate with the bucket.") +@click.option("-i", "--target_id", default=None, help="Table ID to associate with the table.") +@click.option( + "-f", + "--file", + "file", + required=False, + default=None, + type=click.Path(exists=True), + help="Schema JSON file for the target table.", +) +@click.option( + "-o", "--operation", default="TruncateAndInsert", show_default=True, help="Operation to perform on the table.", +) +@click.argument("bucket", required=False) +@click.pass_context +def buckets_create(ctx, target_name, target_id, file, operation, bucket): + """ + Create a new bucket with the specified name. + + [BUCKET] explicit bucket name to create otherwise default. + """ + p = ctx.obj["p"] + + if target_name is None and target_id is None and file is None: + logger.error("A table must be associated with this bucket.") + sys.exit(1) + + bucket = p.buckets_create( + bucket_name=bucket, target_id=target_id, target_name=target_name, schema=file, operation=operation, + ) + + if bucket is not None: + logger.info(json.dumps(bucket, indent=2)) + else: + logger.error("Error creating bucket.") + sys.exit(1) + + +@click.command("files") +@click.option( + "-n", "--target_name", default=None, help="Name of the table to associate with the bucket.", +) +@click.option("-i", "--target_id", default=None, help="Table ID to associate with the table.") +@click.option("-f", "--file", default=None, help="Schema JSON file for the target table.") +@click.option( + "-o", "--operation", default="TruncateAndInsert", show_default=True, help="Operation to perform on the table.", +) +@click.option("-b", "--bucket", help="Bucket name to load files.", default=None) +@click.option( + "-c", + "--complete", + is_flag=True, + default=False, + help="Automatically complete bucket and load the data into the table.", +) +@click.argument("files", nargs=-1, required=True, type=click.Path(exists=True)) +@click.pass_context +def buckets_files(ctx, target_name, target_id, file, operation, bucket, complete, files): + """ + Upload one or more CSV or gzip files to the specified bucket + + [FILES] one or more gzip (.gz) or CSV (.csv) files. + + NOTE: This operation will create ".csv.gz" files for each .csv file. + """ + p = ctx.obj["p"] + + # We think we have a file(s) - we don't test the contents. + # Go ahead and create a new bucket or use an existing. + bucket = p.buckets_create(bucket, target_name, target_id, file, operation) + + if bucket is None: + logger.error("Invalid bucket for upload operation.") + sys.exit(1) + + results = p.buckets_files(bucket["id"], files) + + if results["total"] > 0 and complete: + complete = p.buckets_complete(bucket["id"]) + logger.info(complete) + else: + logger.info(json.dumps(results, indent=2)) + + +@click.command("complete") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the bucket argument as a name.", +) +@click.argument("bucket", required=True) +@click.pass_context +def buckets_complete(ctx, isname, bucket): + """ + Complete the specified bucket and perform the specified operation. + + [BUCKET] A reference to a Prism Analytics bucket. 
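+
+    Only buckets in the "New" state can be completed.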
+ """ + p = ctx.obj["p"] + + if isname: + buckets = p.buckets_get(bucket_name=bucket, verbosity="full") + + if buckets["total"] == 0: + bucket = None + else: + bucket = buckets["data"][0] + else: + bucket = p.buckets_list(bucket_id=bucket) + + if bucket is None: + logger.error(f"Bucket {bucket} not found.") + sys.exit(1) + + bucket_state = bucket["state"]["descriptor"] + + if bucket_state != "New": + logger.error(f'Bucket state is "{bucket_state}" - only "New" buckets can be completed.') + sys.exit(1) + + logger.info(p.buckets_complete(bucket["id"])) + + +@click.command("errorFile") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the bucket argument as a name.", +) +@click.argument("bucket", required=True) +@click.pass_context +def buckets_errorFile(ctx, isname, bucket): + """ + Return the error file for a bucket. + + [BUCKET] A reference to a Prism Analytics bucket. + """ + p = ctx.obj["p"] + + if isname: + # Lookup the bucket by name. + buckets = p.buckets_get(bucket_name=bucket) + + if buckets["total"] == 0: + logger.error(f"Bucket {bucket} not found.") + sys.exit(1) + else: + bucket_id = buckets["data"][0]["id"] + else: + bucket_id = bucket + + error_file = p.buckets_errorFile(bucket_id=bucket_id) + + logger.info(error_file) + + +@click.command("status") +@click.option("-n", "--isName", is_flag=True, default=False, help="Bucket name to status") +@click.argument("bucket", required=True) +@click.pass_context +def buckets_status(ctx, isname, bucket): + """ + Get the status of a bucket by ID or name. + + [ID] A reference to a Prism Analytics bucket. + """ + p = ctx.obj["p"] + + if isname: + buckets = p.buckets_get(bucket_name=bucket) + + if buckets["total"] == 0: + logger.error(f"Bucket name {bucket} not found.") + sys.exit(1) + + bucket = buckets["data"][0] + else: + bucket = p.buckets_get(bucket_id=bucket) + + if bucket is None: + logger.error(f"Bucket {bucket} not found.") + sys.exit(1) + + logger.info(bucket["state"]["descriptor"]) + + +@click.command("name") +@click.pass_context +def buckets_name(ctx): + """ + Generate a bucket name to use for other bucket operations. + """ + click.echo(ctx.obj["p"].buckets_gen_name()) diff --git a/prism/commands/dataChanges_commands.py b/prism/commands/dataChanges_commands.py new file mode 100644 index 0000000..7bb0b5a --- /dev/null +++ b/prism/commands/dataChanges_commands.py @@ -0,0 +1,265 @@ +import sys +import click +import json +import logging +import time + +logger = logging.getLogger("prismCLI") + + +@click.command("get") +@click.option( + "-n", "--isName", default=False, is_flag=True, help="Flag to treat the dct argument as a name.", +) +@click.option( + "-l", "--limit", default=-1, help="The maximum number of object data entries included in the response.", +) +@click.option( + "-o", "--offset", default=0, help="The offset to the first object in a collection to include in the response.", +) +@click.option( + "-t", + "--type", + "type_", + default="summary", + help="How much information to be returned in response JSON (default=summary).", +) +@click.option( + "-s", + "--search", + is_flag=True, + default=False, + help="Use contains search substring for --name or --id (default=false).", +) +@click.argument("dct", required=False) +@click.pass_context +def dataChanges_get(ctx, isname, dct, limit, offset, type_, search): + """View the data change tasks permitted by the security profile of the current user. + + [dct] A reference to a Prism Analytics Data Change Task. 
+ """ + p = ctx.obj["p"] + + # Separate the get calls because an ID lookup returns a dict and a name lookup + # always returns an object/list structure with zero or more matching DCTs. + if isname: + data_change_task = p.dataChanges_get( + datachange_name=dct, limit=limit, offset=offset, search=search, type_=type_ + ) + + if data_change_task["total"] == 0: + logger.warning("No data change task(s) found.") + sys.exit(1) + + # For display purposes, sort by display name (case-insensitive) + data_change_task["data"] = sorted(data_change_task["data"], key=lambda dct_srt: dct_srt["displayName"].lower()) + else: + data_change_task = p.dataChanges_get(datachange_id=dct, limit=limit, offset=offset, type_=type_) + + if data_change_task is None: + logger.error(f"Data change task {dct} not found.") + sys.exit(1) + + logger.info(json.dumps(data_change_task, indent=2)) + + +@click.command("validate") +@click.option( + "-n", "--isName", default=False, is_flag=True, help="Flag to treat the dct argument as a name.", +) +@click.option("-s", "--search", is_flag=True, help="Use contains search substring for --name.") +@click.argument("dct", required=True) +@click.pass_context +def dataChanges_validate(ctx, isname, dct, search): + """ + Validate the data change specified by name or ID. + + [DCT] A reference to a Prism Analytics Data Change Task. + """ + + p = ctx.obj["p"] + + if not isname: + validate = p.dataChanges_validate(dct) + logger.info(json.dumps(validate, indent=2)) + else: + data_change_tasks = p.dataChanges_get(datachange_name=dct, search=search) + + if data_change_tasks["total"] == 0: + logger.error("No matching data change task(s) found.") + sys.exit(1) + + results = [] + + for dct in data_change_tasks["data"]: + validate = p.dataChanges_validate(dct["id"]) + + if "error" in validate: + # Add identifying attributes to the error message. + validate["id"] = dct["id"] + validate["descriptor"] = dct["displayName"] + + results.append(validate) + + logger.info(json.dumps(results, indent=2)) + + +@click.command("run") +@click.option( + "-n", "--isName", default=False, is_flag=True, help="Flag to treat the dct argument as a name.", +) +@click.argument("dct", required=True) +@click.argument("fid", required=False) +@click.pass_context +def dataChanges_run(ctx, dct, fid, isname): + """Execute the named data change task with an optional file container. + + [DCT] A reference to a Prism Analytics data change. + [FID] An optional reference to a Prism Analytics file container. + """ + + p = ctx.obj["p"] + + if isname: + # See if we have any matching data change task by name (with minor clean-up). + data_changes = p.dataChanges_get(name=dct.replace(" ", "_")) + + if data_changes["total"] != 1: + logger.error(f"Data change task not found: {dct}") + sys.exit(1) + + dct_id = data_changes["data"][0]["id"] + logger.debug(f"resolved ID: {dct_id}") + else: + dct_id = dct + + # It is valid to run a data change task without a fileContainerID value. + activity = p.dataChanges_activities_post(dct_id, fid) + + if activity is None: + logger.error("Failed to run data change task - please review the log.") + sys.exit(1) + + # Output the results - could be the new activity id or an error message. 
+ logger.info(json.dumps(activity, indent=2)) + + +@click.command("activities") +@click.option( + "-n", "--isName", default=False, is_flag=True, help="Flag to treat the dct argument as a name.", +) +@click.option( + "-s", "--status", is_flag=True, default=False, help="Return only the status of the activity.", +) +@click.argument("dct", required=True) +@click.argument("activityID", required=True) +@click.pass_context +def dataChanges_activities(ctx, dct, activityid, status, isname): + """Get the status for a specific activity associated with a data change task. + + [DCT] A reference to a data change task. + [ACTIVITYID] A reference to a data change task activity. + """ + + p = ctx.obj["p"] + + if isname: + # See if we have any matching data change task. + data_changes = p.dataChanges_list(name=dct.replace(" ", "_")) + + if data_changes["total"] != 1: + logger.error(f"Data change task not found: {dct}") + sys.exit(1) + + dct_id = data_changes["data"][0]["id"] + logger.debug(f"resolved ID: {dct_id}") + else: + dct_id = dct + + current_status = p.dataChanges_activities_get(dct_id, activityid) + + if current_status is None: + logger.error("Activity for DCT not found.") + sys.exit(1) + else: + if status: + logger.info(current_status["state"]["descriptor"]) + else: + logger.info(json.dumps(current_status, indent=2)) + + +@click.command("upload") +@click.option( + "-n", "--isName", default=False, is_flag=True, help="Flag to treat the dct argument as a name.", +) +@click.option( + "-w", "--wait", default=False, is_flag=True, help="Wait for the data change task to complete.", +) +@click.option( + "-v", "--verbose", default=False, is_flag=True, help="Display additional information.", +) +@click.argument("dct", required=True) +@click.argument("file", required=True, nargs=-1, type=click.Path(exists=True)) +@click.pass_context +def dataChanges_upload(ctx, isname, dct, file, wait, verbose): + """Execute a data change task using the provided file(s). + + [DCT] A reference to a Prism Analytics Data Change Task. + [FILE] One or more .CSV or .CSV.GZ files. + """ + + p = ctx.obj["p"] + + if isname: + data_change_tasks = p.dataChanges_get(datachange_name=dct) + + if data_change_tasks["total"] == 0: + logger.error("Data change task not found.") + sys.exit(1) + + dct_id = data_change_tasks["data"][0]["id"] + logger.debug(f"resolved ID: {dct_id}") + else: + dct_id = dct + + # Specifying None for the ID to create a new file container. + file_container = p.fileContainers_load(filecontainer_id=None, file=file) + + if file_container["total"] == 0: + logger.error("Error loading file container.") + sys.exit(1) + + filecontainer_id = file_container["id"] + logger.debug(f"new file container ID: {filecontainer_id}") + + # Execute the DCT. + activity = p.dataChanges_activities_post(datachange_id=dct_id, fileContainer_id=filecontainer_id) + + if "errors" in activity: + # Add the ID of the DCT for easy identification. + activity["id"] = dct_id + + logger.error(json.dumps(activity, indent=2)) + + sys.exit(1) + + if not wait: + logger.info(json.dumps(activity, indent=2)) + else: + activity_id = activity["id"] + + while True: + time.sleep(10) + + activity = p.dataChanges_activities_get(datachange_id=dct_id, activityID=activity_id) + + status = activity["state"]["descriptor"] + + if verbose: + logger.info(f"Status: {status}") + + if status not in ["New", "Queued", "Processing", "Loading"]: + break + + # Output the final status of the activity. 
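+        # (The polling loop above exits once the state is no longer New,
+        # Queued, Processing, or Loading.)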
+ logger.info(json.dumps(activity, indent=2)) diff --git a/prism/commands/dataExport_commands.py b/prism/commands/dataExport_commands.py new file mode 100644 index 0000000..7aa6586 --- /dev/null +++ b/prism/commands/dataExport_commands.py @@ -0,0 +1,56 @@ +import click +import json +import logging + +logger = logging.getLogger("prismCLI") + + +@click.command("get") +@click.option( + "-l", + "--limit", + type=int, + default=None, + help="The maximum number of object data entries included in the response, default=all.", +) +@click.option( + "-o", + "--offset", + type=int, + default=None, + help="The offset to the first object in a collection to include in the response.", +) +@click.option( + "-t", + "--type", + "type_", + default="summary", + type=click.Choice(["summary", "full"], case_sensitive=False), + help="How much information returned for each table.", +) +@click.option( + "-f", + "--format", + "format_", + default="json", + type=click.Choice(["json", "summary", "schema", "csv"], case_sensitive=False), + help="Format output as JSON, summary, schema, or CSV.", +) +@click.pass_context +def dataExport_get(ctx, limit, offset, type_, format_): + """List the tables or datasets permitted by the security profile of the current user. + + [NAME] Prism table name to list. + """ + + p = ctx.obj["p"] + + data_export_list = p.dataExport_get(limit=limit, offset=offset, type_=type_) + + logger.info(json.dumps(data_export_list, indent=2)) + + +@click.command("create") +@click.pass_context +def dataExport_create(ctx): + logger.info("here") diff --git a/prism/commands/fileContainers_commands.py b/prism/commands/fileContainers_commands.py new file mode 100644 index 0000000..e217e6b --- /dev/null +++ b/prism/commands/fileContainers_commands.py @@ -0,0 +1,74 @@ +import click +import sys +import json +import logging + +logger = logging.getLogger("prismCLI") + + +@click.command("create") +@click.pass_context +def fileContainers_create(ctx): + """Create a new fileContainers object returning the ID.""" + + p = ctx.obj["p"] + + file_container = p.fileContainers_create() + + if file_container is not None: + logger.info(json.dumps(file_container, indent=2)) + else: + logger.error("Error creating file container.") + sys.exit(1) + + +@click.command("get") +@click.argument("id", required=True) +@click.pass_context +def fileContainers_get(ctx, id): + """List the files in a file container. + + [ID] File container ID to list. + """ + + p = ctx.obj["p"] + + files_list = p.fileContainers_get(id) + + logger.info(json.dumps(files_list, indent=2)) + + +@click.command("load") +@click.option( + "-i", "--id", default=None, help="Target File container ID - defaults to a new container.", +) +@click.argument("file", nargs=-1, type=click.Path(exists=True)) +@click.pass_context +def fileContainers_load(ctx, id, file): + """Load one or more files into a file container returning the container ID. + + [FILE] one or more CSV or GZipped CSV (.csv.gz) files to load. + """ + + if len(file) == 0: # Click gives a tuple - even if no files included + logger.error("One or more files must be specified.") + sys.exit(1) + + p = ctx.obj["p"] + + # Load the file and retrieve the ID - a new fID is + # created if the command line ID was not specified. + # Subsequent files are loaded into the same container (fID). + results = p.fileContainers_load(id=id, file=file) + + # If the fID comes back blank, then something is not + # working. Note: any error messages have already + # been logged by the load operation. 
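+    # (A zero "total" means nothing was loaded into a container.)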
+ + if results["total"] == 0: + logger.error("A file container id is required to load a file.") + sys.exit(1) + else: + # Return the file container ID to the command line. If a + # fileContainerID was passed, simply return that id. + logger.info(json.dumps(results, indent=2)) diff --git a/prism/commands/tables_commands.py b/prism/commands/tables_commands.py new file mode 100644 index 0000000..66b08e2 --- /dev/null +++ b/prism/commands/tables_commands.py @@ -0,0 +1,367 @@ +import json +import logging +import sys +import click + +from prism import schema_compact, load_schema, upload_file, truncate_table + +logger = logging.getLogger("prismCLI") + + +@click.command("get") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the table argument as a name.", +) +@click.option( + "-l", + "--limit", + type=int, + default=None, + help="The maximum number of object data entries included in the response, default=all.", +) +@click.option( + "-o", + "--offset", + type=int, + default=None, + help="The offset to the first object in a collection to include in the response.", +) +@click.option( + "-t", + "--type", + "type_", + default="summary", + type=click.Choice(["summary", "full", "permissions"], case_sensitive=False), + help="How much information returned for each table.", +) +@click.option( + "-c", "--compact", is_flag=True, default=False, help="Compact the table schema for use in edit (put) operations.", +) +@click.option( + "-s", "--search", is_flag=True, help="Enable substring search of NAME in api name or display name.", +) +@click.argument("table", required=False) +@click.pass_context +def tables_get(ctx, isname, table, limit, offset, type_, compact, search): + """List the tables or datasets permitted by the security profile of the current user. + + [TABLE] Prism table ID or name (--isName flag) to list. + """ + + p = ctx.obj["p"] + + # Query the tenant...see if the caller said to treat the + # table as a name, AND that a table was provided. + if not isname and table is not None: + # When using an ID, the GET:/tables operation returns a simple + # dictionary of the table definition. + table = p.tables_get(table_id=table, type_=type_) + + if table is None: + logger.error(f"Table ID {table} not found.") + sys.exit(1) + + if compact: + table = schema_compact(table) + + logger.info(json.dumps(table, indent=2)) + else: + # When querying by name, the get operation returns a + # dict with a count of found tables and a list of + # tables. + tables = p.tables_get(table_name=table, limit=limit, offset=offset, type_=type_, search=search) + + if tables["total"] == 0: + logger.error(f"Table ID {table} not found.") + return + + if compact: + for tab in tables["data"]: + tab = schema_compact(tab) + + logger.info(json.dumps(tables, indent=2)) + + +@click.command("create") +@click.option("-n", "--table_name", help="Table name - overrides name from schema.") +@click.option("-d", "--displayName", help="Specify a display name - defaults to name.") +@click.option( + "-e", "--enableForAnalysis", type=bool, is_flag=True, default=None, help="Enable this table for analytics.", +) +@click.option("-s", "--sourceName", help="The API name of an existing table to copy.") +@click.option("-w", "--sourceWID", help="The WID of an existing table to copy.") +@click.argument("file", required=False, type=click.Path(exists=True)) +@click.pass_context +def tables_create(ctx, table_name, displayname, enableforanalysis, sourcename, sourcewid, file): + """ + Create a new table with the specified name. 
+ + [FILE] Optional file containing a Prism schema definition for the new table. + + Note: A schema file, --sourceName, or --sourceWID must be specified. + """ + p = ctx.obj["p"] + + # We can assume a schema was found/built - get_schema sys.exits if there is a problem. + schema = load_schema(p, file, sourcename, sourcewid) + + # Initialize a new schema with the particulars for this table operation. + if table_name is not None: + # If we got a name, set it in the table schema + schema["name"] = table_name.replace(" ", "_") # Minor clean-up + + # Force the display name - there cannot be duplicate displayNames + # in the data catalog. + schema["displayName"] = table_name + + logger.debug(f'setting table name to {schema["name"]}') + elif "name" not in schema: + # The schema doesn't have a name and none was given - exit. + # Note: this could be true if we have a schema of only fields. + logger.error("Table --table_name must be specified.") + sys.exit(1) + + if displayname is not None: + # If we got a display name, set it in the schema + schema["displayName"] = displayname + elif "displayName" not in schema: + # Default the display name to the name if not in the schema. + schema["displayName"] = table_name + logger.debug(f'defaulting displayName to {schema["displayName"]}') + + if enableforanalysis is not None: + schema["enableForAnalysis"] = enableforanalysis + elif "enableForAnalysis" not in schema: + # Default to False - do not enable. + schema["enableForAnalysis"] = False + logger.debug("defaulting enableForAnalysis to False.") + + # Create the table. + table_def = p.tables_post(schema) + + if table_def is not None: + logger.info(json.dumps(table_def, indent=2)) + else: + logger.error(f'Error creating table {schema["name"]}.') + sys.exit(1) + + +@click.command("edit") +@click.option( + "-t", "--truncate", is_flag=True, default=False, help="Truncate the table before updating.", +) +@click.argument("file", required=True, type=click.Path(exists=True, dir_okay=False, readable=True)) +@click.pass_context +def tables_edit(ctx, file, truncate): + """Edit the schema for an existing table. + + [FILE] File containing an updated schema definition for the table. + """ + p = ctx.obj["p"] + + # The user can specify a GET:/tables output file containing + # the ID and other attributes that could be passed on the + # command line. 
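+    # (A file saved from "prism tables get" output works here; tables_put applies
+    # this schema to the existing table, and --truncate clears its data first.)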
+ schema = load_schema(file=file) + + table = p.tables_put(schema, truncate=truncate) + + if table is None: + logger.error("Error updating table.") + else: + logger.info(json.dumps(table, indent=2)) + + +@click.command("patch") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the table argument as a name.", +) +@click.option( + "--displayName", + is_flag=False, + flag_value="*-clear-*", + default=None, + help="Set the display name for an existing table.", +) +@click.option( + "--description", + is_flag=False, + flag_value="*-clear-*", + default=None, + help="Set the display name for an existing table.", +) +@click.option( + "--documentation", + is_flag=False, + flag_value="*-clear-*", + default=None, + help="Set the documentation for an existing table.", +) +@click.option( + "--enableForAnalysis", is_flag=False, default=None, type=click.Choice(["true", "false"], case_sensitive=False), +) +@click.argument("table", required=True, type=str) +@click.argument("file", required=False, type=click.Path(dir_okay=False)) +@click.pass_context +def tables_patch(ctx, isname, table, file, displayname, description, documentation, enableforanalysis): + """Edit the specified attributes of an existing table with the specified id (or name). + + If an attribute is not provided in the request, it will not be changed. To set an + attribute to blank (empty), include the attribute without specifying a value. + + TABLE The ID or API name (use -n option) of the table to patch. + + [FILE] Optional file containing patch values for the table. + """ + + p = ctx.obj["p"] + + # Figure out the new schema either by file or other table. + patch_data = {} + + # If a file is specified, there can only be patch values and + # cannot be a full Prism schema. + if file is not None: + try: + with open(file, "r") as patch_file: + patch_data = json.load(patch_file) + except Exception as e: + logger.error(e) + sys.exit(1) + + if not isinstance(patch_data, dict): + logger.error("invalid patch file - should be a dictionary") + sys.exit(1) + + valid_attributes = [ + "displayName", + "description", + "enableForAnalysis", + "documentation", + ] + + for patch_attr in patch_data.keys(): + if patch_attr not in valid_attributes: + logger.error(f'unexpected attribute "{patch_attr}" in patch file') + sys.exit(1) + + def set_patch_value(attr, value): + """Utility function to set or clear a table attribute. + + If the user specifies an attribute but does not provide a value, + add a patch value to clears/null the value + """ + if value == "*-clear-*": + patch_data[attr] = "" + else: + patch_data[attr] = value + + # See if the user creating new patch variables or overriding + # values from the patch file. + + # Note: specifying the option without a value creates a + # patch value to clear the value in the table def. The + # caller can override the values from the patch file using + # command line arguments. + if displayname is not None: # Specified on CLI + set_patch_value("displayName", displayname) + + if description is not None: + set_patch_value("description", description) + + if documentation is not None: + set_patch_value("documentation", documentation) + + if enableforanalysis is not None: + if enableforanalysis.lower() == "true": + patch_data["enableForAnalysis"] = "true" + else: + patch_data["enableForAnalysis"] = "false" + + # The caller must be asking for something to change! 
+ if len(patch_data) == 0: + logger.error("Specify at least one table schema value to update.") + sys.exit(1) + + # Identify the existing table we are about to patch. + if isname: + # Before doing anything, table name must exist. + tables = p.tables_get(table_name=table) # Exact match + + if tables["total"] == 0: + logger.error(f'Table name "{table}" not found.') + sys.exit(1) + + resolved_id = tables["data"][0]["id"] + else: + # No verification needed, simply assume the ID is valid. + resolved_id = table + + table = p.tables_patch(table_id=resolved_id, patch=patch_data) + + if table is None: + logger.error(f"Error updating table ID {resolved_id}") + else: + logger.info(json.dumps(table, indent=2)) + + +@click.command("upload") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the table argument as a name.", +) +@click.option( + "-o", + "--operation", + default="TruncateAndInsert", + help="Operation for the table operation - default to TruncateAndInsert.", +) +@click.argument("table", required=True) +@click.argument("file", nargs=-1, type=click.Path(exists=True)) +@click.pass_context +def tables_upload(ctx, table, isname, operation, file): + """ + Upload a file into the table using a bucket. + + [TABLE] A Prism Table identifier. + [FILE] One or more CSV or GZIP.CSV files. + """ + + p = ctx.obj["p"] + + # Convert the file(s) provided to a list of compressed files. + + if len(file) == 0: + logger.error("No files to upload.") + sys.exit(1) + + if isname: + results = upload_file(p, table_name=table, file=file, operation=operation) + else: + results = upload_file(p, table_id=table, file=file, operation=operation) + + logger.debug(json.dumps(results, indent=2)) + + +@click.command("truncate") +@click.option( + "-n", "--isName", is_flag=True, default=False, help="Flag to treat the table argument as a name.", +) +@click.argument("table", required=True) +@click.pass_context +def tables_truncate(ctx, table, isname): + """ + Truncate the named table. + + [TABLE] The Prism Table ID or API name of the table to truncate. 
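+
+    Example: prism tables truncate -n my_new_table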
+ """ + p = ctx.obj["p"] + + if isname: + result = truncate_table(p, table_name=table) + else: + result = truncate_table(p, table_id=table) + + if result is None: + logger.warning("Table was not truncated.") + else: + logger.info(json.dumps(result, indent=2)) diff --git a/prism/data/schema.json b/prism/data/schema.json index 896c055..58466f0 100644 --- a/prism/data/schema.json +++ b/prism/data/schema.json @@ -1,42 +1,57 @@ { "fields": [ { - "defaultValue": "textField", - "description": "this is a Text Field", - "name": "State2", - "parseFormat": "", - "precision": 0, + "name": "id", + "displayName" : "Record ID", + "description": "This is an example of text primary key.", "ordinal": 1, - "scale": 0, "type": { "id": "Schema_Field_Type=Text" - } + }, + "required" : true, + "externalId" : true }, { - "defaultValue": "0", - "description": "this is an Integer Field", - "name": "Population2", - "parseFormat": "", - "precision": 9, + "name": "name", + "displayName" : "Full Name", + "description": "Full name of employee.", "ordinal": 2, - "scale": 0, + "type": { + "id": "Schema_Field_Type=Text" + }, + "required" : true + }, + { + "name": "employ_yrs", + "displayName" : "Employee Tenure Years", + "description": "Integer number of years in role.", + "ordinal": 3, + "defaultValue": 0, + "type": { + "id": "Schema_Field_Type=Integer" + } + }, + { + "name": "average_rating", + "displayName" : "Average Rating", + "description": "Average performance rating.", + "ordinal": 4, + "precision" : 9, + "scale" : 2, + "defaultValue": 1.00, "type": { "id": "Schema_Field_Type=Numeric" } - } - ], - "parseOptions": { - "charset": { - "id": "Encoding=UTF-8" }, - "fieldsDelimitedBy": ",", - "fieldsEnclosedBy": "\"", - "headerLinesToIgnore": 1, - "type": { - "id": "Schema_File_Type=Delimited" + { + "name": "hired", + "displayName" : "Hire Date", + "description": "Date of hire.", + "ordinal": 5, + "parseFormat": "yyyy-MM-DD", + "type": { + "id": "Schema_Field_Type=Date" + } } - }, - "schemaVersion": { - "id": "Schema_Version=1.0" - } + ] } \ No newline at end of file diff --git a/prism/prism.py b/prism/prism.py index e4c3f9e..60a3fc9 100644 --- a/prism/prism.py +++ b/prism/prism.py @@ -3,79 +3,379 @@ The Prism API provides a flexible, secure and scalable way to load data into Workday Prism Analytics. + +DocString style: https://www.sphinx-doc.org/en/master/usage/extensions/example_numpy.html """ import logging import json -import random import requests - -# set up basic logging -logger = logging.getLogger() -handler = logging.StreamHandler() -formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S") -handler.setFormatter(formatter) +import time +import os +import sys +import uuid +import gzip +import inspect +import copy + +from urllib import parse as urlparse + +# Default a logger - the default may be re-configured in the set_logging method. +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) + +# writing to stdout only... +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.WARNING) +log_format = logging.Formatter("%(asctime)s %(levelname)s: %(message)s", "%Y-%m-%d %H:%M:%S") +handler.setFormatter(log_format) logger.addHandler(handler) -logger.setLevel(logging.INFO) -def load_schema(filename): - """Load schema from a JSON file. +def set_logging(log_file=None, log_level="INFO"): + """ + + :param log_file: + :param log_level: + :return: + """ + # Resolve the log level - default to info if empty or invalid. 
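+    # (Level names such as "DEBUG" are attributes of the logging module, so
+    # getattr(logging, log_level) yields the corresponding numeric level.)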
+ if log_level is None: + set_level = logging.INFO + else: + # Make sure the caller gave us a valid "name" (INFO/DEBUG/etc) for logging level. + if hasattr(logging, log_level): + set_level = getattr(logging, log_level) + else: + set_level = logging.INFO + + # If no file was specified, simply loop over any handlers and + # set the logging level. + if log_file is None: + logger.setLevel(set_level) + + for log_handler in logger.handlers: + log_handler.setLevel(set_level) + else: + # Setup logging for CLI operations. + for log_handler in logger.handlers: + logger.removeHandler(log_handler) + + logger.setLevel(set_level) + + # Create a handler as specified by the user (or defaults) + fh = logging.FileHandler(log_file) + fh.setLevel(set_level) + + # create formatter and add it to the handlers + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + fh.setFormatter(formatter) + + logger.addHandler(fh) + + logger.debug(f"set log level: {set_level}") + + +def log_elapsed(msg, timedelta): + """Log the elapsed time of a get/post/put/patch HTTP operation.""" + elapsed = timedelta.total_seconds() + logger.debug(f"{msg}: elapsed {elapsed:.5f}") + + +def buckets_gen_name(): + bucket_name = "prism_python_" + uuid.uuid4().hex + logger.debug(f"buckets_gen_name: created bucket name: {bucket_name}") + + return bucket_name + + +def schema_compact(schema): + """Utility function to revise a schema for a bucket operations.""" + + if schema is None: + logger.error("schema_compact: schema cannot be None.") + return None + + if not isinstance(schema, dict): + logger.error("schema_compact: schema is not a dictionary.") + return None + + compact_schema = copy.deepcopy(schema) + + # Add a sequential order (ordinal) on the fields to (en)force + # required sequencing of fields. Note: for summary tables + # there will not be a fields attribute. + if "fields" in compact_schema: + # Remove Prism managed fields "WPA_*" + compact_schema["fields"] = [fld for fld in compact_schema["fields"] if not fld["name"].startswith("WPA_")] + + for ordinal in range(len(compact_schema["fields"])): + fld = schema["fields"][ordinal] + fld["ordinal"] = ordinal + 1 + + if "fieldId" in fld: + del fld["fieldId"] + + if "id" in fld: + del fld["id"] + + if "type" in fld: + if "descriptor" in fld["type"]: + # Convert the descriptor to the shortened Prism type syntax. + fld["type"]["id"] = f"Schema_Field_Type={fld['type']['descriptor']}" + del fld["type"]["descriptor"] + + # Remove all attributes from the schema that cannot be specified on + # a post or put operation. + keys = list(compact_schema.keys()) + + for k in keys: + if k not in [ + "name", + "id", + "fields", + "tags", + "categories", + "displayName", + "description", + "documentation", + "enableForAnalysis", + ]: + del compact_schema[k] + + return compact_schema + + +def table_to_bucket_schema(table): + """Convert schema derived from list table to a bucket schema. Parameters ---------- - filename : str - The path to your file. + table: dict + A dictionary containing the schema definition for your dataset. Returns ------- - schema : dict - A dictionary containing the schema for your table. + If the request is successful, a dictionary containing the bucket schema is returned. 
+ The results can then be passed to the create_bucket function """ - with open(filename) as f: - schema = json.load(f) - return schema + # describe_schema is a python dict object and needs to be accessed as such, 'data' is the top level object, + # but this is itself a list (with just one item) so needs the list index, in this case 0. 'fields' is found + # in the dict that is in ['data'][0] + if table is None or "fields" not in table: + logger.error("Invalid table passed to table_to_bucket_schema.") + return None -class Prism: - """Base class for interacting with the Workday Prism API. + bucket_schema = { + "schemaVersion": {"id": "Schema_Version=1.0"}, + } - Attributes - ---------- - base_url : str - The URL for the API client + fields = table["fields"] - tenant_name : str - The name of your Workday tenant + # Get rid of any WPA_ fields... + fields[:] = [x for x in fields if "WPA" not in x["name"]] - client_id : str - The Client ID for your registered API client + # Create and assign useAsOperationKey field with true/false values based on externalId value + operation_key_false = {"useAsOperationKey": False} + operation_key_true = {"useAsOperationKey": True} - client_secret : str - The Client Secret for your registered API client + for fld in fields: + if fld["externalId"] is True: + fld.update(operation_key_true) + else: + fld.update(operation_key_false) + + # Now trim our field attributes to keep just what we need - these may + # or may not be in the schema - just make sure. + for fld in fields: + for attr in ["id", "displayName", "fieldId", "required", "externalId"]: + if attr in fld: + del fld[attr] + + # Use the parse options from the schema file if provided, otherwise + # automatically add defaults suitable for most CSV files. + if "parseOptions" in table: + bucket_schema["parseOptions"] = table["parseOptions"] + else: + bucket_schema["parseOptions"] = { + "fieldsDelimitedBy": ",", + "fieldsEnclosedBy": '"', + "headerLinesToIgnore": 1, + "charset": {"id": "Encoding=UTF-8"}, + "type": {"id": "Schema_File_Type=Delimited"}, + } + + # Build the final bucket definition. + bucket_schema["fields"] = fields + + return bucket_schema + + +class Prism: + """Class for interacting with the Workday Prism API. - refresh_token : str - The Refresh Token for your registered API client - version : str - The version of the Prism API to use + Attributes: + base_url (str): URL for the Workday API client + tenant_name (str): Workday tenant name + client_id (str): Client ID for the registered API client + client_secret (str): Client Secret for the registered API client + refresh_token (str): Refresh Token for the Workday user + version (str): Version of the Prism API to use """ - def __init__(self, base_url, tenant_name, client_id, client_secret, refresh_token, version="v2"): + def __init__( + self, base_url, tenant_name, client_id, client_secret, refresh_token, version="v3", + ): """Init the Prism class with required attributes.""" + + # Capture the arguments into the class variables. self.base_url = base_url self.tenant_name = tenant_name self.client_id = client_id self.client_secret = client_secret self.refresh_token = refresh_token - self.token_endpoint = f"{base_url}/ccx/oauth2/{tenant_name}/token" self.version = version + + # Compose the endpoints for authentication and API calls. 
+ self.token_endpoint = f"{base_url}/ccx/oauth2/{tenant_name}/token" self.rest_endpoint = f"{base_url}/ccx/api/{version}/{tenant_name}" - self.prism_endpoint = f"{base_url}/ccx/api/prismAnalytics/{version}/{tenant_name}" + self.prism_endpoint = f"{base_url}/api/prismAnalytics/{version}/{tenant_name}" self.upload_endpoint = f"{base_url}/wday/opa/tenant/{tenant_name}/service/wBuckets" + + # At creation, there cannot yet be a bearer_token obtained from Workday. self.bearer_token = None + """str: Active bearer token for the session.""" + + self.bearer_token_timestamp = None + """time.time: Last bearer token time.""" + + # Helper constants. + self.CONTENT_APP_JSON = {"Content-Type": "application/json"} + self.CONTENT_FORM = {"Content-Type": "application/x-www-form-urlencoded"} + + def http_get(self, url, headers=None, params=None): + """Pass the headers and params to the URL to retrieve + + :param url: + :param headers: + :param params: + :return: + """ + caller = inspect.stack()[1][3] + logger.debug(f"get: called by {caller}") + + if url is None or not isinstance(url, str) or len(url) == 0: + # Create a fake response object for standard error handling. + msg = "get: missing URL" + + response = {"status_code": 600, "text": msg, "errors": [{"error": msg}]} + else: + logger.debug(f"get: {url}") + + # Every request requires an authorization header - make it true. + if headers is None: + headers = {} + + if "Authorization" not in headers: + headers["Authorization"] = "Bearer " + self.get_bearer_token() + + response = requests.get(url, params=params, headers=headers) + log_elapsed(f"get: {caller}", response.elapsed) + + if response.status_code != 200: + logger.error(f"Invalid HTTP status: {response.status_code}") + logger.error(f"Reason: {response.reason}") + logger.error(f"Text: {response.text}") + + return response + + def http_post(self, url, headers=None, data=None, files=None): + caller = inspect.stack()[1][3] + logger.debug(f"post: called by {caller}") + + if url is None or not isinstance(url, str) or len(url) == 0: + # Create a fake response object for standard error handling. + msg = "POST: missing URL" + + response = {"status_code": 600, "text": msg, "errors": [{"error": msg}]} + else: + logger.debug(f"post: {url}") + + # Every request requires an authorization header - make it true. + if headers is None: + headers = {} + + if "Authorization" not in headers and caller != "create_bearer_token": + headers["Authorization"] = "Bearer " + self.get_bearer_token() + + response = requests.post(url, headers=headers, data=data, files=files) + log_elapsed(f"put: {caller}", response.elapsed) + + if response.status_code > 299: + logger.error(response.text) + + return response + + def http_patch(self, url, headers=None, data=None): + caller = inspect.stack()[1][3] + logger.debug(f"patch: called by {caller}") + + if url is None or not isinstance(url, str) or len(url) == 0: + # Create a fake response object for standard error handling. + msg = "PATCH: missing URL" + + response = {"status_code": 600, "text": msg, "errors": [{"error": msg}]} + else: + logger.debug(f"patch: {url}") + + # Every request requires an authorization header - make it true. 
+ if headers is None: + headers = {} + + if "Authorization" not in headers and caller != "create_bearer_token": + headers["Authorization"] = "Bearer " + self.get_bearer_token() + + response = requests.patch(url, headers=headers, data=json.dumps(data)) + log_elapsed(f"patch: {caller}", response.elapsed) + + if response.status_code > 299: + logger.error(response.text) + + return response + + def http_put(self, url, headers=None, data=None): + caller = inspect.stack()[1][3] + logger.debug(f"put: called by {caller}") + + if url is None or not isinstance(url, str) or len(url) == 0: + # Create a fake response object for standard error handling. + msg = "PUT: missing URL" + + response = {"status_code": 600, "text": msg, "errors": [{"error": msg}]} + else: + logger.debug(f"put: {url}") + + # Every request requires an authorization header - make it true. + if headers is None: + headers = {} + + if "Authorization" not in headers and caller != "create_bearer_token": + headers["Authorization"] = "Bearer " + self.get_bearer_token() + + if "Content-Type" not in headers: + headers["Content-Type"] = "application/json" + + response = requests.put(url, headers=headers, data=json.dumps(data)) + log_elapsed(f"put: {caller}", response.elapsed) + + if response.status_code > 299: + logger.error(response.text) + + return response def create_bearer_token(self): """Exchange a refresh token for an access token. @@ -89,7 +389,6 @@ def create_bearer_token(self): class. """ - headers = {"Content-Type": "application/x-www-form-urlencoded"} data = { "grant_type": "refresh_token", @@ -98,325 +397,1076 @@ def create_bearer_token(self): "client_secret": self.client_secret, } - r = requests.post(self.token_endpoint, headers=headers, data=data) - r.raise_for_status() + r = self.http_post(url=self.token_endpoint, headers=self.CONTENT_FORM, data=data) if r.status_code == 200: - logging.info("Successfully obtained bearer token") + logger.debug("successfully obtained bearer token") self.bearer_token = r.json()["access_token"] + self.bearer_token_timestamp = time.time() else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + # Error handling occurred in http_post, fail silently here. + self.bearer_token = None + self.bearer_token_timestamp = None - def create_table(self, table_name, schema): - """Create an empty table of type "API". + def get_bearer_token(self): + """Get the current bearer token, or create a new one + + Note: + If the token doesn't exist, or it's older than 15 minutes create + a new token. + + Returns: + Workday bearer token. + """ + if self.bearer_token is None or (time.time() - self.bearer_token_timestamp) > 900: + self.create_bearer_token() + + if self.bearer_token is None: + return "" # Only return strings + + return self.bearer_token + + def reset_bearer_token(self): + """Reset the current bearer token to none. + + Note: Use this to force getting a new token on the next API call. + """ + self.bearer_token = None + self.bearer_token_timestamp = None + + def tables_get( + self, table_name=None, table_id=None, limit=None, offset=None, type_="summary", search=False, + ): + """Obtain details for all tables or a given table(s). + + Notes + ----- + This method never fails and always returns a valid Dict. Parameters ---------- table_name : str - The table name. The name must be unique and conform to the name - validation rules. - - schema : list - A list of dictionaries containing the schema + The name of the table to obtain details about. If the default value + of None is specified. 
+ table_id : str + The ID of a table to obtain details about. When specified, all tables + are searched for the matching id. + limit : int + The maximum number of tables to be queried, if None all tables are returned. + offset: int + The offset from zero of tables to return. + type_ : str + Level of detail to return. + search : bool + Enable contains searching for table names and display names. Returns ------- - If the request is successful, a dictionary containing information about - the new table is returned. - + dict + For an ID query, return the table information as a dict. For any other + table list query, return a total attribute of the number of tables found and data + attribute containing the list tables. """ - url = self.prism_endpoint + "/datasets" + operation = "/tables" + + if type_ is None or type_.lower() not in ["full", "summary", "permissions"]: + logger.warning("Invalid output type for tables list operation - defaulting to summary.") + output_type = "summary" + else: + output_type = type_.lower() + + # If we got an ID, then do a direct query by ID - no validation, paging + # or searching required. + if table_id is not None: + operation = f"{operation}/{table_id}?format={output_type}" + logger.debug(f"get: {operation}") + url = self.prism_endpoint + operation + + response = self.http_get(url) - headers = { - "Authorization": "Bearer " + self.bearer_token, - "Content-Type": "application/json", + if response.status_code == 200: + # Return the dict object to the caller - note: no + # 'total' or 'data' attributes for this single + # response to match the return from the API call. + return response.json() + else: + return None + + # We are doing a query by attributes other than ID. + logger.debug(f"tables_get: {operation}") + url = self.prism_endpoint + operation + + # Always return a valid JSON object of results regardless of + # errors or API responses. THIS METHOD NEVER FAILS. + return_tables = {"total": 0, "data": []} + + # Start setting up the API call parameters - this is the minimal + # parameters to perform a search. + params = { + "limit": limit if isinstance(limit, int) and limit <= 100 else 20, + "offset": offset if isinstance(limit, int) and offset >= 0 else 0, + "type": output_type, } - data = {"name": table_name, "fields": schema} + # See if we want to add an explicit table name as a search parameter. + if not search and table_name is not None: + # Here, the caller is not searching, they gave us an exact name. + params["name"] = table_name.replace(" ", "_") # Minor clean-up + + # Should only be 0 (not found) or 1 (found) tables found. + params["limit"] = 1 + params["offset"] = 0 + elif search and table_name is not None: + # If the caller asked for a search, open up the limits on + # the GETs for maximum retrieval since we need to look at + # every table to check for matches - a user specified limit + # (if specified) applies as tables are found. + params["limit"] = 100 # Max pagesize to retrieve in the fewest REST calls. + params["offset"] = 0 + elif not search and limit is None: + # The caller asked for all the tables, i.e., no ID, table substring search, + # or limit - open up the limits for maximum retrieval. + search = True + params["limit"] = 100 # Max pagesize to retrieve in the fewest REST calls. + params["offset"] = 0 + + # Assume we are paging the results. + while True: + r = self.http_get(url, params=params) + + if r.status_code != 200: + # Whatever we've captured (perhaps zero tables) so far + # will be returned due to unexpected status code. 
Break + # and do final clean-up on exit. + break + + # Convert the response to a list of tables. + tables = r.json() + + # We are not searching, and we have a specific table - return + # whatever we got - maybe zero if table was not found. + if not search and table_name is not None: # Explicit table name + return tables + + # Figure out what tables of this batch of tables should be part of the + # return results, i.e., search the this batch for matches. + if table_name is not None: + table_lower = table_name.lower() + + # We are searching, do a substring search for matching strings + # anywhere in table names and display names + match_tables = [ + tab + for tab in tables["data"] + if table_lower in tab["name"].lower() or table_lower in tab["displayName"].lower() + ] + else: + # Grab all the tables in the result + match_tables = tables["data"] - r = requests.post(url, headers=headers, data=json.dumps(data)) - r.raise_for_status() + return_tables["data"] += match_tables - if r.status_code == 201: - logging.info("Successfully created an empty API table") - return r.json() - elif r.status_code == 400: - logging.warning(r.json()["errors"][0]["error"]) - else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + # If we get back anything but a full page, we are done + # paging the results. + if len(tables["data"]) < params["limit"]: + break + + if search: + # Move on to the next page. + params["offset"] += params["limit"] + else: + # The caller asked for a specific limit and offset, exit the loop. + break + + # We always return a dict with the total tables found. + return_tables["total"] = len(return_tables["data"]) # Separate step for debugging. + return return_tables + + def tables_post(self, schema): + """Create an empty table of type "API". + + Parameters + ---------- + schema : dict + A dictionary containing the schema + + Returns + ------- + dict + If the request is successful, a dictionary containing information about + the new table is returned, otherwise None. + """ + operation = "/tables" + logger.debug(f"POST : {operation}") + url = self.prism_endpoint + "/tables" + + compact_schema = schema_compact(schema) + + if compact_schema is None: + logger.error("Invalid schema for create operation.") + return None + + response = self.http_post(url=url, headers=self.CONTENT_APP_JSON, data=json.dumps(compact_schema)) - def create_bucket(self, schema, table_id, operation="TruncateandInsert"): - """Create a temporary bucket to upload files. + if response.status_code == 201: + return response.json() + + return None + + def tables_put(self, schema): + """Update an existing table using a full schema definition. + + Notes + ----- + For certain changes, e.g., changing a data type, the table cannot + have any data. Parameters ---------- schema : dict - A dictionary containing the schema for your table. + A dictionary containing the schema + + Returns + ------- + dict + If the request is successful, a dictionary containing information about + the new table is returned, otherwise None. 
+ """ + compact_schema = schema_compact(schema) + + if compact_schema is None: + logger.error("Invalid schema for update operation.") + return None + + table_id = compact_schema["id"] + + operation = f"/tables/{table_id}" + logger.debug(f"PUT: {operation}") + url = self.prism_endpoint + operation + + response = self.http_put(url=url, data=compact_schema) + + if response.status_code == 200: + return response.json() + return None + + def tables_patch(self, table_id, patch): + """Patch the table with specified values. + + Notes + ----- + Patching only changes a short list of table + level attributes. + + Parameters + ---------- table_id : str - The ID of the table that this bucket is to be associated with. + Prism Table ID of an existing table. - operation : str - Required, defaults to "TruncateandInsert" operation - Additional Operations - “Insert”, “Update”, “Upsert”, “Delete” - When you use Update/Upsert/Delete operation you must specify which field to use - as the matching key by setting the ‘useAsOperationKey’ attribute on that field as True. - Only fields marked as ExternalID or WPA_RowID or WPA_LoadId on Table schema can be used - as operation keys during loads into the table. + patch : dict + One or more table attributes to update. Returns ------- - If the request is successful, a dictionary containing information about - the new bucket is returned. + dict + If the request is successful, a dictionary containing information about + the new table is returned, otherwise None. + """ + operation = f"/tables/{table_id}" + logger.debug(f"PATCH: {operation}") + url = self.prism_endpoint + operation + + response = self.http_patch(url=url, headers=self.CONTENT_APP_JSON, data=patch) + + if response.status_code == 200: + return response.json() + + return None + + def buckets_get( + self, + bucket_id=None, + bucket_name=None, + search=False, + limit=None, + offset=None, + type_="summary", + table_id=None, + table_name=None, + ): + """Get a one or more bucket definitions. + + Parameters + ---------- + bucket_id : str + The ID of an existing bucket. + bucket_name : str + The name of an existing bucket. + limit : int + The maximum number of tables to be queried, if None all tables are returned. + offset: int + The offset from zero of tables to return. + type_ : str + Level of detail to return. + table_id : str + List all/any buckets for associated with the table id. + table_name : str + List all/any buckets for associated with the table name. + search : bool + Enable contains searching for bucket names and display names. + Returns + ------- + dict + For an ID query, return the bucket information as a dict. For any other + bucket query, return a total attribute of the number of buckets found and data + attribute containing the list buckets. """ - url = self.prism_endpoint + "/wBuckets" + operation = "/buckets" + + output_type = type_.lower() if type_.lower() in ["full", "summary"] else "summary" + + # If we got an ID, then do a direct query by ID - no paging or + # searching required. 
+ if bucket_id is not None: + operation = f"{operation}/{bucket_id}?format={output_type}" + logger.debug(f"get: {operation}") + url = self.prism_endpoint + operation - headers = { - "Authorization": "Bearer " + self.bearer_token, - "Content-Type": "application/json", + response = self.http_get(url) + + if response.status_code == 200: + return response.json() + else: + return None + + logger.debug(f"get: {operation}") + url = self.prism_endpoint + operation + + # Start the return object - this method NEVER fails + # and always returns a valid dict object. + return_buckets = {"total": 0, "data": []} + + params = { + "limit": limit if limit is not None else 100, + "offset": offset if offset is not None else 0, + "type": output_type, } + if not search and bucket_name is not None: + # List a specific bucket name overrides any other + # combination of search/table/bucket name/wid. + params["name"] = urlparse.quote(bucket_name) + + params["limit"] = 1 # Can ONLY be one matching bucket. + params["offset"] = 0 + else: + # Any other combination of parameters requires a search + # through all the buckets in the data catalog. + search = True + + params["limit"] = 100 # Max pagesize to retrieve in the fewest REST calls. + params["offset"] = 0 + + while True: + r = self.http_get(url, params=params) + + if r.status_code != 200: + # This routine never fails, return whatever we got (if any). + break + + buckets = r.json() + + if not search and bucket_name is not None: # exact bucket name + # We are not searching, and we have a specific bucket, + # return whatever we got with this call even if no buckets + # were found (it will be in the necessary dict structure). + return buckets + + if bucket_name is not None: # We are searching at this point. + # Substring search for matching table names + match_buckets = [ + bck for bck in buckets["data"] if bucket_name in bck["name"] or bucket_name in bck["displayName"] + ] + elif table_id is not None: + match_buckets = [bck for bck in buckets["data"] if table_id == bck["targetDataset"]["id"]] + elif table_name is not None: + # Caller is looking for any/all buckets by target table(s) + match_buckets = [ + bck + for bck in buckets["data"] + if table_name == bck["targetDataset"]["descriptor"] + or (search and table_name.lower() in bck["targetDataset"]["descriptor"].lower()) + ] + else: + # No search in progress, grab all the buckets in this page. + match_buckets = buckets["data"] + + # Add to the results. + return_buckets["data"] += match_buckets + + # If we get back a list of buckets fewer than a full page, we are done + # paging the results. + if len(buckets["data"]) < params["limit"]: + break + + if search: + # Move on to the next page... + params["offset"] += params["limit"] + else: + # The caller asked for a specific limit and offset, exit the loop. + break + + # We always return a valid count of buckets found. + return_buckets["total"] = len(return_buckets["data"]) + + return return_buckets + + def buckets_create( + self, bucket_name=None, target_name=None, target_id=None, schema=None, operation="TruncateAndInsert", + ): + """Create a Prism bucket to upload files. + + Notes + ----- + A table name (without a table id) retrieves the table id. + + Default operation is TruncateAndInsert, valid operations include + “Insert”, “Update”, “Upsert” and “Delete” + + For Update/Upsert/Delete operations, one field in the table must have the + ‘useAsOperationKey’ attribute set to True. 
Only fields marked as ExternalID
+        or WPA_RowID or WPA_LoadId on Table schema can be used as operation keys
+        during loads into the table.
+
+        Parameters
+        ----------
+        bucket_name : str
+            Name of the bucket to create, defaults to a generated name.
+        target_id : str
+            The ID of the table for this bucket.
+        target_name : str
+            The name of the table for the bucket.
+        schema : dict|file
+            A dictionary or JSON file containing the schema fields describing the file.
+        operation : str
+            Required, defaults to the 'TruncateAndInsert' operation.
+
+        Returns
+        -------
+        dict
+            Information about the new bucket, or None if there was a problem.
+        """
+
+        # If the caller didn't give us a name for the new bucket, create a default name.
+        new_bucket_name = bucket_name if bucket_name is not None else buckets_gen_name()
+
+        table_schema = None
+
+        if schema is not None:
+            if isinstance(schema, dict):
+                table_schema = schema  # Use as provided.
+            elif isinstance(schema, str):
+                try:
+                    with open(schema) as schema_file:
+                        table_schema = json.load(schema_file)
+                except Exception as e:
+                    # We don't care what the problem is (missing file, bad JSON).
+                    logger.error(e)
+                    return None
+            else:
+                logger.error("invalid schema - expecting dict or file name.")
+                return None
+
+        if target_id is None and target_name is None:
+            # The caller expects the target table to be identified in the passed dict/file - do a quick sanity check.
+            if table_schema is None:
+                logger.error("schema, target id or target name is required to create a bucket.")
+                return None
+
+            # To create a bucket based on ONLY the schema dict/file, the caller
+            # must have provided the ID of the target table and the fields
+            # expected in the CSV file.
+            if "id" not in table_schema or "fields" not in table_schema:
+                logger.error('schema missing "id" or "fields" attribute.')
+                return None
+        else:
+            # The caller gave us an ID or name of the target table, make sure the table exists.
+            if target_id is not None:
+                # Always use ID if provided - has precedence over name.
+                table = self.tables_get(table_id=target_id, type_="full")  # Full=include fields object
+
+                if table is None:
+                    logger.error(f"table ID {target_id} not found.")
+                    return None
+            else:
+                tables = self.tables_get(table_name=target_name, type_="full")
+
+                if tables["total"] == 0:
+                    logger.error(f"table {target_name} not found for bucket operation.")
+                    return None
+
+                table = tables["data"][0]
+
+            # If the caller DIDN'T provide a schema dict/file, use the table
+            # we just found to supply the ID and fields for the bucket.
+            if table_schema is None:
+                table_schema = table
+            else:
+                # Use everything from the schema dict/file, but set/overwrite the ID
+                # to the target table we just looked up.
+                table_schema["id"] = table["id"]
+
+        # Regardless of how we got the table definition, reduce the definition
+        # to remove extraneous attributes for a bucket operation.
+ compact_schema = schema_compact(table_schema) + + if compact_schema is None: + logger.error("Invalid schema for bucket operation.") + return None + + bucket_schema = table_to_bucket_schema(compact_schema) + + logger.debug("post: /buckets") + url = self.prism_endpoint + "/buckets" + data = { - "name": "prism_python_wbucket_" + str(random.randint(1000000, 9999999)), + "name": new_bucket_name, "operation": {"id": "Operation_Type=" + operation}, - "targetDataset": {"id": table_id}, - "schema": schema, + "targetDataset": {"id": table_schema["id"]}, + "schema": bucket_schema, } - r = requests.post(url, headers=headers, data=json.dumps(data)) - r.raise_for_status() + response = self.http_post(url, headers=self.CONTENT_APP_JSON, data=json.dumps(data)) + + if response.status_code == 201: + response_json = response.json() + + logger.debug(f'successfully created a new wBucket: {response_json["id"]}') + return response_json + + return None + + def buckets_complete(self, bucket_id): + """ + Commit the data contained in the bucket to the associated table. + + Parameters + ---------- + bucket_id : str + The ID of an existing bucket with a "New" status. + + Returns + ------- + dict + Information about the completed bucket, or None if there was a problem. + """ + operation = f"/buckets/{bucket_id}/complete" + logger.debug(f"post: {operation}") + url = self.prism_endpoint + operation + + r = self.http_post(url) if r.status_code == 201: - logging.info("Successfully created a new wBucket") + logger.debug(f"successfully completed wBucket {bucket_id}.") return r.json() elif r.status_code == 400: - logging.warning(r.json()["errors"][0]["error"]) - else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + # This is an error coming back from the API call and + # is actually valid JSON with an "error" attribute. + logger.debug("non-fatal error completing bucket") + return r.json() + + return None - def upload_file_to_bucket(self, bucket_id, filename): + def buckets_files(self, bucket_id, file=None): """Upload a file to a given bucket. + Notes + ----- + The file may be a single file or a list of files having + and extension of .CSV or .CSV.GZ (lowercase). + + When a .CSV file is encountered, automatically GZIP before + uploading. + Parameters ---------- bucket_id : str - The ID of the bucket that the file should be added to. + Upload the file to the bucket identified by ID. - filename : str - The path to your file to upload to the bucket. The file must be - gzip compressed delimited and the file must conform to the file - size limits. + file : str | list(str) + The file(s) to upload to the bucket. Each file must conform + to the file size limits. Returns ------- - None - + Upload information or None if there was a problem. When uploading + multiple files, an array of upload information with information for + each file. """ - url = self.upload_endpoint + "/" + bucket_id + "/files" + operation = f"/buckets/{bucket_id}/files" + logger.debug(f"post: {operation}") + url = self.prism_endpoint + operation + + results = { + "total": 0, + "data": [], + } # Always return a valid list - regardless of files + + if file is None: + # It is legal to upload an empty file - see the table truncate command. + target_files = [None] # Provide one empty file to iterate over. 
+ else: + target_files = resolve_file_list(file) - headers = {"Authorization": "Bearer " + self.bearer_token} + target_file: str + for target_file in target_files: + if target_file is None: + new_file = {"file": ("empty.csv.gz", gzip.compress(bytearray()))} + elif target_file.lower().endswith(".csv.gz"): + new_file = {"file": open(target_file, "rb")} + elif target_file.lower().endswith(".csv"): + upload_filename = os.path.basename(target_file) + upload_filename += ".gz" - files = {"file": open(filename, "rb")} + # Buckets can only load gzip files - do it. + with open(target_file, "rb") as in_file: + new_file = {"file": (upload_filename, gzip.compress(in_file.read()))} - r = requests.post(url, headers=headers, files=files) - r.raise_for_status() + response = self.http_post(url, files=new_file) - if r.status_code == 200: - logging.info("Successfully uploaded file to the bucket") - else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + if response.status_code == 201: + logger.debug(f"successfully uploaded {target_file} to the bucket") - def complete_bucket(self, bucket_id): - """Finalize the bucket once all files have been added. + results["data"].append(response.json()) # Add this file's info to the return list + + results["total"] = len(results["data"]) + return results + + def buckets_errorFile(self, bucket_id): + """Get a list of all rows that failed to load into the table Parameters ---------- bucket_id : str - The ID of the bucket to be marked as complete. + A reference to a Prism Analytics bucket. Returns ------- - None + str + """ + + if bucket_id is None: + logger.error("bucket id is required.") + return None + + operation = f"/buckets/{bucket_id}/errorFile" + logger.debug(f"post: {operation}") + url = self.prism_endpoint + operation + + response = self.http_get(url) + + if response.status_code == 200: + return response.text + + return None + + def dataChanges_get( + self, datachange_name=None, datachange_id=None, limit=None, offset=None, type_="summary", search=False, + ): + """ """ + # We are doing a dataChanges GET operation. + operation = "/dataChanges" + + # Make sure output type is valid. + output_type = type_.lower() if type_.lower() in ["summary", "full"] else "summary" + + # Searching by ID is a special case that eliminates all other types + # of search. Ask for the datachange by id and return just this + # result - even blank. + if datachange_id is not None and isinstance(datachange_id, str) and len(datachange_id) > 0: + operation = f"{operation}/{datachange_id}?type={output_type}" + logger.debug(f"dataChanges_get: {operation}") + url = self.prism_endpoint + operation + + response = self.http_get(url) + + if response.status_code == 200: + return response.json() + + return None + + logger.debug(f"dataChanges_get: {operation}") + url = self.prism_endpoint + operation + # Get a list of tasks by page, with or without searching. + + search_limit = 500 # Assume all DCTs should be returned - max API limit + search_offset = 0 # API default value + + if limit is not None and isinstance(limit, int) and limit > 0: + search_limit = limit + + if offset is not None and isinstance(offset, int) and offset > 0: + search_offset = offset + + searching = False + name_param = "" + + if datachange_name is not None and isinstance(datachange_name, str) and len(datachange_name) > 0: + if search is not None and isinstance(search, bool) and search: + # Force a return of ALL data change tasks, so we can search the names. 
+ name_param = "" # Added to the query params + searching = True + + search_limit = 500 + search_offset = 0 + else: + # With an explicit name, we should return at most 1 result. + name_param = f"&name={urlparse.quote(datachange_name)}" + searching = False + + search_limit = 1 + search_offset = 0 + + # Assume we will be looping based on limit and offset values; however, we may + # execute only once. NOTE: this routine NEVER fails, but may return zero + # data change tasks. + + data_changes = {"total": 0, "data": []} + + while True: + search_url = f"{url}?type={output_type}&limit={search_limit}&offset={search_offset}{name_param}" + + response = self.http_get(url=search_url) + + if response.status_code != 200: + break + + return_json = response.json() + + if searching: + # Only add matching rows - check name and displayName + data_changes["data"] += filter( + lambda dtc: dtc["name"].lower().find(datachange_name.lower()) != -1 + or dtc["displayName"].lower().find(datachange_name.lower()) != -1, + return_json["data"], + ) + else: + # Without searching, simply paste the current page to the list. + data_changes["data"] += return_json["data"] + break + + # If we didn't get a full page, then we are done. + if len(return_json["data"]) < search_limit: + break + + # Go to the next page. + search_offset += search_limit + + data_changes["total"] = len(data_changes["data"]) + + return data_changes + + def dataChanges_activities_get(self, datachange_id, activity_id): + """Returns details of the activity specified by activityID. + + Parameters + ---------- + datachange_id : str + A reference to a Prism Analytics data change. + activity_id : str + A reference to a Prism Analytics activity. """ - url = self.prism_endpoint + "/wBuckets/" + bucket_id + "/complete" + operation = f"/dataChanges/{datachange_id}/activities/{activity_id}" + logger.debug(f"dataChanges_activities_get: {operation}") + url = self.prism_endpoint + operation - headers = { - "Authorization": "Bearer " + self.bearer_token, - "Content-Type": "application/json", - } + r = self.http_get(url) + + if r.status_code == 200: + return r.json() + + return None + + def dataChanges_activities_post(self, datachange_id, filecontainer_id=None): + """Execute a data change task. + + Parameters + ---------- + datachange_id : str + A reference to a Prism Analytics data change. + filecontainer_id : str + A reference to a Prism Analytics File Container. + + Returns + ------- + """ + operation = f"/dataChanges/{datachange_id}/activities" + logger.debug(f"post: {operation}") + url = self.prism_endpoint + operation + + if filecontainer_id is None: + logger.debug("no file container ID") + data = None + else: + logger.debug("with file container ID: {filecontainer_id}") - data = {} + # NOTE: the name is NOT correct based on the API definition + data = json.dumps({"fileContainerWid": filecontainer_id}) - r = requests.post(url, headers=headers, data=json.dumps(data)) - r.raise_for_status() + r = self.http_post(url, headers=self.CONTENT_APP_JSON, data=data) if r.status_code == 201: - logging.info("Successfully completed the bucket") + return_json = r.json() + activity_id = return_json["id"] + + logger.debug(f"successfully started data load task - id: {activity_id}") + return return_json elif r.status_code == 400: - logging.warning(r.json()["errors"][0]["error"]) - else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + logger.error("error running data change task.") + return r.json() # This is still valid JSON with the error. 
+
+        return None

-    def list_bucket(self, bucket_id=None):
-        """Obtain details for all buckets or a given bucket.
+    def dataChanges_is_valid(self, datachange_id):
+        """Utility method to return the validation status of a data change task.

         Parameters
         ----------
-        bucket_id : str
-            The ID of the bucket to obtain details about. If the default value
-            of None is specified, details regarding all buckets is returned.
+        datachange_id : str
+            A reference to a Prism Analytics data change.

         Returns
         -------
-        If the request is successful, a dictionary containing information about
-        the bucket is returned.
-
+        bool
+            True if the data change task is valid, or False if the task does not
+            exist or is not valid.
         """
-        url = self.prism_endpoint + "/wBuckets"
+        dct = self.dataChanges_validate(datachange_id)

-        if bucket_id is not None:
-            url = url + "/" + bucket_id
+        if dct is None:
+            logger.error(f"data_change_id {datachange_id} not found!")
+            return False

-        headers = {"Authorization": "Bearer " + self.bearer_token}
+        if "error" in dct:
+            logger.error(f"data_change_id {datachange_id} is not valid!")
+            return False

-        r = requests.get(url, headers=headers)
-        r.raise_for_status()
+        # There is no specific status value to check, we simply get
+        # a small JSON object with the ID of the DCT if it is valid.
+        return True

-        if r.status_code == 200:
-            logging.info("Successfully obtained information about your buckets")
-            return r.json()
-        else:
-            logging.warning(f"HTTP status code {r.status_code}: {r.content}")
-
-    def list_table(self, table_name=None):
-        """Obtain details for all tables or a given table.
+    def dataChanges_validate(self, datachange_id):
+        """Validate the data change task specified by datachange_id.

         Parameters
         ----------
-        table_name : str
-            The name of the table to obtain details about. If the default value
-            of None is specified, details regarding first 100 tables is returned.
+        datachange_id : str
+            The data change task ID to validate.

         Returns
         -------
-        If the request is successful, a dictionary containing information about
-        the table is returned.
-
+        dict
+            The validation response returned by the API, or None if the
+            request failed.
         """
-        url = self.prism_endpoint + "/datasets?"
+        operation = f"/dataChanges/{datachange_id}/validate"
+        logger.debug(f"dataChanges_validate: get {operation}")
+        url = self.prism_endpoint + operation
+
+        r = self.http_get(url)

-        if table_name is not None:
-            url = url + "name=" + table_name
+        if r.status_code in [200, 400, 404]:
+            # For these status codes, simply return what we got.
+            return r.json()

-        params = {"limit": 100}
+        return None

-        headers = {"Authorization": "Bearer " + self.bearer_token}
+    def dataExport_get(self, limit=None, offset=None, type_=None):
+        operation = "/dataExport"
+        logger.debug(f"dataExport_get: get {operation}")
+        url = self.prism_endpoint + operation

-        r = requests.get(url, params=params, headers=headers)
-        r.raise_for_status()
+        r = self.http_get(url)

         if r.status_code == 200:
-            logging.info("Successfully obtained information about your tables")
             return r.json()
-        else:
-            logging.warning(f"HTTP status code {r.status_code}: {r.content}")

-    def describe_table(self, table_id=None):
-        """Obtain details for for a given table
+        return None
+
+    def fileContainers_create(self):
+        """Create a new file container.
+
+        Returns
+        -------
+        Dict object with an "id" attribute or None if there was a problem.
+ """ + operation = "/fileContainers" + logger.debug(f"fileContainer_create: post {operation}") + url = self.prism_endpoint + operation + + r = self.http_post(url) + + if r.status_code == 201: + return_json = r.json() + + filecontainer_id = return_json["id"] + logger.debug(f"successfully created file container: {filecontainer_id}") + + return return_json + + return None + + def fileContainers_get(self, filecontainer_id): + """Return all files for a file container. Parameters ---------- - table_id : str - The ID of the table to obtain details about. If the default value - of None is specified, details regarding all tables is returned. + filecontainer_id : str + File container ID to list. Returns ------- - If the request is successful, a dictionary containing information about - the table is returned. - + Dictionary of found files having a 'total' attribute with the count + of files uploaded and a data attribute with an array of file metadata + for each file in the container. """ - url = self.prism_endpoint + "/datasets/" + operation = f"/fileContainers/{filecontainer_id}/files" + logger.debug(f"fileContainers_list: get {operation}") + url = self.prism_endpoint + operation - if table_id is not None: - url = url + table_id + "/describe" + response = self.http_get(url) - headers = {"Authorization": "Bearer " + self.bearer_token} + if response.status_code == 200: + return_json = response.json() - r = requests.get(url, headers=headers) - r.raise_for_status() + return {"total": len(return_json), "data": return_json} - if r.status_code == 200: - logging.info("Successfully obtained information about your datasets") - return r.json() - else: - logging.warning(f"HTTP status code {r.status_code}: {r.content}") + if response.status_code == 404: + logger.warning("verify: Self-Service: Prism File Container domain in the Prism Analytics functional area.") + + return {"total": 0, "data": []} # Always return a list. - def convert_describe_schema_to_bucket_schema(self, describe_schema): - """Convert schema (derived from describe table) to bucket schema + def fileContainers_load(self, filecontainer_id, file): + """ + Load one or more files to a fileContainer. Parameters ---------- - describe_schema: dict - A dictionary containing the describe schema for your dataset. + filecontainer_id : str + File container ID of target container. + file : str|list + File name(s) to load into the container Returns ------- - If the request is successful, a dictionary containing the bucket schema is returned. - The results can then be passed to the create_bucket function - + For a single file, the upload results are returned as a + dict. For multiple files, an array of results is returned. """ - # describe_schema is a python dict object and needs to be accessed as such, 'data' is the top level object, - # but this is itself a list (with just one item) so needs the list index, in this case 0. 'fields' is found - # in the dict that is in ['data'][0] - fields = describe_schema["data"][0]["fields"] + # Create the specified fID - a new ID is created if None. + resolved_fid = filecontainer_id # No testing here, just use it. 
- # Create and assign useAsOperationKey field with true/false values based on externalId value - operation_key_false = {"useAsOperationKey": False} - operation_key_true = {"useAsOperationKey": True} + target_files = resolve_file_list(file) - for i in fields: - if i["externalId"] is True: - i.update(operation_key_true) - else: - i.update(operation_key_false) - - # Now trim our fields data to keep just what we need - for i in fields: - del i["id"] - del i["displayName"] - del i["fieldId"] - del i["required"] - del i["externalId"] - - # Get rid of the WPA_ fields... - fields[:] = [x for x in fields if "WPA" not in x["name"]] - - # The "header" for the load schema - bucket_schema = { - "parseOptions": { - "fieldsDelimitedBy": ",", - "fieldsEnclosedBy": '"', - "headerLinesToIgnore": 1, - "charset": {"id": "Encoding=UTF-8"}, - "type": {"id": "Schema_File_Type=Delimited"}, - } - } + results = {"id": None, "total": 0, "data": []} + + for target_file in target_files: + # It is legal to upload an empty file - see the table truncate method. + if target_file is None: + new_file = {"file": ("empty.csv.gz", gzip.compress(bytearray()))} + elif target_file.lower().endswith(".csv.gz"): + new_file = {"file": open(target_file, "rb")} + elif target_file.lower().endswith(".csv"): + upload_filename = os.path.basename(target_file) + upload_filename += ".gz" + + with open(target_file, "rb") as in_file: + new_file = {"file": (upload_filename, gzip.compress(in_file.read()))} + + # Create the file container and get the ID. We use the + # file container ID to load the file and then return the + # value to the caller for use in a data change call. + + if resolved_fid is None: + # The caller is asking us to create a new container. + file_container_response = self.fileContainers_create() + + if file_container_response is None: + logger.error("Unable to create fileContainer") + return None + + resolved_fid = file_container_response["id"] - # The footer for the load schema - schema_version = {"id": "Schema_Version=1.0"} + results["id"] = resolved_fid - bucket_schema["fields"] = fields - bucket_schema["schemaVersion"] = schema_version + logger.debug(f"resolved fID: {resolved_fid}") - return bucket_schema + # We have our container, load the file + operation = f"/fileContainers/{resolved_fid}/files" + logger.debug(f"fileContainer_load: POST {operation}") + url = self.prism_endpoint + operation -def create_table(p, table_name, schema): - """Create a new Prism table. + response = self.http_post(url, files=new_file) + + if response.status_code == 201: + logger.debug(f"successfully loaded file: {file}") + results["data"].append(response.json()) + + results["total"] = len(results["data"]) + + return results + + +def resolve_file_list(files): + """Evaluate file name(s)s and return the list of supported files. + + Parameters + ---------- + files : str|list + One (str) or more (list) file names. + + Returns + ------- + list + List of files that can be uploaded. + """ + # At a minimum, an empty list will always be returned. 
+ target_files = [] + + if files is None: + logger.warning("File(s) must be specified.") + return target_files + elif isinstance(files, list) and len(files) == 0: + logger.warning("File(s) must be specified.") + return target_files + elif isinstance(files, tuple) and len(files) == 0: + logger.warning("File(s) must be specified.") + return target_files + elif isinstance(files, str): + if not files: + logger.warning("File(s) must be specified.") + return target_files + else: + files = [files] + + # Check the extension of each file in the list. + for f in files: + if not os.path.exists(f): + logger.warning(f"File {f} not found - skipping.") + continue + + if f.lower().endswith(".csv") or f.lower().endswith(".csv.gz"): + target_files.append(f) + else: + logger.warning(f"File {f} is not a .csv.gz or .csv file - skipping.") + + return target_files + + +def tables_create( + p, table_name=None, display_name=None, enable_for_analysis=True, source_name=None, source_wid=None, file=None +): + """Create Prism table Parameters ---------- @@ -424,11 +1474,22 @@ def create_table(p, table_name, schema): Instantiated Prism class from prism.Prism() table_name : str - The name of the table to obtain details about. If the default value - of None is specified, details regarding first 100 tables is returned. + Table name - overrides name from schema + + display_name : str + Specify a display name - defaults to name + + enableForAnalysis : boolean + Enable this table for analytic - schema : list - A list of dictionaries containing the schema + sourceName : str + The API name of an existing table to copy + + sourceWID : str + The WID of an existing table to copy + + file : str + File containing the schema to be used to create the table Returns ------- @@ -436,26 +1497,66 @@ def create_table(p, table_name, schema): the table is returned. """ - p.create_bearer_token() - table = p.create_table(table_name, schema=schema) + # We can assume a schema was found/built - get_schema sys.exits if there is a problem. + schema = load_schema(p, file, source_name, source_wid) + + # Initialize a new schema with the particulars for this table operation. + if table_name is not None: + + # If we got a name, set it in the table schema + schema["name"] = table_name.replace(" ", "_") # Minor clean-up + + # Force the display name - there cannot be duplicate displayNames + # in the data catalog. + schema["displayName"] = table_name + logger.debug(f'setting table name to {schema["name"]}') + + elif "name" not in schema: + # The schema doesn't have a name and none was given - exit. + # Note: this could be true if we have a schema of only fields. + logger.error("Table --table_name must be specified.") + sys.exit(1) + + if display_name is not None: + # If we got a display name, set it in the schema + schema["displayName"] = display_name + + elif "displayName" not in schema: + # Default the display name to the name if not in the schema. + schema["displayName"] = table_name + logger.debug(f'defaulting displayName to {schema["displayName"]}') + + if enable_for_analysis is not None: + schema["enableForAnalysis"] = enable_for_analysis + + elif "enableForAnalysis" not in schema: + # Default to False - do not enable. + schema["enableForAnalysis"] = False + logger.debug("defaulting enableForAnalysis to False.") + + # create the table + table = p.tables_post(schema) return table -def upload_file(p, filename, table_id, operation="TruncateandInsert"): - """Create a new Prism table. 
+def upload_file(p, file, table_id=None, table_name=None, operation="TruncateAndInsert"):
+    """Upload a file to an existing Prism table.

     Parameters
     ----------
     p : Prism
         Instantiated Prism class from prism.Prism()

-    filename : str
-        The path to you GZIP compressed file to upload.
+    file : str | list
+        The path to CSV or GZIP compressed file(s) to upload.

     table_id : str
         The ID of the Prism table to upload your file to.

+    table_name : str
+        The API name of the Prism table to upload your file to.
+
     operation : str (default = TruncateandInsert)
         The table load operation.
         Possible options include TruncateandInsert, Insert, Update, Upsert, Delete.
@@ -466,9 +1567,97 @@
         the table is returned.

     """
-    p.create_bearer_token()
-    details = p.describe_table(table_id)
-    bucket_schema = p.convert_describe_schema_to_bucket_schema(details)
-    bucket = p.create_bucket(bucket_schema, table_id, operation=operation)
-    p.upload_file_to_bucket(bucket["id"], filename)
-    p.complete_bucket(bucket["id"])
+    bucket = p.buckets_create(target_id=table_id, target_name=table_name, operation=operation)
+
+    if bucket is None:
+        return None
+
+    file_results = p.buckets_files(bucket["id"], file)
+
+    if file_results["total"] > 0:
+        results = p.buckets_complete(bucket["id"])
+
+        # Add the file upload results to the bucket
+        # info returned to the caller.
+        results["files"] = file_results
+        results["bucket"] = bucket  # Ensure bucket info is present.
+
+        return results
+    else:
+        return file_results
+
+
+def truncate_table(p, table_id=None, table_name=None):
+    # To do a truncate, we still need a bucket with a truncate operation.
+    if table_id is not None:
+        bucket = p.buckets_create(target_id=table_id, operation="TruncateAndInsert")
+    else:
+        bucket = p.buckets_create(target_name=table_name, operation="TruncateAndInsert")
+
+    if bucket is None:
+        logger.error("Unable to truncate table - see log for details.")
+        return None
+
+    bucket_id = bucket["id"]
+
+    # Calling buckets_files without a file puts a zero-sized file into the bucket.
+    p.buckets_files(bucket_id)
+
+    # Ask Prism to run the delete statement by completing the bucket.
+    bucket = p.buckets_complete(bucket_id)
+
+    return bucket
+
+
+def load_schema(p=None, file=None, source_name=None, source_id=None):
+    """Get or extract a schema from a file or existing Prism table."""

+    # Start with a blank schema definition.
+    schema = {}
+
+    # A file always takes precedence over sourceName and sourceWID
+    # options, and must be a valid schema.
+
+    if file is not None:
+        if not os.path.isfile(file):
+            logger.error("File not found.")
+            return None
+
+        # Expect a JSON schema definition in the file.
+        try:
+            with open(file) as json_file:
+                schema = json.load(json_file)
+
+                if isinstance(schema, list):
+                    # Convert a list of fields into a basic schema.
+                    schema = {"fields": schema}
+                else:
+                    # This should be a full schema, perhaps from a table list command.
+                    if "name" not in schema and "fields" not in schema:
+                        logger.error("Invalid schema - name and fields attribute not found.")
+                        return None
+        except Exception as e:
+            logger.error(e)
+            return None
+    else:
+        # No file was specified, check for a Prism source table.
+        if source_name is None and source_id is None:
+            logger.error("No schema file provided and a table (--sourceName or --sourceId) not specified.")
+            return None
+
+        if source_id is not None:
+            schema = p.tables_get(table_id=source_id, type_="full")  # Exact match on WID - and get the fields (full)
+
+            if schema is None:
+                logger.error(f"Invalid --sourceId {source_id} : table not found.")
+                return None
+        else:
+            tables = p.tables_get(table_name=source_name, type_="full")  # Exact match on API Name
+
+            if tables["total"] == 0:
+                logger.error(f"Invalid --sourceName {source_name} : table not found.")
+                return None
+
+            schema = tables["data"][0]
+
+    return schema
diff --git a/setup.cfg b/setup.cfg
index 897f8d4..81dd7b5 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -4,6 +4,7 @@
 # resulting files.
 
 [flake8]
+ignore = E203,W503
 max-line-length = 120
 exclude = versioneer.py
 
diff --git a/setup.py b/setup.py
index e2ed2a1..1691b59 100644
--- a/setup.py
+++ b/setup.py
@@ -8,14 +8,14 @@
     name="prism",
     version=versioneer.get_version(),
     description="Python API client to load data into Prism.",
-    author="Curtis Hampton",
+    author="Curtis Hampton, Mark Waldron, Jacinta Corbett, Mark Greynolds",
     author_email="CurtLHampton@gmail.com",
     url="https://github.com/Workday/prism-python",
-    packages=["prism"],
+    packages=["prism", "prism.commands"],
     package_data={"prism": ["data/*"]},
-    entry_points={"console_scripts": ["prism=prism.cli:main"]},
+    entry_points={"console_scripts": ["prism=prism.cli:cli"]},
     install_requires=requirements,
     extras_require={"dev": ["pytest"]},
     keywords="prism",
-    classifiers=["Programming Language :: Python :: 3.6", "Programming Language :: Python :: 3.7"],
+    classifiers=["Programming Language :: Python :: 3.9"],
 )
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 49420a6..aa2ee5d 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -1,8 +1,8 @@
 from click.testing import CliRunner
 
-from prism.cli import main
+from prism.cli import cli
 
 
 def test_cli():
     runner = CliRunner()
-    result = runner.invoke(main, ["--help"])
+    result = runner.invoke(cli, ["--help"])
     assert result.exit_code == 0
diff --git a/tests/test_prism.py b/tests/test_prism.py
index d9793f9..80390af 100644
--- a/tests/test_prism.py
+++ b/tests/test_prism.py
@@ -2,5 +2,5 @@
 
 
 def test_load_schema(schema_file):
-    schema = prism.load_schema(schema_file)
+    schema = prism.load_schema(file=schema_file)
     assert type(schema) is dict
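The new data change and file container methods added above are easiest to follow end to end: stage a file in a file container, then start and poll a data change task. The sketch below is a minimal, unofficial example of that flow using only methods introduced in this diff; the task name ("Example_DCT"), the CSV path, and the assumption that returned objects carry an "id" attribute are placeholders, and the environment variables are expected to be set as elsewhere in the README.

```python
import os
import prism

# Optional: raise the client log level using the new set_logging helper.
prism.set_logging(log_level="INFO")

# Instantiate the v3 client from environment variables.
p = prism.Prism(
    os.getenv("workday_base_url"),
    os.getenv("workday_tenant_name"),
    os.getenv("prism_client_id"),
    os.getenv("prism_client_secret"),
    os.getenv("prism_refresh_token"),
)

# Find a data change task by name - "Example_DCT" is a placeholder.
dcts = p.dataChanges_get(datachange_name="Example_DCT", search=True)

if dcts["total"] == 0:
    raise SystemExit("Data change task not found.")

dct_id = dcts["data"][0]["id"]

# Only run the task if Prism reports it as valid.
if p.dataChanges_is_valid(dct_id):
    # Stage the input file in a new file container ("/path/to/data.csv" is a placeholder).
    container = p.fileContainers_load(filecontainer_id=None, file="/path/to/data.csv")

    # Start the data change task against the uploaded file and check its activity.
    activity = p.dataChanges_activities_post(dct_id, filecontainer_id=container["id"])

    if activity is not None and "id" in activity:
        status = p.dataChanges_activities_get(dct_id, activity["id"])
        print(status)
```

A similar staging pattern underlies the new `truncate_table` helper, which creates a TruncateAndInsert bucket, uploads an empty file, and completes the bucket.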