From 3d088db06d68bc92965a5c4e6fda3f8d253f37ca Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 26 May 2022 18:02:45 +0800 Subject: [PATCH 01/17] init support for Data Maintenance Signed-off-by: Allen Xu --- nds/README.md | 1 + nds/convert_submit_cpu.template | 10 +- nds/nds_gen_data.py | 30 ++- nds/nds_transcode.py | 173 ++++++++++++++++-- .../java/org/notmysock/tpcds/GenTable.java | 26 ++- 5 files changed, 218 insertions(+), 22 deletions(-) diff --git a/nds/README.md b/nds/README.md index 2f7adc5..7619c9c 100644 --- a/nds/README.md +++ b/nds/README.md @@ -72,6 +72,7 @@ optional arguments: --overwrite_output overwrite if there has already existing data in the path provided. --replication REPLICATION the number of replication factor when generating data to HDFS. if not set, the Hadoop job will use the setting in the Hadoop cluster. + --update UPDATE generate update dataset . is identical to the number of streams used in the Throughput Tests of the benchmark ``` Example command: diff --git a/nds/convert_submit_cpu.template b/nds/convert_submit_cpu.template index a487da4..1427e92 100644 --- a/nds/convert_submit_cpu.template +++ b/nds/convert_submit_cpu.template @@ -21,5 +21,13 @@ export SPARK_CONF=("--master" "yarn" "--num-executors" "8" "--executor-memory" "40G" "--executor-cores" "12" - "--conf" "spark.task.cpus=1") + "--conf" "spark.task.cpus=1" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1" + "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" + "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" + "--conf" "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog" + "--conf" "spark.sql.catalog.local.type=hadoop" + "--conf" "spark.sql.catalog.local.warehouse=$PWD/local-warehouse" + "--conf" "spark.sql.catalog.spark_catalog.warehouse=$PWD/spark-warehouse") diff --git a/nds/nds_gen_data.py b/nds/nds_gen_data.py index fde7e2c..fbcdcb5 100644 --- a/nds/nds_gen_data.py +++ b/nds/nds_gen_data.py @@ -38,7 +38,7 @@ check_version() -table_names = [ +source_table_names = [ 'call_center', 'catalog_page', 'catalog_returns', @@ -66,6 +66,17 @@ 'web_site', ] +maintenance_table_names = [ + 's_catalog_order', + 's_catalog_order_lineitem', + 's_catalog_returns', + 's_inventory', + 's_purchase', + 's_purchase_lineitem', + 's_store_returns', + 's_web_order', + 's_web_order_lineitem' +] def clean_temp_data(temp_data_path): cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path] @@ -73,7 +84,7 @@ def clean_temp_data(temp_data_path): subprocess.run(cmd) -def merge_temp_tables(temp_data_path, parent_data_path): +def merge_temp_tables(temp_data_path, parent_data_path, update): """Helper functions for incremental data generation. Move data in temporary child range path to parent directory. @@ -81,6 +92,10 @@ def merge_temp_tables(temp_data_path, parent_data_path): temp_data_path (str): temorary child range data path parent_data_path (str): parent data path """ + if update: + table_names = maintenance_table_names + else: + table_names = source_table_names for table_name in table_names: # manually create table sub-folders # redundent step if it's not the first range part. @@ -123,6 +138,8 @@ def generate_data_hdfs(args, jar_path): tpcds_gen_path = jar_path.parent.parent.absolute() if args.overwrite_output: cmd += ['-o'] + if args.update: + cmd += ["-u", args.update] if args.range: # use a temp folder to save the specific range data. 
# will move the content to parent folder afterwards. @@ -179,6 +196,8 @@ def generate_data_local(args, range_start, range_end, tool_path): "-verbose", "Y"] if args.overwrite_output: dsdgen_args += ["-force", "Y"] + if args.update: + dsdgen_args += ["-update", args.update] procs.append(subprocess.Popen( ["./dsdgen"] + dsdgen_args, cwd=str(work_dir))) # wait for data generation to complete @@ -188,6 +207,10 @@ def generate_data_local(args, range_start, range_end, tool_path): print("dsdgen failed with return code {}".format(p.returncode)) raise Exception("dsdgen failed") # move multi-partition files into table folders + if args.update: + table_names = maintenance_table_names + else: + table_names = source_table_names for table in table_names: print('mkdir -p {}/{}'.format(data_dir, table)) subprocess.run(['mkdir', '-p', data_dir + '/' + table]) @@ -235,6 +258,9 @@ def generate_data(args): parser.add_argument("--replication", help="the number of replication factor when generating data to HDFS. " + "if not set, the Hadoop job will use the setting in the Hadoop cluster.") + parser.add_argument("--update", + help="generate update dataset . is identical to the number of " + + "streams used in the Throughput Tests of the benchmark") args = parser.parse_args() diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 80f47fd..91bad5a 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -570,6 +570,123 @@ def get_schemas(use_decimal): ]) return SCHEMAS +def get_maintenance_schemas(use_decimal): + MAINTENANCE_SCHEMAS = {} + MAINTENANCE_SCHEMAS["s_purchase_lineitem"] = StructType([ + StructField("plin_purchase_id", IntegerType(), nullable=False), + StructField("plin_line_number", IntegerType(), nullable=False), + StructField("plin_item_id", CharType(16)), + StructField("plin_promotion_id", CharType(16)), + StructField("plin_quantity", IntegerType()), + StructField("plin_sale_price", decimalType(use_decimal, 7,2)), + StructField("plin_coupon_amt", decimalType(use_decimal, 7,2)), + StructField("plin_comment", VarcharType(100)), + ]) + MAINTENANCE_SCHEMAS["s_purchase"] = StructType([ + StructField("purc_purchase_id", IntegerType(), nullable=False), + StructField("purc_store_id", CharType(16)), + StructField("purc_customer_id", CharType(16)), + StructField("purc_purchase_date", CharType(10)), + StructField("purc_purchase_time", IntegerType()), + StructField("purc_register_id", IntegerType()), + StructField("purc_clerk_id", IntegerType()), + StructField("purc_comment", CharType(100)), + ]) + MAINTENANCE_SCHEMAS["s_catalog_order"] = StructType([ + StructField("cord_order_id", IntegerType(), nullable=False), + StructField("cord_bill_customer_id", CharType(16)), + StructField("cord_ship_customer_id", CharType(16)), + StructField("cord_order_date", CharType(10)), + StructField("cord_order_time", IntegerType()), + StructField("cord_ship_mode_id", CharType(16)), + StructField("cord_call_center_id", CharType(16)), + StructField("cord_order_comments", VarcharType(100)), + ]) + MAINTENANCE_SCHEMAS["s_web_order"] = StructType([ + StructField("word_order_id", IntegerType(), nullable=False), + StructField("word_bill_customer_id", CharType(16)), + StructField("word_ship_customer_id", CharType(16)), + StructField("word_order_date", CharType(10)), + StructField("word_order_time", IntegerType()), + StructField("word_ship_mode_id", CharType(16)), + StructField("word_web_site_id", CharType(16)), + StructField("word_order_comments", CharType(100)), + ]) + MAINTENANCE_SCHEMAS["s_catalog_order_lineitem"] = 
StructType([ + StructField("clin_order_id", IntegerType(), nullable=False), + StructField("clin_line_number", IntegerType(), nullable=False), + StructField("clin_item_id", CharType(16)), + StructField("clin_promotion_id", CharType(16)), + StructField("clin_quantity", IntegerType()), + StructField("clin_sales_price", decimalType(use_decimal, 7,2)), + StructField("clin_coupon_amt", decimalType(use_decimal, 7,2)), + StructField("clin_warehouse_id", CharType(16)), + StructField("clin_ship_date", CharType(10)), + StructField("clin_catalog_number", IntegerType()), + StructField("clin_catalog_page_number", IntegerType()), + StructField("clin_ship_cost", decimalType(use_decimal, 7,2)), + ]) + MAINTENANCE_SCHEMAS["s_web_order_lineitem"] = StructType([ + StructField("wlin_order_id", IntegerType(), nullable=False), + StructField("wlin_line_number", IntegerType(), nullable=False), + StructField("wlin_item_id", CharType(16)), + StructField("wlin_promotion_id", CharType(16)), + StructField("wlin_quantity", IntegerType()), + StructField("wlin_sales_price", decimalType(use_decimal, 7,2)), + StructField("wlin_coupon_amt", decimalType(use_decimal, 7,2)), + StructField("wlin_warehouse_id", CharType(16)), + StructField("wlin_ship_date", CharType(10)), + StructField("wlin_ship_cost", decimalType(use_decimal, 7,2)), + StructField("wlin_web_page_id", CharType(16)), + ]) + MAINTENANCE_SCHEMAS["s_store_returns"] = StructType([ + StructField("sret_store_id", CharType(16)), + StructField("sret_purchase_id", CharType(16), nullable=False), + StructField("sret_line_number", IntegerType(), nullable=False), + StructField("sret_item_id", CharType(16), nullable=False), + StructField("sret_customer_id", CharType(16)), + StructField("sret_return_date", CharType(10)), + StructField("sret_return_time", CharType(10)), + StructField("sret_ticket_number", CharType(20)), + StructField("sret_return_qty", IntegerType()), + StructField("sret_return_amt", decimalType(use_decimal, 7,2)), + StructField("sret_return_tax", decimalType(use_decimal, 7,2)), + StructField("sret_return_fee", decimalType(use_decimal, 7,2)), + StructField("sret_return_ship_cost", decimalType(use_decimal, 7,2)), + StructField("sret_refunded_cash", decimalType(use_decimal, 7,2)), + StructField("sret_reversed_charge", decimalType(use_decimal, 7,2)), + StructField("sret_store_credit", decimalType(use_decimal, 7,2)), + StructField("sret_reason_id", CharType(16)), + ]) + MAINTENANCE_SCHEMAS["s_catalog_returns"] = StructType([ + StructField("cret_call_center_id", CharType(16)), + StructField("cret_order_id", IntegerType(), nullable=False), + StructField("cret_line_number", IntegerType(), nullable=False), + StructField("cret_item_id", CharType(16), nullable=False), + StructField("cret_return_customer_id", CharType(16)), + StructField("cret_refund_customer_id", CharType(16)), + StructField("cret_return_date", CharType(10)), + StructField("cret_return_time", CharType(10)), + StructField("cret_return_qty", IntegerType()), + StructField("cret_return_amt", decimalType(use_decimal, 7,2)), + StructField("cret_return_tax", decimalType(use_decimal, 7,2)), + StructField("cret_return_fee", decimalType(use_decimal, 7,2)), + StructField("cret_return_ship_cost", decimalType(use_decimal, 7,2)), + StructField("cret_refunded_cash", decimalType(use_decimal, 7,2)), + StructField("cret_reversed_charge", decimalType(use_decimal, 7,2)), + StructField("cret_merchant_credit", decimalType(use_decimal, 7,2)), + StructField("cret_reason_id", CharType(16)), + StructField("cret_shipmode_id", 
CharType(16)), + StructField("cret_catalog_page_id", CharType(16)), + StructField("cret_warehouse_id", CharType(16)), + ]) + MAINTENANCE_SCHEMAS["s_inventory"] = StructType([ + StructField("invn_warehouse_id", CharType(16), nullable=False), + StructField("invn_item_id", CharType(16), nullable=False), + StructField("invn_date", CharType(10), nullable=False), + StructField("invn_qty_on_hand", IntegerType()), + ]) + return MAINTENANCE_SCHEMAS # Note the specific partitioning is applied when save the parquet data files. TABLE_PARTITIONING = { @@ -588,15 +705,27 @@ def load(session, filename, schema, delimiter="|", header="false", prefix=""): return session.read.option("delimiter", delimiter).option("header", header).csv(data_path, schema=schema) -def store(df, filename, write_mode, output_format, prefix=""): - data_path = prefix + '/' + filename +def store(session, df, filename, output_format, compression): + """Create Iceberg tables by CTAS + + Args: + session (SparkSession): a working SparkSession instance + df (DataFrame): DataFrame to be serialized into Iceberg table + filename (str): name of the table(file) + output_format (str): parquet, orc or avro + compression (str): Parquet compression codec when saving Iceberg tables + """ + CTAS = f"create table {filename} using iceberg " if filename in TABLE_PARTITIONING.keys(): - df = df.repartition(col(TABLE_PARTITIONING[filename])) - df.write.format(output_format).mode(write_mode).partitionBy( - TABLE_PARTITIONING[filename]).save(data_path) + df.repartition(col(TABLE_PARTITIONING[filename])).sortWithinPartitions(TABLE_PARTITIONING[filename]).createOrReplaceTempView("temptbl") + CTAS += f"partitioned by ({TABLE_PARTITIONING[filename]})" else: - df.coalesce(1).write.format(output_format).mode(write_mode).save(data_path) - + df.coalesce(1).createOrReplaceTempView("temptbl") + CTAS += f" tblproperties('write.format.default' = '{output_format}'" + # the compression-codec won't panic when output_format is not parquet + CTAS += f", 'write.parquet.compression-codec' = '{compression}')" + CTAS += " as select * from temptbl" + session.sql(CTAS) def transcode(args): session = pyspark.sql.SparkSession.builder \ @@ -607,25 +736,29 @@ def transcode(args): results = {} schemas = get_schemas(use_decimal=not args.floats) - - trans_tables = schemas + maintenance_schemas = get_maintenance_schemas(use_decimal=not args.floats) + + if args.update: + trans_tables = maintenance_schemas + else: + trans_tables = schemas + if args.tables: for t in args.tables: - if t not in schemas.keys(): + if t not in trans_tables.keys() : raise Exception(f"invalid table name: {t}. Valid tables are: {schemas.keys()}") - trans_tables = {t: schemas[t] for t in args.tables if t in schemas} + trans_tables = {t: trans_tables[t] for t in args.tables if t in trans_tables} for fn, schema in trans_tables.items(): results[fn] = timeit.timeit( - lambda: store( + lambda: store(session, load(session, f"{fn}", schema, prefix=args.input_prefix), f"{fn}", - args.output_mode, args.output_format, - args.output_prefix), + args.compression), number=1) report_text = "Total conversion time for %d tables was %.02fs\n" % ( @@ -680,6 +813,16 @@ def transcode(args): '--floats', action='store_true', help='replace DecimalType with DoubleType when saving parquet files. 
If not specified, decimal data will be saved.') - + parser.add_argument( + '--update', + action='store_true', + help='transcode the source data or update data' + ) + parser.add_argument( + '--compression', + default='snappy', + help='Parquet compression codec. Iceberg is using gzip as default but spark-rapids plugin ' + + 'does not support it yet, so default it to snappy.' + ) args = parser.parse_args() transcode(args) diff --git a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java index ad5b57c..637f1f4 100755 --- a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java +++ b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java @@ -59,6 +59,7 @@ public int run(String[] args) throws Exception { options.addOption("p", "parallel", true, "parallel"); options.addOption("r", "range", true, "child range in one data generation run"); options.addOption("o", "overwrite", false, "overwrite existing data"); + options.addOption("u", "update", false, "generate data for Data Maintenance"); CommandLine line = parser.parse(options, remainingArgs); if(!(line.hasOption("scale") && line.hasOption("dir"))) { @@ -97,12 +98,23 @@ public int run(String[] args) throws Exception { } } + // use 999999 for default update value to avoid user input conflict. + int update = 999999; + if(line.hasOption("update")) { + update = Integer.parseInt(line.getOptionValue("update")); + } + + if(update < 0) { + // TPC-DS will error if update is < 0 + System.err.println("The update value cannot be less than 0, your input: " + update); + } + if(parallel == 1 || scale == 1) { System.err.println("The MR task does not work for scale=1 or parallel=1"); return 1; } - Path in = genInput(table, scale, parallel, rangeStart, rangeEnd); + Path in = genInput(table, scale, parallel, rangeStart, rangeEnd, update); Path dsdgen = copyJar(new File("target/lib/dsdgen.jar")); URI dsuri = dsdgen.toUri(); @@ -172,18 +184,24 @@ public Path copyJar(File jar) throws Exception { return dst; } - public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd) throws Exception { + public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd, int update) throws Exception { long epoch = System.currentTimeMillis()/1000; Path in = new Path("/tmp/"+table+"_"+scale+"-"+epoch); FileSystem fs = FileSystem.get(getConf()); FSDataOutputStream out = fs.create(in); + String cmd = ""; for(int i = rangeStart; i <= rangeEnd; i++) { if(table.equals("all")) { - out.writeBytes(String.format("./dsdgen -dir $DIR -force Y -scale %d -parallel %d -child %d\n", scale, parallel, i)); + cmd += String.format("./dsdgen -dir $DIR -force Y -scale %d -parallel %d -child %d", scale, parallel, i) } else { - out.writeBytes(String.format("./dsdgen -dir $DIR -table %s -force Y -scale %d -parallel %d -child %d\n", table, scale, parallel, i)); + cmd += String.format("./dsdgen -dir $DIR -table %s -force Y -scale %d -parallel %d -child %d", table, scale, parallel, i) + } + if(update != 999999) { + cmd += String.format(" -update %d", update) } + cmd += "\n" + out.writeBytes(cmd) } out.close(); return in; From c4ac29c8310786bae4df9ddb3d0569bede0acfc3 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 26 May 2022 18:05:07 +0800 Subject: [PATCH 02/17] add data maintenance script Signed-off-by: Allen Xu --- nds/nds_maintenance.py | 102 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 nds/nds_maintenance.py diff --git 
a/nds/nds_maintenance.py b/nds/nds_maintenance.py new file mode 100644 index 0000000..6c4cfbe --- /dev/null +++ b/nds/nds_maintenance.py @@ -0,0 +1,102 @@ +import argparse +import csv +import time + +from pyspark.sql import SparkSession +from PysparkBenchReport import PysparkBenchReport + +from check import get_abs_path + +def get_maintenance_queries(folder, spec_queries): + """get query content from DM query files + + Args: + folder (str): folder to Data Maintenance query files + spec_queries (list[str]): specific target Data Maintenance queries + Returns: + dict{str: list[str]}: a dict contains Data Maintenance query name and its content. + """ + #TODO: Add delete functions + DM_FUNCS = ['LF_CR', + 'LF_CS', + 'LF_I', + 'LF_SR', + 'LF_SS', + 'LF_WR', + 'LF_WS'] + if spec_queries: + for q in spec_queries: + if q not in DM_FUNCS: + raise Exception(f"invalid Data Maintenance query: {q}. Valid are: {DM_FUNCS}") + DM_FUNCS = [q for q in spec_queries if q in DM_FUNCS] + folder_abs_path = get_abs_path(folder) + q_dict = {} + for q in DM_FUNCS: + with open(folder_abs_path + '/' + q + '.sql', 'r') as f: + # file content e.g. + # " CREATE view ..... ; INSERT into .... ;" + q_content = [ c + ';' for c in f.read().split(';')[:-1]] + q_dict[q] = q_content + return q_dict + +def run_query(query_dict, time_log_output_path): + # TODO: Duplicate code in nds_power.py. Refactor this part, make it general. + execution_time_list = [] + total_time_start = time.time() + if len(query_dict) == 1: + app_name = "NDS - Data Maintenance - " + list(query_dict.keys())[0] + else: + app_name = "NDS - Data Maintenance" + + spark_session = SparkSession.builder.appName( + app_name).getOrCreate() + spark_app_id = spark_session.sparkContext.applicationId + DM_start = time.time() + for query_name, q_content in query_dict.items(): + # show query name in Spark web UI + spark_session.sparkContext.setJobGroup(query_name, query_name) + print(f"====== Run {query_name} ======") + q_report = PysparkBenchReport(spark_session) + for q in q_content: + summary = q_report.report_on(spark_session.sql, + q) + print(f"Time taken: {summary['queryTimes']} millis for {query_name}") + execution_time_list.append((spark_app_id, query_name, summary['queryTimes'])) + q_report.write_summary(query_name, prefix="") + spark_session.sparkContext.stop() + DM_end = time.time() + DM_elapse = DM_end - DM_start + total_elapse = DM_end - total_time_start + print("====== Data Maintenance Time: {} s ======".format(DM_elapse)) + print("====== Total Time: {} s ======".format(total_elapse)) + execution_time_list.append( + (spark_app_id, "Data Maintenance Time", DM_elapse)) + execution_time_list.append( + (spark_app_id, "Total Time", total_elapse)) + + # write to local csv file + header = ["application_id", "query", "time/s"] + with open(time_log_output_path, 'w', encoding='UTF8') as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(execution_time_list) + + +if __name__ == "__main__": + parser = parser = argparse.ArgumentParser() + parser.add_argument('maintenance_queries_folder', + help='folder contains all NDS Data Maintenance queries. If ' + + '"--maintenance_queries" is not set, all queries under the folder will be' + + 'executed.') + parser.add_argument('time_log', + help='path to execution time log, only support local path.', + default="") + parser.add_argument('--maintenance_queries', + type=lambda s: s.split(','), + help='specify Data Maintenance query names by a comma seprated string.' + + ' e.g. 
"LF_CR,LF_CS"') + + args = parser.parse_args() + query_dict = get_maintenance_queries(args.maintenance_queries_folder, + args.maintenance_queries) + run_query(query_dict, args.time_log) From 8f3714f852322f200d0d086a8145f47e582aa89f Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 26 May 2022 18:50:47 +0800 Subject: [PATCH 03/17] add table s_web_returns --- nds/nds_gen_data.py | 3 ++- nds/nds_transcode.py | 20 ++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/nds/nds_gen_data.py b/nds/nds_gen_data.py index fbcdcb5..149fd2a 100644 --- a/nds/nds_gen_data.py +++ b/nds/nds_gen_data.py @@ -75,7 +75,8 @@ 's_purchase_lineitem', 's_store_returns', 's_web_order', - 's_web_order_lineitem' + 's_web_order_lineitem', + 's_web_returns' ] def clean_temp_data(temp_data_path): diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 91bad5a..2c35683 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -680,6 +680,26 @@ def get_maintenance_schemas(use_decimal): StructField("cret_catalog_page_id", CharType(16)), StructField("cret_warehouse_id", CharType(16)), ]) + MAINTENANCE_SCHEMAS["s_web_returns"] = StructType([ + StructField("wret_web_page_id", CharType(16)), + StructField("wret_order_id", IntegerType(), nullable=False), + StructField("wret_line_number", IntegerType(), nullable=False), + StructField("wret_item_id", CharType(16), nullable=False), + StructField("wret_return_customer_id", CharType(16)), + StructField("wret_refund_customer_id", CharType(16)), + StructField("wret_return_date", CharType(10)), + StructField("wret_return_time", CharType(10)), + StructField("wret_return_qty", IntegerType()), + StructField("wret_return_amt", decimalType(use_decimal,7,2)), + StructField("wret_return_tax", decimalType(use_decimal,7,2)), + StructField("wret_return_fee", decimalType(use_decimal,7,2)), + StructField("wret_return_ship_cost", decimalType(use_decimal,7,2)), + StructField("wret_refunded_cash", decimalType(use_decimal,7,2)), + StructField("wret_reversed_CharTypege", decimalType(use_decimal,7,2)), + StructField("wret_account_credit", decimalType(use_decimal,7,2)), + StructField("wret_reason_id", CharType(16)), + ]) + MAINTENANCE_SCHEMAS["s_inventory"] = StructType([ StructField("invn_warehouse_id", CharType(16), nullable=False), StructField("invn_item_id", CharType(16), nullable=False), From bfc2079084d55c2e729719cd185a31d1207797b5 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Fri, 27 May 2022 09:14:17 +0800 Subject: [PATCH 04/17] add DM functions Signed-off-by: Allen Xu --- nds/data_maintenance/LF_CR.sql | 47 +++++++++++++++++++++++++++++ nds/data_maintenance/LF_CS.sql | 54 ++++++++++++++++++++++++++++++++++ nds/data_maintenance/LF_I.sql | 11 +++++++ nds/data_maintenance/LF_SR.sql | 37 +++++++++++++++++++++++ nds/data_maintenance/LF_SS.sql | 38 ++++++++++++++++++++++++ nds/data_maintenance/LF_WR.sql | 37 +++++++++++++++++++++++ nds/data_maintenance/LF_WS.sql | 53 +++++++++++++++++++++++++++++++++ 7 files changed, 277 insertions(+) create mode 100644 nds/data_maintenance/LF_CR.sql create mode 100644 nds/data_maintenance/LF_CS.sql create mode 100644 nds/data_maintenance/LF_I.sql create mode 100644 nds/data_maintenance/LF_SR.sql create mode 100644 nds/data_maintenance/LF_SS.sql create mode 100644 nds/data_maintenance/LF_WR.sql create mode 100644 nds/data_maintenance/LF_WS.sql diff --git a/nds/data_maintenance/LF_CR.sql b/nds/data_maintenance/LF_CR.sql new file mode 100644 index 0000000..59da96f --- /dev/null +++ b/nds/data_maintenance/LF_CR.sql @@ -0,0 +1,47 
@@ +CREATE VIEW crv as +SELECT d_date_sk cr_returned_date_sk + ,t_time_sk cr_returned_time_sk + ,i_item_sk cr_item_sk + ,c1.c_customer_sk cr_refunded_customer_sk + ,c1.c_current_cdemo_sk cr_refunded_cdemo_sk + ,c1.c_current_hdemo_sk cr_refunded_hdemo_sk + ,c1.c_current_addr_sk cr_refunded_addr_sk + ,c2.c_customer_sk cr_returning_customer_sk + ,c2.c_current_cdemo_sk cr_returning_cdemo_sk + ,c2.c_current_hdemo_sk cr_returning_hdemo_sk + ,c2.c_current_addr_sk cr_returing_addr_sk + ,cc_call_center_sk cr_call_center_sk + ,cp_catalog_page_sk CR_CATALOG_PAGE_SK + ,sm_ship_mode_sk CR_SHIP_MODE_SK + ,w_warehouse_sk CR_WAREHOUSE_SK + ,r_reason_sk cr_reason_sk + ,cret_order_id cr_order_number + ,cret_return_qty cr_return_quantity + ,cret_return_amt cr_return_amt + ,cret_return_tax cr_return_tax + ,cret_return_amt + cret_return_tax AS cr_return_amt_inc_tax + ,cret_return_fee cr_fee + ,cret_return_ship_cost cr_return_ship_cost + ,cret_refunded_cash cr_refunded_cash + ,cret_reversed_charge cr_reversed_charge + ,cret_merchant_credit cr_merchant_credit + ,cret_return_amt+cret_return_tax+cret_return_fee + -cret_refunded_cash-cret_reversed_charge-cret_merchant_credit cr_net_loss +FROM s_catalog_returns +LEFT OUTER JOIN date_dim + ON (cast(cret_return_date as date) = d_date) +LEFT OUTER JOIN time_dim ON + ((CAST(substr(cret_return_time,1,2) AS integer)*3600 + +CAST(substr(cret_return_time,4,2) AS integer)*60 + +CAST(substr(cret_return_time,7,2) AS integer)) = t_time) +LEFT OUTER JOIN item ON (cret_item_id = i_item_id) +LEFT OUTER JOIN customer c1 ON (cret_return_customer_id = c1.c_customer_id) +LEFT OUTER JOIN customer c2 ON (cret_refund_customer_id = c2.c_customer_id) +LEFT OUTER JOIN reason ON (cret_reason_id = r_reason_id) +LEFT OUTER JOIN call_center ON (cret_call_center_id = cc_call_center_id) +LEFT OUTER JOIN catalog_page ON (cret_catalog_page_id = cp_catalog_page_id) +LEFT OUTER JOIN ship_mode ON (cret_shipmode_id = sm_ship_mode_id) +LEFT OUTER JOIN warehouse ON (cret_warehouse_id = w_warehouse_id) +WHERE i_rec_end_date IS NULL AND cc_rec_end_date IS NULL; +------------------------------------------------ +insert into catalog_returns (select * from crv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_CS.sql b/nds/data_maintenance/LF_CS.sql new file mode 100644 index 0000000..976c420 --- /dev/null +++ b/nds/data_maintenance/LF_CS.sql @@ -0,0 +1,54 @@ +CREATE view csv as +SELECT d1.d_date_sk cs_sold_date_sk + ,t_time_sk cs_sold_time_sk + ,d2.d_date_sk cs_ship_date_sk + ,c1.c_customer_sk cs_bill_customer_sk + ,c1.c_current_cdemo_sk cs_bill_cdemo_sk + ,c1.c_current_hdemo_sk cs_bill_hdemo_sk + ,c1.c_current_addr_sk cs_bill_addr_sk + ,c2.c_customer_sk cs_ship_customer_sk + ,c2.c_current_cdemo_sk cs_ship_cdemo_sk + ,c2.c_current_hdemo_sk cs_ship_hdemo_sk + ,c2.c_current_addr_sk cs_ship_addr_sk + ,cc_call_center_sk cs_call_center_sk + ,cp_catalog_page_sk cs_catalog_page_sk + ,sm_ship_mode_sk cs_ship_mode_sk + ,w_warehouse_sk cs_warehouse_sk + ,i_item_sk cs_item_sk + ,p_promo_sk cs_promo_sk + ,cord_order_id cs_order_number + ,clin_quantity cs_quantity + ,i_wholesale_cost cs_wholesale_cost + ,i_current_price cs_list_price + ,clin_sales_price cs_sales_price + ,(i_current_price-clin_sales_price)*clin_quantity cs_ext_discount_amt + ,clin_sales_price * clin_quantity cs_ext_sales_price + ,i_wholesale_cost * clin_quantity cs_ext_wholesale_cost + ,i_current_price * clin_quantity CS_EXT_LIST_PRICE + ,i_current_price * cc_tax_percentage CS_EXT_TAX + ,clin_coupon_amt cs_coupon_amt + ,clin_ship_cost * 
clin_quantity CS_EXT_SHIP_COST + ,(clin_sales_price * clin_quantity)-clin_coupon_amt cs_net_paid + ,((clin_sales_price * clin_quantity)-clin_coupon_amt)*(1+cc_tax_percentage) cs_net_paid_inc_tax + ,(clin_sales_price * clin_quantity)-clin_coupon_amt + (clin_ship_cost * clin_quantity) CS_NET_PAID_INC_SHIP + ,(clin_sales_price * clin_quantity)-clin_coupon_amt + (clin_ship_cost * clin_quantity) + + i_current_price * cc_tax_percentage CS_NET_PAID_INC_SHIP_TAX + ,((clin_sales_price * clin_quantity)-clin_coupon_amt)-(clin_quantity*i_wholesale_cost) cs_net_profit +FROM s_catalog_order +LEFT OUTER JOIN date_dim d1 ON + (cast(cord_order_date as date) = d1.d_date) +LEFT OUTER JOIN time_dim ON (cord_order_time = t_time) +LEFT OUTER JOIN customer c1 ON (cord_bill_customer_id = c1.c_customer_id) +LEFT OUTER JOIN customer c2 ON (cord_ship_customer_id = c2.c_customer_id) +LEFT OUTER JOIN call_center ON (cord_call_center_id = cc_call_center_id AND cc_rec_end_date IS NULL) +LEFT OUTER JOIN ship_mode ON (cord_ship_mode_id = sm_ship_mode_id) +JOIN s_catalog_order_lineitem ON (cord_order_id = clin_order_id) +LEFT OUTER JOIN date_dim d2 ON + (cast(clin_ship_date as date) = d2.d_date) +LEFT OUTER JOIN catalog_page ON + (clin_catalog_page_number = cp_catalog_page_number and clin_catalog_number = cp_catalog_number) +LEFT OUTER JOIN warehouse ON (clin_warehouse_id = w_warehouse_id) +LEFT OUTER JOIN item ON (clin_item_id = i_item_id AND i_rec_end_date IS NULL) +LEFT OUTER JOIN promotion ON (clin_promotion_id = p_promo_id); +------------------------------------------------ +insert into catalog_sales (select * from csv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_I.sql b/nds/data_maintenance/LF_I.sql new file mode 100644 index 0000000..d6d025d --- /dev/null +++ b/nds/data_maintenance/LF_I.sql @@ -0,0 +1,11 @@ +CREATE view iv AS +SELECT d_date_sk inv_date_sk, + i_item_sk inv_item_sk, + w_warehouse_sk inv_warehouse_sk, + invn_qty_on_hand inv_quantity_on_hand +FROM s_inventory +LEFT OUTER JOIN warehouse ON (invn_warehouse_id=w_warehouse_id) +LEFT OUTER JOIN item ON (invn_item_id=i_item_id AND i_rec_end_date IS NULL) +LEFT OUTER JOIN date_dim ON (d_date=invn_date); +------------------------------------------------ +insert into inventory (select * from iv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_SR.sql b/nds/data_maintenance/LF_SR.sql new file mode 100644 index 0000000..e7841b2 --- /dev/null +++ b/nds/data_maintenance/LF_SR.sql @@ -0,0 +1,37 @@ +CREATE view srv as +SELECT d_date_sk sr_returned_date_sk + ,t_time_sk sr_return_time_sk + ,i_item_sk sr_item_sk + ,c_customer_sk sr_customer_sk + ,c_current_cdemo_sk sr_cdemo_sk + ,c_current_hdemo_sk sr_hdemo_sk + ,c_current_addr_sk sr_addr_sk + ,s_store_sk sr_store_sk + ,r_reason_sk sr_reason_sk + ,sret_ticket_number sr_ticket_number + ,sret_return_qty sr_return_quantity + ,sret_return_amt sr_return_amt + ,sret_return_tax sr_return_tax + ,sret_return_amt + sret_return_tax sr_return_amt_inc_tax + ,sret_return_fee sr_fee + ,sret_return_ship_cost sr_return_ship_cost + ,sret_refunded_cash sr_refunded_cash + ,sret_reversed_charge sr_reversed_charge + ,sret_store_credit sr_store_credit + ,sret_return_amt+sret_return_tax+sret_return_fee + -sret_refunded_cash-sret_reversed_charge-sret_store_credit sr_net_loss +FROM s_store_returns +LEFT OUTER JOIN date_dim + ON (cast(sret_return_date as date) = d_date) +LEFT OUTER JOIN time_dim + ON (( cast(substr(sret_return_time,1,2) AS integer)*3600 + +cast(substr(sret_return_time,4,2) AS integer)*60 + 
+cast(substr(sret_return_time,7,2) AS integer)) = t_time) +LEFT OUTER JOIN item ON (sret_item_id = i_item_id) +LEFT OUTER JOIN customer ON (sret_customer_id = c_customer_id) +LEFT OUTER JOIN store ON (sret_store_id = s_store_id) +LEFT OUTER JOIN reason ON (sret_reason_id = r_reason_id) +WHERE i_rec_end_date IS NULL + AND s_rec_end_date IS NULL; +------------------------------------------------ +insert into store_returns (select * from srv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_SS.sql b/nds/data_maintenance/LF_SS.sql new file mode 100644 index 0000000..57ca27b --- /dev/null +++ b/nds/data_maintenance/LF_SS.sql @@ -0,0 +1,38 @@ +CREATE view ssv as +SELECT d_date_sk ss_sold_date_sk, + t_time_sk ss_sold_time_sk, + i_item_sk ss_item_sk, + c_customer_sk ss_customer_sk, + c_current_cdemo_sk ss_cdemo_sk, + c_current_hdemo_sk ss_hdemo_sk, + c_current_addr_sk ss_addr_sk, + s_store_sk ss_store_sk, + p_promo_sk ss_promo_sk, + purc_purchase_id ss_ticket_number, + plin_quantity ss_quantity, + i_wholesale_cost ss_wholesale_cost, + i_current_price ss_list_price, + plin_sale_price ss_sales_price, + (i_current_price-plin_sale_price)*plin_quantity ss_ext_discount_amt, + plin_sale_price * plin_quantity ss_ext_sales_price, + i_wholesale_cost * plin_quantity ss_ext_wholesale_cost, + i_current_price * plin_quantity ss_ext_list_price, + i_current_price * s_tax_precentage ss_ext_tax, + plin_coupon_amt ss_coupon_amt, + (plin_sale_price * plin_quantity)-plin_coupon_amt ss_net_paid, + ((plin_sale_price * plin_quantity)-plin_coupon_amt)*(1+s_tax_precentage) ss_net_paid_inc_tax, + ((plin_sale_price * plin_quantity)-plin_coupon_amt)-(plin_quantity*i_wholesale_cost) +ss_net_profit +FROM s_purchase +LEFT OUTER JOIN customer ON (purc_customer_id = c_customer_id) +LEFT OUTER JOIN store ON (purc_store_id = s_store_id) +LEFT OUTER JOIN date_dim ON (cast(purc_purchase_date as date) = d_date) +LEFT OUTER JOIN time_dim ON (PURC_PURCHASE_TIME = t_time) +JOIN s_purchase_lineitem ON (purc_purchase_id = plin_purchase_id) +LEFT OUTER JOIN promotion ON plin_promotion_id = p_promo_id +LEFT OUTER JOIN item ON plin_item_id = i_item_id +WHERE purc_purchase_id = plin_purchase_id + AND i_rec_end_date is NULL + AND s_rec_end_date is NULL; +------------------------------------------------ +insert into store_sales (select * from ssv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_WR.sql b/nds/data_maintenance/LF_WR.sql new file mode 100644 index 0000000..545881e --- /dev/null +++ b/nds/data_maintenance/LF_WR.sql @@ -0,0 +1,37 @@ +CREATE VIEW wrv AS +SELECT d_date_sk wr_return_date_sk + ,t_time_sk wr_return_time_sk + ,i_item_sk wr_item_sk + ,c1.c_customer_sk wr_refunded_customer_sk + ,c1.c_current_cdemo_sk wr_refunded_cdemo_sk + ,c1.c_current_hdemo_sk wr_refunded_hdemo_sk + ,c1.c_current_addr_sk wr_refunded_addr_sk + ,c2.c_customer_sk wr_returning_customer_sk + ,c2.c_current_cdemo_sk wr_returning_cdemo_sk + ,c2.c_current_hdemo_sk wr_returning_hdemo_sk + ,c2.c_current_addr_sk wr_returing_addr_sk + ,wp_web_page_sk wr_web_page_sk + ,r_reason_sk wr_reason_sk + ,wret_order_id wr_order_number + ,wret_return_qty wr_return_quantity + ,wret_return_amt wr_return_amt + ,wret_return_tax wr_return_tax + ,wret_return_amt + wret_return_tax AS wr_return_amt_inc_tax + ,wret_return_fee wr_fee + ,wret_return_ship_cost wr_return_ship_cost + ,wret_refunded_cash wr_refunded_cash + ,wret_reversed_charge wr_reversed_charge + ,wret_account_credit wr_account_credit + ,wret_return_amt+wret_return_tax+wret_return_fee + 
-wret_refunded_cash-wret_reversed_charge-wret_account_credit wr_net_loss +FROM s_web_returns LEFT OUTER JOIN date_dim ON (cast(wret_return_date as date) = d_date) +LEFT OUTER JOIN time_dim ON ((CAST(SUBSTR(wret_return_time,1,2) AS integer)*3600 ++CAST(SUBSTR(wret_return_time,4,2) AS integer)*60+CAST(SUBSTR(wret_return_time,7,2) AS integer))=t_time) +LEFT OUTER JOIN item ON (wret_item_id = i_item_id) +LEFT OUTER JOIN customer c1 ON (wret_return_customer_id = c1.c_customer_id) +LEFT OUTER JOIN customer c2 ON (wret_refund_customer_id = c2.c_customer_id) +LEFT OUTER JOIN reason ON (wret_reason_id = r_reason_id) +LEFT OUTER JOIN web_page ON (wret_web_page_id = WP_WEB_PAGE_id) +WHERE i_rec_end_date IS NULL AND wp_rec_end_date IS NULL; +------------------------------------------------ +insert into web_returns (select * from wrv); \ No newline at end of file diff --git a/nds/data_maintenance/LF_WS.sql b/nds/data_maintenance/LF_WS.sql new file mode 100644 index 0000000..5c2381b --- /dev/null +++ b/nds/data_maintenance/LF_WS.sql @@ -0,0 +1,53 @@ +CREATE VIEW wsv AS +SELECT d1.d_date_sk ws_sold_date_sk, + t_time_sk ws_sold_time_sk, + d2.d_date_sk ws_ship_date_sk, + i_item_sk ws_item_sk, + c1.c_customer_sk ws_bill_customer_sk, + c1.c_current_cdemo_sk ws_bill_cdemo_sk, + c1.c_current_hdemo_sk ws_bill_hdemo_sk, + c1.c_current_addr_sk ws_bill_addr_sk, + c2.c_customer_sk ws_ship_customer_sk, + c2.c_current_cdemo_sk ws_ship_cdemo_sk, + c2.c_current_hdemo_sk ws_ship_hdemo_sk, + c2.c_current_addr_sk ws_ship_addr_sk, + wp_web_page_sk ws_web_page_sk, + web_site_sk ws_web_site_sk, + sm_ship_mode_sk ws_ship_mode_sk, + w_warehouse_sk ws_warehouse_sk, + p_promo_sk ws_promo_sk, + word_order_id ws_order_number, + wlin_quantity ws_quantity, + i_wholesale_cost ws_wholesale_cost, + i_current_price ws_list_price, + wlin_sales_price ws_sales_price, + (i_current_price-wlin_sales_price)*wlin_quantity ws_ext_discount_amt, + wlin_sales_price * wlin_quantity ws_ext_sales_price, + i_wholesale_cost * wlin_quantity ws_ext_wholesale_cost, + i_current_price * wlin_quantity ws_ext_list_price, + i_current_price * web_tax_percentage ws_ext_tax, + wlin_coupon_amt ws_coupon_amt, + wlin_ship_cost * wlin_quantity WS_EXT_SHIP_COST, + (wlin_sales_price * wlin_quantity)-wlin_coupon_amt ws_net_paid, + ((wlin_sales_price * wlin_quantity)-wlin_coupon_amt)*(1+web_tax_percentage) ws_net_paid_inc_tax, + ((wlin_sales_price * wlin_quantity)-wlin_coupon_amt)-(wlin_quantity*i_wholesale_cost) +WS_NET_PAID_INC_SHIP, + (wlin_sales_price * wlin_quantity)-wlin_coupon_amt + (wlin_ship_cost * wlin_quantity) + + i_current_price * web_tax_percentage WS_NET_PAID_INC_SHIP_TAX, + ((wlin_sales_price * wlin_quantity)-wlin_coupon_amt)-(i_wholesale_cost * wlin_quantity) +WS_NET_PROFIT +FROM s_web_order +LEFT OUTER JOIN date_dim d1 ON (cast(word_order_date as date) = d1.d_date) +LEFT OUTER JOIN time_dim ON (word_order_time = t_time) +LEFT OUTER JOIN customer c1 ON (word_bill_customer_id = c1.c_customer_id) +LEFT OUTER JOIN customer c2 ON (word_ship_customer_id = c2.c_customer_id) +LEFT OUTER JOIN web_site ON (word_web_site_id = web_site_id AND web_rec_end_date IS NULL) +LEFT OUTER JOIN ship_mode ON (word_ship_mode_id = sm_ship_mode_id) +JOIN s_web_order_lineitem ON (word_order_id = wlin_order_id) +LEFT OUTER JOIN date_dim d2 ON (cast(wlin_ship_date as date) = d2.d_date) +LEFT OUTER JOIN item ON (wlin_item_id = i_item_id AND i_rec_end_date IS NULL) +LEFT OUTER JOIN web_page ON (wlin_web_page_id = wp_web_page_id AND wp_rec_end_date IS NULL) +LEFT OUTER JOIN warehouse 
ON (wlin_warehouse_id = w_warehouse_id) +LEFT OUTER JOIN promotion ON (wlin_promotion_id = p_promo_id); +------------------------------------------------ +insert into web_sales (select * from wsv); \ No newline at end of file From a4cbec64e1cbbb3421fc5821b915fc98a34e53e7 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Mon, 30 May 2022 17:07:44 +0800 Subject: [PATCH 05/17] Add --iceberg argument Signed-off-by: Allen Xu --- nds/README.md | 23 +++++++------- nds/nds_transcode.py | 71 +++++++++++++++++++++++++++++++------------- 2 files changed, 63 insertions(+), 31 deletions(-) diff --git a/nds/README.md b/nds/README.md index 7619c9c..193ee27 100644 --- a/nds/README.md +++ b/nds/README.md @@ -99,30 +99,31 @@ otherwise DecimalType will be saved. arguments for `nds_transcode.py`: ``` python nds_transcode.py -h -usage: nds_transcode.py [-h] [--output_mode OUTPUT_MODE] [--input_suffix INPUT_SUFFIX] - [--log_level LOG_LEVEL] [--floats] +usage: nds_transcode.py [-h] [--output_mode {overwrite,append,ignore,error,errorifexists}] [--output_format {parquet,orc,avro}] [--tables TABLES] [--log_level LOG_LEVEL] [--floats] [--update] [--iceberg] [--compression COMPRESSION] input_prefix output_prefix report_file positional arguments: input_prefix text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"; the default is empty) - output_prefix text to prepend to every output file (e.g., "hdfs:///ds-parquet"; the default is - empty) + output_prefix text to prepend to every output file (e.g., "hdfs:///ds-parquet"; the default is empty). + This positional arguments will not take effect if "--iceberg" is specified and user needs to set Iceberg table path in their Spark submit templates/configs. report_file location to store a performance report(local) optional arguments: -h, --help show this help message and exit --output_mode {overwrite,append,ignore,error,errorifexists,default} - save modes as defined by https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modesdefault value is errorifexists, which is the Spark default behavior - --output_format {parquet,orc} - output data format when converting CSV data sources. Now supports parquet, orc. + save modes as defined by https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes. + default value is errorifexists, which is the Spark default behavior. + --output_format {parquet,orc,avro} + output data format when converting CSV data sources. --tables TABLES specify table names by a comma seprated string. e.g. 'catalog_page,catalog_sales'. - --input_suffix INPUT_SUFFIX - text to append to every input filename (e.g., ".dat"; the default is empty) --log_level LOG_LEVEL set log level for Spark driver log. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN(default: INFO) - --floats replace DecimalType with DoubleType when saving parquet files. If not specified, - decimal data will be saved. + --floats replace DecimalType with DoubleType when saving parquet files. If not specified, decimal data will be saved. + --update transcode the source data or update data + --iceberg Save converted data into Iceberg tables. + --compression COMPRESSION + Parquet compression codec. Iceberg is using gzip as default but spark-rapids plugin does not support it yet, so default it to snappy. 
``` diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 2c35683..73d14f4 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -331,7 +331,10 @@ def get_schemas(use_decimal): StructField("sr_addr_sk", IntegerType()), StructField("sr_store_sk", IntegerType()), StructField("sr_reason_sk", IntegerType()), - StructField("sr_ticket_number", IntegerType(), nullable=False), + # Use LongType due to https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1138379596 + # Databricks is using LongType as well in their accpeted benchmark reports. + # See https://www.tpc.org/results/supporting_files/tpcds/databricks~tpcds~100000~databricks_SQL_8.3~sup-1~2021-11-02~v01.zip + StructField("sr_ticket_number", LongType(), nullable=False), StructField("sr_return_quantity", IntegerType()), StructField("sr_return_amt", decimalType(use_decimal, 7, 2)), StructField("sr_return_tax", decimalType(use_decimal, 7, 2)), @@ -725,7 +728,7 @@ def load(session, filename, schema, delimiter="|", header="false", prefix=""): return session.read.option("delimiter", delimiter).option("header", header).csv(data_path, schema=schema) -def store(session, df, filename, output_format, compression): +def store(session, df, filename, output_format, output_mode, use_iceberg, compression, prefix=""): """Create Iceberg tables by CTAS Args: @@ -733,19 +736,37 @@ def store(session, df, filename, output_format, compression): df (DataFrame): DataFrame to be serialized into Iceberg table filename (str): name of the table(file) output_format (str): parquet, orc or avro + write_mode (str): save modes as defined by "https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes. + use_iceberg (bool): write data into Iceberg tables compression (str): Parquet compression codec when saving Iceberg tables + prefix (str): output data path when not using Iceberg. 
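+        Example (illustrative, assuming TABLE_PARTITIONING maps store_sales to ss_sold_date_sk):
+        with use_iceberg=True, output_format='parquet' and compression='snappy', the statement
+        passed to session.sql() is roughly:
+            create table store_sales using iceberg partitioned by (ss_sold_date_sk)
+            tblproperties('write.format.default' = 'parquet',
+            'write.parquet.compression-codec' = 'snappy') as select * from temptbl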
""" - CTAS = f"create table {filename} using iceberg " - if filename in TABLE_PARTITIONING.keys(): - df.repartition(col(TABLE_PARTITIONING[filename])).sortWithinPartitions(TABLE_PARTITIONING[filename]).createOrReplaceTempView("temptbl") - CTAS += f"partitioned by ({TABLE_PARTITIONING[filename]})" + if use_iceberg: + if output_mode == 'overwrite': + session.sql(f"drop table {filename}") + CTAS = f"create table {filename} using iceberg " + if filename in TABLE_PARTITIONING.keys(): + df.repartition( + col(TABLE_PARTITIONING[filename])).sortWithinPartitions( + TABLE_PARTITIONING[filename]).createOrReplaceTempView("temptbl") + CTAS += f"partitioned by ({TABLE_PARTITIONING[filename]})" + else: + df.coalesce(1).createOrReplaceTempView("temptbl") + CTAS += f" tblproperties('write.format.default' = '{output_format}'" + # the compression-codec won't panic when output_format is not parquet + CTAS += f", 'write.parquet.compression-codec' = '{compression}')" + CTAS += " as select * from temptbl" + session.sql(CTAS) else: - df.coalesce(1).createOrReplaceTempView("temptbl") - CTAS += f" tblproperties('write.format.default' = '{output_format}'" - # the compression-codec won't panic when output_format is not parquet - CTAS += f", 'write.parquet.compression-codec' = '{compression}')" - CTAS += " as select * from temptbl" - session.sql(CTAS) + data_path = prefix + '/' + filename + if filename in TABLE_PARTITIONING.keys(): + df = df.repartition( + col(TABLE_PARTITIONING[filename])).sortWithinPartitions( + TABLE_PARTITIONING[filename]) + df.write.format(output_format).mode(output_mode).partitionBy( + TABLE_PARTITIONING[filename]).save(data_path) + else: + df.coalesce(1).write.format(output_format).mode(output_mode).save(data_path) def transcode(args): session = pyspark.sql.SparkSession.builder \ @@ -772,13 +793,16 @@ def transcode(args): for fn, schema in trans_tables.items(): results[fn] = timeit.timeit( lambda: store(session, - load(session, - f"{fn}", - schema, - prefix=args.input_prefix), - f"{fn}", - args.output_format, - args.compression), + load(session, + f"{fn}", + schema, + prefix=args.input_prefix), + f"{fn}", + args.output_format, + args.output_mode, + args.iceberg, + args.compression, + args.output_prefix), number=1) report_text = "Total conversion time for %d tables was %.02fs\n" % ( @@ -804,7 +828,9 @@ def transcode(args): help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"; the default is empty)') parser.add_argument( 'output_prefix', - help='text to prepend to every output file (e.g., "hdfs:///ds-parquet"; the default is empty)') + help='text to prepend to every output file (e.g., "hdfs:///ds-parquet"; the default is empty)' + + '. This positional arguments will not take effect if "--iceberg" is specified. ' + + 'User needs to set Iceberg table path in their Spark submit templates/configs.') parser.add_argument( 'report_file', help='location to store a performance report(local)') @@ -838,6 +864,11 @@ def transcode(args): action='store_true', help='transcode the source data or update data' ) + parser.add_argument( + '--iceberg', + action='store_true', + help='Save converted data into Iceberg tables.' 
+ ) parser.add_argument( '--compression', default='snappy', From 920d84edd2b6858e47e02b5cb304a6c68a324c96 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Mon, 30 May 2022 17:11:17 +0800 Subject: [PATCH 06/17] add iceberg template Signed-off-by: Allen Xu --- nds/convert_submit_cpu.template | 11 +-------- nds/convert_submit_cpu_iceberg.template | 32 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 nds/convert_submit_cpu_iceberg.template diff --git a/nds/convert_submit_cpu.template b/nds/convert_submit_cpu.template index 1427e92..ea68d63 100644 --- a/nds/convert_submit_cpu.template +++ b/nds/convert_submit_cpu.template @@ -21,13 +21,4 @@ export SPARK_CONF=("--master" "yarn" "--num-executors" "8" "--executor-memory" "40G" "--executor-cores" "12" - "--conf" "spark.task.cpus=1" - "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1" - "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" - "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" - "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" - "--conf" "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog" - "--conf" "spark.sql.catalog.local.type=hadoop" - "--conf" "spark.sql.catalog.local.warehouse=$PWD/local-warehouse" - "--conf" "spark.sql.catalog.spark_catalog.warehouse=$PWD/spark-warehouse") - + "--conf" "spark.task.cpus=1") diff --git a/nds/convert_submit_cpu_iceberg.template b/nds/convert_submit_cpu_iceberg.template new file mode 100644 index 0000000..d9d649d --- /dev/null +++ b/nds/convert_submit_cpu_iceberg.template @@ -0,0 +1,32 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +export SPARK_HOME=YOUR_SPARK_HOME +export SPARK_CONF=("--master" "yarn" + "--deploy-mode" "cluster" + "--driver-memory" "10G" + "--num-executors" "8" + "--executor-memory" "40G" + "--executor-cores" "12" + "--conf" "spark.task.cpus=1" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1" + "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" + "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" + "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" + "--conf" "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog" + "--conf" "spark.sql.catalog.local.type=hadoop" + "--conf" "spark.sql.catalog.local.warehouse=$PWD/local-warehouse" + "--conf" "spark.sql.catalog.spark_catalog.warehouse=$PWD/spark-warehouse") From fa910a4bc49c4501f245db9ae0a0c1abad564a4a Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Tue, 31 May 2022 18:09:11 +0800 Subject: [PATCH 07/17] add DELETE funcs Signed-off-by: Allen Xu --- nds/data_maintenance/DF_CS.sql | 6 +++++ nds/data_maintenance/DF_I.sql | 2 ++ nds/data_maintenance/DF_SS.sql | 3 +++ nds/data_maintenance/DF_WS.sql | 3 +++ nds/data_maintenance/LF_CR.sql | 1 + nds/data_maintenance/LF_CS.sql | 1 + nds/data_maintenance/LF_I.sql | 1 + nds/data_maintenance/LF_SR.sql | 1 + nds/data_maintenance/LF_SS.sql | 1 + nds/data_maintenance/LF_WR.sql | 1 + nds/data_maintenance/LF_WS.sql | 1 + nds/nds_maintenance.py | 48 ++++++++++++++++++++++++++++++++-- nds/nds_transcode.py | 12 ++++++++- 13 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 nds/data_maintenance/DF_CS.sql create mode 100644 nds/data_maintenance/DF_I.sql create mode 100644 nds/data_maintenance/DF_SS.sql create mode 100644 nds/data_maintenance/DF_WS.sql diff --git a/nds/data_maintenance/DF_CS.sql b/nds/data_maintenance/DF_CS.sql new file mode 100644 index 0000000..cc1f025 --- /dev/null +++ b/nds/data_maintenance/DF_CS.sql @@ -0,0 +1,6 @@ +delete from catalog_returns where cr_order_number in ( +select cs_order_number from catalog_sales where cs_sold_date_sk in (select d_date_sk from date_dim where d_date between 'DATE1' and 'DATE2') +); + +delete from catalog_sales where cs_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + cs_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_I.sql b/nds/data_maintenance/DF_I.sql new file mode 100644 index 0000000..5b19f7a --- /dev/null +++ b/nds/data_maintenance/DF_I.sql @@ -0,0 +1,2 @@ +delete from inventory where inv_date_sk >= ( select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + inv_date_sk <= ( select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_SS.sql b/nds/data_maintenance/DF_SS.sql new file mode 100644 index 0000000..1012c45 --- /dev/null +++ b/nds/data_maintenance/DF_SS.sql @@ -0,0 +1,3 @@ +delete from store_returns where sr_ticket_number in (select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); +delete from store_sales where ss_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + ss_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_WS.sql b/nds/data_maintenance/DF_WS.sql new file mode 100644 index 0000000..bdde5a9 --- 
/dev/null +++ b/nds/data_maintenance/DF_WS.sql @@ -0,0 +1,3 @@ +delete from web_returns where wr_order_number in (select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); +delete from web_sales where ws_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + ws_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/LF_CR.sql b/nds/data_maintenance/LF_CR.sql index 59da96f..f4ee1ca 100644 --- a/nds/data_maintenance/LF_CR.sql +++ b/nds/data_maintenance/LF_CR.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS crv; CREATE VIEW crv as SELECT d_date_sk cr_returned_date_sk ,t_time_sk cr_returned_time_sk diff --git a/nds/data_maintenance/LF_CS.sql b/nds/data_maintenance/LF_CS.sql index 976c420..fac5e55 100644 --- a/nds/data_maintenance/LF_CS.sql +++ b/nds/data_maintenance/LF_CS.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS csv; CREATE view csv as SELECT d1.d_date_sk cs_sold_date_sk ,t_time_sk cs_sold_time_sk diff --git a/nds/data_maintenance/LF_I.sql b/nds/data_maintenance/LF_I.sql index d6d025d..8057eb8 100644 --- a/nds/data_maintenance/LF_I.sql +++ b/nds/data_maintenance/LF_I.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS iv; CREATE view iv AS SELECT d_date_sk inv_date_sk, i_item_sk inv_item_sk, diff --git a/nds/data_maintenance/LF_SR.sql b/nds/data_maintenance/LF_SR.sql index e7841b2..d035a37 100644 --- a/nds/data_maintenance/LF_SR.sql +++ b/nds/data_maintenance/LF_SR.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS srv; CREATE view srv as SELECT d_date_sk sr_returned_date_sk ,t_time_sk sr_return_time_sk diff --git a/nds/data_maintenance/LF_SS.sql b/nds/data_maintenance/LF_SS.sql index 57ca27b..4d04084 100644 --- a/nds/data_maintenance/LF_SS.sql +++ b/nds/data_maintenance/LF_SS.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS ssv; CREATE view ssv as SELECT d_date_sk ss_sold_date_sk, t_time_sk ss_sold_time_sk, diff --git a/nds/data_maintenance/LF_WR.sql b/nds/data_maintenance/LF_WR.sql index 545881e..d0694fd 100644 --- a/nds/data_maintenance/LF_WR.sql +++ b/nds/data_maintenance/LF_WR.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS wrv; CREATE VIEW wrv AS SELECT d_date_sk wr_return_date_sk ,t_time_sk wr_return_time_sk diff --git a/nds/data_maintenance/LF_WS.sql b/nds/data_maintenance/LF_WS.sql index 5c2381b..acaf674 100644 --- a/nds/data_maintenance/LF_WS.sql +++ b/nds/data_maintenance/LF_WS.sql @@ -1,3 +1,4 @@ +DROP VIEW IF EXISTS wsv; CREATE VIEW wsv AS SELECT d1.d_date_sk ws_sold_date_sk, t_time_sk ws_sold_time_sk, diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 6c4cfbe..407aa21 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -7,6 +7,36 @@ from check import get_abs_path +def get_delete_date(spark_session): + """get delete dates for Data Maintenance. 
Each delete functions requires 3 tuples: (date1, date2) + + Args: + spark_session (SparkSession): Spark session + Returns: + delete_dates_dict ({str: list[(date1, date2)]}): a dict contains date tuples for each delete functions + """ + delete_dates = spark_session.sql("select * from delete").collect() + inventory_delete_dates = spark_session.sql("select * from inventory_delete").collect() + date_dict = {} + date_dict['delete'] = [(row['date1'], row['date2']) for row in delete_dates] + date_dict['inventory_delete'] = [(row['date1'], row['date2']) for row in inventory_delete_dates] + return date_dict + +def replace_date(query_list, date_tuple_list): + """Replace the date keywords in DELETE queries. 3 date tuples will be applied to the delete query. + + Args: + query_list ([str]): delete query list + date_tuple_list ([(str, str)]): actual delete date + """ + q_updated = [] + for date_tuple in date_tuple_list: + for c in query_list: + c = c.replace("DATE1", date_tuple[0]) + c = c.replace("DATE2", date_tuple[1]) + q_updated.append(c) + return q_updated + def get_maintenance_queries(folder, spec_queries): """get query content from DM query files @@ -16,14 +46,21 @@ def get_maintenance_queries(folder, spec_queries): Returns: dict{str: list[str]}: a dict contains Data Maintenance query name and its content. """ - #TODO: Add delete functions - DM_FUNCS = ['LF_CR', + INSERT_FUNCS = ['LF_CR', 'LF_CS', 'LF_I', 'LF_SR', 'LF_SS', 'LF_WR', 'LF_WS'] + DELETE_FUNCS = ['DF_CS', + 'DF_SS', + 'DF_WS'] + INVENTORY_DELETE_FUNC = ['DF_I'] + DM_FUNCS = INSERT_FUNCS + DELETE_FUNCS + INVENTORY_DELETE_FUNC + # need a spark session to get delete date + spark = SparkSession.builder.appName("GET DELETE DATES").getOrCreate() + delete_date_dict = get_delete_date(spark) if spec_queries: for q in spec_queries: if q not in DM_FUNCS: @@ -35,7 +72,14 @@ def get_maintenance_queries(folder, spec_queries): with open(folder_abs_path + '/' + q + '.sql', 'r') as f: # file content e.g. # " CREATE view ..... ; INSERT into .... ;" + # " DELETE from ..... ; DELETE FROM .... ;" q_content = [ c + ';' for c in f.read().split(';')[:-1]] + if q in DELETE_FUNCS: + # There're 3 date tuples to be replace for one DELETE function + # according to TPC-DS Spec 5.3.11 + q_content = replace_date(q_content, delete_date_dict['delete']) + if q in INVENTORY_DELETE_FUNC: + q_content = replace_date(q_content, delete_date_dict['inventory_delete']) q_dict[q] = q_content return q_dict diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 73d14f4..a732541 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -709,6 +709,16 @@ def get_maintenance_schemas(use_decimal): StructField("invn_date", CharType(10), nullable=False), StructField("invn_qty_on_hand", IntegerType()), ]) + + MAINTENANCE_SCHEMAS["delete"] = StructType([ + StructField("date1", StringType(), nullable=False), + StructField("date2", StringType(), nullable=False), + ]) + + MAINTENANCE_SCHEMAS["inventory_delete"] = StructType([ + StructField("date1", StringType(), nullable=False), + StructField("date2", StringType(), nullable=False), + ]) return MAINTENANCE_SCHEMAS # Note the specific partitioning is applied when save the parquet data files. 
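# The "delete" and "inventory_delete" helper schemas added in the hunk above hold the (date1, date2)
# ranges that nds_maintenance.py reads back (via spark.sql("select * from delete").collect()) to fill
# the DATE1/DATE2 placeholders in the DF_* delete queries. A hedged invocation sketch for transcoding
# the maintenance flat files into Iceberg tables (paths are illustrative; the output_prefix positional
# is ignored when --iceberg is set):
#
#   python nds_transcode.py hdfs:///ds-update-raw unused_prefix ./report.txt --update --iceberg --compression snappy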
@@ -743,7 +753,7 @@ def store(session, df, filename, output_format, output_mode, use_iceberg, compre """ if use_iceberg: if output_mode == 'overwrite': - session.sql(f"drop table {filename}") + session.sql(f"drop table if exists {filename}") CTAS = f"create table {filename} using iceberg " if filename in TABLE_PARTITIONING.keys(): df.repartition( From cc20271ac7bdc37543e58a77e382c51e56739841 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 2 Jun 2022 16:01:00 +0800 Subject: [PATCH 08/17] make DELETE funcs work Signed-off-by: Allen Xu --- nds/data_maintenance/DF_CS.sql | 10 ++-- nds/data_maintenance/DF_I.sql | 6 ++- nds/data_maintenance/DF_SS.sql | 9 ++-- nds/data_maintenance/DF_WS.sql | 9 ++-- nds/nds_maintenance.py | 88 +++++++++++++++++++++++++++------- 5 files changed, 91 insertions(+), 31 deletions(-) diff --git a/nds/data_maintenance/DF_CS.sql b/nds/data_maintenance/DF_CS.sql index cc1f025..1c712b3 100644 --- a/nds/data_maintenance/DF_CS.sql +++ b/nds/data_maintenance/DF_CS.sql @@ -1,6 +1,6 @@ -delete from catalog_returns where cr_order_number in ( -select cs_order_number from catalog_sales where cs_sold_date_sk in (select d_date_sk from date_dim where d_date between 'DATE1' and 'DATE2') -); +select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; +select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; +select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; -delete from catalog_sales where cs_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and - cs_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); +delete from catalog_returns where cr_order_number in (SQL1); +delete from catalog_sales where cs_sold_date_sk >= (SQL2) and cs_sold_date_sk <= (SQL3); diff --git a/nds/data_maintenance/DF_I.sql b/nds/data_maintenance/DF_I.sql index 5b19f7a..518d6f9 100644 --- a/nds/data_maintenance/DF_I.sql +++ b/nds/data_maintenance/DF_I.sql @@ -1,2 +1,4 @@ -delete from inventory where inv_date_sk >= ( select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and - inv_date_sk <= ( select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); +select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; +select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; + +delete from inventory where inv_date_sk >= (SQL1) and inv_date_sk <= (SQL2); diff --git a/nds/data_maintenance/DF_SS.sql b/nds/data_maintenance/DF_SS.sql index 1012c45..671b1c9 100644 --- a/nds/data_maintenance/DF_SS.sql +++ b/nds/data_maintenance/DF_SS.sql @@ -1,3 +1,6 @@ -delete from store_returns where sr_ticket_number in (select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); -delete from store_sales where ss_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and - ss_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); +select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; +select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; +select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; + +delete from store_returns where sr_ticket_number in (SQL1); +delete from store_sales where ss_sold_date_sk >= 
(SQL2) and ss_sold_date_sk <= (SQL3); diff --git a/nds/data_maintenance/DF_WS.sql b/nds/data_maintenance/DF_WS.sql index bdde5a9..7451806 100644 --- a/nds/data_maintenance/DF_WS.sql +++ b/nds/data_maintenance/DF_WS.sql @@ -1,3 +1,6 @@ -delete from web_returns where wr_order_number in (select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); -delete from web_sales where ws_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and - ws_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); +select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; +select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; +select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; +delete from web_returns where wr_order_number in (SQL1); +delete from web_sales where ws_sold_date_sk >= (SQL2) and + ws_sold_date_sk <= (SQL3); diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 407aa21..6d0e3cd 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -7,6 +7,19 @@ from check import get_abs_path +INSERT_FUNCS = ['LF_CR', + 'LF_CS', + 'LF_I', + 'LF_SR', + 'LF_SS', + 'LF_WR', + 'LF_WS'] +DELETE_FUNCS = ['DF_CS', + 'DF_SS', + 'DF_WS'] +INVENTORY_DELETE_FUNC = ['DF_I'] +DM_FUNCS = INSERT_FUNCS + DELETE_FUNCS + INVENTORY_DELETE_FUNC + def get_delete_date(spark_session): """get delete dates for Data Maintenance. Each delete functions requires 3 tuples: (date1, date2) @@ -46,21 +59,12 @@ def get_maintenance_queries(folder, spec_queries): Returns: dict{str: list[str]}: a dict contains Data Maintenance query name and its content. """ - INSERT_FUNCS = ['LF_CR', - 'LF_CS', - 'LF_I', - 'LF_SR', - 'LF_SS', - 'LF_WR', - 'LF_WS'] - DELETE_FUNCS = ['DF_CS', - 'DF_SS', - 'DF_WS'] - INVENTORY_DELETE_FUNC = ['DF_I'] - DM_FUNCS = INSERT_FUNCS + DELETE_FUNCS + INVENTORY_DELETE_FUNC # need a spark session to get delete date spark = SparkSession.builder.appName("GET DELETE DATES").getOrCreate() delete_date_dict = get_delete_date(spark) + # exclude this "get_delte_date" step from main DM process. + spark.stop() + global DM_FUNCS if spec_queries: for q in spec_queries: if q not in DM_FUNCS: @@ -83,6 +87,50 @@ def get_maintenance_queries(folder, spec_queries): q_dict[q] = q_content return q_dict +def run_insert_query(spark, query_list): + """Run insert query. Insert query contains 3 subqueries. + See data_maintenance/LF_*.sql for details. + + Args: + spark (SparkSession): SparkSession instance. + query_list ([str]): INSERT query list. + """ + for q in query_list: + spark.sql(q) + +def run_delete_query(spark, query_name, query_list): + """Only process DELETE queries as Spark doesn't support it yet. + See https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1141956487. + This function runs subqueries first to get subquery results. Then run the main delete query + using the intermediate results. + # See data_maintenance/DF_*.sql for query details. + + Args: + spark (SparkSession): SparkSession instance. + query_name (str): query name to identify DELETE or INVENTORY DELETE queries. + query_list ([str]): DELETE query list. The first 3 queires are subquery results. + The last 2 queries are main DELETE queries. + """ + if query_name in DELETE_FUNCS: + # contains 3 subqueries + # a list "[1,2,3,4,5]", drop box brackets for further SQL process. 
+ list_result = str([x[0] for x in spark.sql(query_list[0]).collect()])[1:-1] + # date results are integer + date1 = str(spark.sql(query_list[1]).collect()[0][0]) + date2 = str(spark.sql(query_list[2]).collect()[0][0]) + main_delete_1 = query_list[3].replace("SQL1", list_result) + main_delete_2 = query_list[4].replace("SQL2", date1) + main_delete_2 = main_delete_2.replace("SQL3", date2) + spark.sql(main_delete_1) + spark.sql(main_delete_2) + if query_name in INVENTORY_DELETE_FUNC: + # contains 2 subqueries + date1 = str(spark.sql(query_list[0]).collect()[0][0]) + date2 = str(spark.sql(query_list[1]).collect()[0][0]) + main_delete = query_list[4].replace("SQL1", date1) + main_delete = main_delete.replace("SQL2", date2) + spark.sql(main_delete) + def run_query(query_dict, time_log_output_path): # TODO: Duplicate code in nds_power.py. Refactor this part, make it general. execution_time_list = [] @@ -101,12 +149,16 @@ def run_query(query_dict, time_log_output_path): spark_session.sparkContext.setJobGroup(query_name, query_name) print(f"====== Run {query_name} ======") q_report = PysparkBenchReport(spark_session) - for q in q_content: - summary = q_report.report_on(spark_session.sql, - q) - print(f"Time taken: {summary['queryTimes']} millis for {query_name}") - execution_time_list.append((spark_app_id, query_name, summary['queryTimes'])) - q_report.write_summary(query_name, prefix="") + if query_name in DELETE_FUNCS + INVENTORY_DELETE_FUNC: + summary = q_report.report_on(run_delete_query,spark_session, + query_name, + q_content) + else: + summary = q_report.report_on(run_insert_query, spark_session, + q_content) + print(f"Time taken: {summary['queryTimes']} millis for {query_name}") + execution_time_list.append((spark_app_id, query_name, summary['queryTimes'])) + q_report.write_summary(query_name, prefix="") spark_session.sparkContext.stop() DM_end = time.time() DM_elapse = DM_end - DM_start From 1c6223c9207dbbd62c24acf51d1d3b86ff639c61 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 2 Jun 2022 16:04:37 +0800 Subject: [PATCH 09/17] refine Signed-off-by: Allen Xu --- nds/nds_maintenance.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 6d0e3cd..1a56060 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -150,9 +150,9 @@ def run_query(query_dict, time_log_output_path): print(f"====== Run {query_name} ======") q_report = PysparkBenchReport(spark_session) if query_name in DELETE_FUNCS + INVENTORY_DELETE_FUNC: - summary = q_report.report_on(run_delete_query,spark_session, - query_name, - q_content) + summary = q_report.report_on(run_delete_query, spark_session, + query_name, + q_content) else: summary = q_report.report_on(run_insert_query, spark_session, q_content) From 96bac9d3437be2b02b9ecaa6be06819a94b7964f Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 2 Jun 2022 16:30:14 +0800 Subject: [PATCH 10/17] add DM part in README Signed-off-by: Allen Xu --- nds/README.md | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/nds/README.md b/nds/README.md index 193ee27..1882f47 100644 --- a/nds/README.md +++ b/nds/README.md @@ -280,4 +280,40 @@ in your environment to make sure all Spark job can get necessary resources to ru otherwise some query application may be in _WAITING_ status(which can be observed from Spark UI or Yarn Resource Manager UI) until enough resources are released. 
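A condensed sketch of the DELETE workaround introduced in PATCH 08 above (and removed again by the later "fix DM query" patch once the plain DELETE statements can run on Spark 3.2.2): because DELETE with a subquery fails on the older Spark 3.2.x releases, `run_delete_query` first materializes each subquery, then splices the literal results into the main statements through the SQL1/SQL2/SQL3 placeholders. The date range below is made up for illustration, and the example assumes `web_sales`, `web_returns` and `date_dim` already exist as Iceberg tables:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delete placeholder sketch").getOrCreate()

d1, d2 = '1999-09-18', '1999-10-01'  # illustrative dates only
subqueries = [
    f"select ws_order_number from web_sales, date_dim "
    f"where ws_sold_date_sk = d_date_sk and d_date between '{d1}' and '{d2}'",
    f"select min(d_date_sk) from date_dim where d_date between '{d1}' and '{d2}'",
    f"select max(d_date_sk) from date_dim where d_date between '{d1}' and '{d2}'",
]

# Collect "[1, 2, 3]" and drop the brackets so it can be spliced into IN (...);
# assumes the first subquery returns at least one order number.
order_numbers = str([row[0] for row in spark.sql(subqueries[0]).collect()])[1:-1]
min_sk = str(spark.sql(subqueries[1]).collect()[0][0])
max_sk = str(spark.sql(subqueries[2]).collect()[0][0])

# Main DELETE statements with the placeholders already substituted.
spark.sql(f"delete from web_returns where wr_order_number in ({order_numbers})")
spark.sql(f"delete from web_sales where ws_sold_date_sk >= {min_sk} "
          f"and ws_sold_date_sk <= {max_sk}")
```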
-### NDS2.0 is using source code from TPC-DS Tool V3.2.0 \ No newline at end of file +### Data Maintenance +Data Maintenance performance data update over existed dataset including data INSERT and DELETE. Due +to the limits of Spark that the update operations are not supported yet in Spark, we use +[Iceberg](https://iceberg.apache.org/) as dataset metadata manager to overcome the issue. + +To enable Iceberg, user must set proper configurations. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/) +for details. We also provide a Spark submit template with necessary Iceberg configs: [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template) + +The data maintenance queries are in [data_maintenance](./data_maintenance) folder. `DF_*.sql` are +DELETE queries while `LF_*.sql` are INSERT queries. + +Arguments supported for data maintenance: +``` +usage: nds_maintenance.py [-h] [--maintenance_queries MAINTENANCE_QUERIES] maintenance_queries_folder time_log + +positional arguments: + maintenance_queries_folder + folder contains all NDS Data Maintenance queries. If "--maintenance_queries" + is not set, all queries under the folder will beexecuted. + time_log path to execution time log in csv format, only support local path. + +optional arguments: + -h, --help show this help message and exit + --maintenance_queries MAINTENANCE_QUERIES + specify Data Maintenance query names by a comma seprated string. e.g. "LF_CR,LF_CS" +``` + +An example command to run only _LF_CS_ and _DF_CS_ functions: +``` +./spark-submit-template convert_submit_cpu_iceberg.template \ +nds_maintenance.py \ +./data_maintenance \ +time.csv \ +--maintenance_queries LF_CS,DF_CS +``` + +### NDS2.0 is using source code from TPC-DS Tool V3.2.0 From 44c4d399c262888ebdcfcaef3dc04c6d4612572e Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Tue, 14 Jun 2022 15:45:40 +0800 Subject: [PATCH 11/17] fix DM query Signed-off-by: Allen Xu --- nds/data_maintenance/DF_CS.sql | 9 ++-- nds/data_maintenance/DF_I.sql | 6 +-- nds/data_maintenance/DF_SS.sql | 9 ++-- nds/data_maintenance/DF_WS.sql | 9 ++-- nds/nds_maintenance.py | 51 +++---------------- .../java/org/notmysock/tpcds/GenTable.java | 10 ++-- 6 files changed, 24 insertions(+), 70 deletions(-) diff --git a/nds/data_maintenance/DF_CS.sql b/nds/data_maintenance/DF_CS.sql index 1c712b3..144f94b 100644 --- a/nds/data_maintenance/DF_CS.sql +++ b/nds/data_maintenance/DF_CS.sql @@ -1,6 +1,3 @@ -select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; -select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; -select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; - -delete from catalog_returns where cr_order_number in (SQL1); -delete from catalog_sales where cs_sold_date_sk >= (SQL2) and cs_sold_date_sk <= (SQL3); +delete from catalog_returns where cr_order_number in (select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); +delete from catalog_sales where cs_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + cs_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_I.sql b/nds/data_maintenance/DF_I.sql index 518d6f9..5b19f7a 100644 --- a/nds/data_maintenance/DF_I.sql +++ b/nds/data_maintenance/DF_I.sql @@ -1,4 +1,2 @@ -select min(d_date_sk) from 
date_dim where d_date between 'DATE1' and 'DATE2'; -select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; - -delete from inventory where inv_date_sk >= (SQL1) and inv_date_sk <= (SQL2); +delete from inventory where inv_date_sk >= ( select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + inv_date_sk <= ( select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_SS.sql b/nds/data_maintenance/DF_SS.sql index 671b1c9..1012c45 100644 --- a/nds/data_maintenance/DF_SS.sql +++ b/nds/data_maintenance/DF_SS.sql @@ -1,6 +1,3 @@ -select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; -select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; -select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; - -delete from store_returns where sr_ticket_number in (SQL1); -delete from store_sales where ss_sold_date_sk >= (SQL2) and ss_sold_date_sk <= (SQL3); +delete from store_returns where sr_ticket_number in (select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); +delete from store_sales where ss_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + ss_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_WS.sql b/nds/data_maintenance/DF_WS.sql index 7451806..bdde5a9 100644 --- a/nds/data_maintenance/DF_WS.sql +++ b/nds/data_maintenance/DF_WS.sql @@ -1,6 +1,3 @@ -select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'; -select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; -select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'; -delete from web_returns where wr_order_number in (SQL1); -delete from web_sales where ws_sold_date_sk >= (SQL2) and - ws_sold_date_sk <= (SQL3); +delete from web_returns where wr_order_number in (select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); +delete from web_sales where ws_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and + ws_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 1a56060..8716e47 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -87,9 +87,12 @@ def get_maintenance_queries(folder, spec_queries): q_dict[q] = q_content return q_dict -def run_insert_query(spark, query_list): - """Run insert query. Insert query contains 3 subqueries. - See data_maintenance/LF_*.sql for details. +def run_dm_query(spark, query_list): + """Run data maintenance query. + Delete queries can run on Spark 3.2.2 but not on Spark 3.2.1. + See https://issues.apache.org/jira/browse/SPARK-39454 + See data_maintenance/LF_*.sql for insert query details. + See data_maintenance/DF_*.sql for delete query details. Args: spark (SparkSession): SparkSession instance. @@ -98,39 +101,6 @@ def run_insert_query(spark, query_list): for q in query_list: spark.sql(q) -def run_delete_query(spark, query_name, query_list): - """Only process DELETE queries as Spark doesn't support it yet.
- See https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1141956487. - This function runs subqueries first to get subquery results. Then run the main delete query - using the intermediate results. - # See data_maintenance/DF_*.sql for query details. - - Args: - spark (SparkSession): SparkSession instance. - query_name (str): query name to identify DELETE or INVENTORY DELETE queries. - query_list ([str]): DELETE query list. The first 3 queires are subquery results. - The last 2 queries are main DELETE queries. - """ - if query_name in DELETE_FUNCS: - # contains 3 subqueries - # a list "[1,2,3,4,5]", drop box brackets for further SQL process. - list_result = str([x[0] for x in spark.sql(query_list[0]).collect()])[1:-1] - # date results are integer - date1 = str(spark.sql(query_list[1]).collect()[0][0]) - date2 = str(spark.sql(query_list[2]).collect()[0][0]) - main_delete_1 = query_list[3].replace("SQL1", list_result) - main_delete_2 = query_list[4].replace("SQL2", date1) - main_delete_2 = main_delete_2.replace("SQL3", date2) - spark.sql(main_delete_1) - spark.sql(main_delete_2) - if query_name in INVENTORY_DELETE_FUNC: - # contains 2 subqueries - date1 = str(spark.sql(query_list[0]).collect()[0][0]) - date2 = str(spark.sql(query_list[1]).collect()[0][0]) - main_delete = query_list[4].replace("SQL1", date1) - main_delete = main_delete.replace("SQL2", date2) - spark.sql(main_delete) - def run_query(query_dict, time_log_output_path): # TODO: Duplicate code in nds_power.py. Refactor this part, make it general. execution_time_list = [] @@ -149,13 +119,8 @@ def run_query(query_dict, time_log_output_path): spark_session.sparkContext.setJobGroup(query_name, query_name) print(f"====== Run {query_name} ======") q_report = PysparkBenchReport(spark_session) - if query_name in DELETE_FUNCS + INVENTORY_DELETE_FUNC: - summary = q_report.report_on(run_delete_query, spark_session, - query_name, - q_content) - else: - summary = q_report.report_on(run_insert_query, spark_session, - q_content) + summary = q_report.report_on(run_dm_query, spark_session, + q_content) print(f"Time taken: {summary['queryTimes']} millis for {query_name}") execution_time_list.append((spark_app_id, query_name, summary['queryTimes'])) q_report.write_summary(query_name, prefix="") diff --git a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java index 637f1f4..ee86e96 100755 --- a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java +++ b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java @@ -193,15 +193,15 @@ public Path genInput(String table, int scale, int parallel, int rangeStart, int String cmd = ""; for(int i = rangeStart; i <= rangeEnd; i++) { if(table.equals("all")) { - cmd += String.format("./dsdgen -dir $DIR -force Y -scale %d -parallel %d -child %d", scale, parallel, i) + cmd += String.format("./dsdgen -dir $DIR -force Y -scale %d -parallel %d -child %d", scale, parallel, i); } else { - cmd += String.format("./dsdgen -dir $DIR -table %s -force Y -scale %d -parallel %d -child %d", table, scale, parallel, i) + cmd += String.format("./dsdgen -dir $DIR -table %s -force Y -scale %d -parallel %d -child %d", table, scale, parallel, i); } if(update != 999999) { - cmd += String.format(" -update %d", update) + cmd += String.format(" -update %d", update); } - cmd += "\n" - out.writeBytes(cmd) + cmd += "\n"; + out.writeBytes(cmd); } out.close(); return in; From e405cf180ce5bb5da000ec7cefb76e640b5a82f0 Mon Sep 17 00:00:00 2001 
From: Allen Xu Date: Wed, 15 Jun 2022 17:07:30 +0800 Subject: [PATCH 12/17] refine command line options Signed-off-by: Allen Xu --- nds/README.md | 14 ++--- nds/convert_submit_cpu_iceberg.template | 5 ++ nds/nds_maintenance.py | 53 +++++++++++++++---- nds/nds_transcode.py | 41 ++++++++------ .../java/org/notmysock/tpcds/GenTable.java | 8 +-- 5 files changed, 85 insertions(+), 36 deletions(-) diff --git a/nds/README.md b/nds/README.md index 1882f47..d83bc4a 100644 --- a/nds/README.md +++ b/nds/README.md @@ -114,16 +114,18 @@ optional arguments: --output_mode {overwrite,append,ignore,error,errorifexists,default} save modes as defined by https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes. default value is errorifexists, which is the Spark default behavior. - --output_format {parquet,orc,avro} + --output_format {parquet,orc,avro,iceberg} output data format when converting CSV data sources. --tables TABLES specify table names by a comma seprated string. e.g. 'catalog_page,catalog_sales'. --log_level LOG_LEVEL set log level for Spark driver log. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN(default: INFO) --floats replace DecimalType with DoubleType when saving parquet files. If not specified, decimal data will be saved. --update transcode the source data or update data - --iceberg Save converted data into Iceberg tables. + --iceberg_write_format {parquet,orc,avro} + File format for the Iceberg table; parquet, avro, or orc --compression COMPRESSION - Parquet compression codec. Iceberg is using gzip as default but spark-rapids plugin does not support it yet, so default it to snappy. + Compression codec when saving Parquet Orc or Iceberg data. Iceberg is using gzip as default but spark-rapids plugin does not support it yet, so default it to snappy. Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties for + supported codec for different output format such as Parquet or Avro in Iceberg. Please refer to https://spark.apache.org/docs/latest/sql-data-sources.html for supported codec when writing Parquet Orc or Avro by Spark. Default is Snappy. ``` @@ -281,11 +283,11 @@ otherwise some query application may be in _WAITING_ status(which can be observe Yarn Resource Manager UI) until enough resources are released. ### Data Maintenance -Data Maintenance performance data update over existed dataset including data INSERT and DELETE. Due -to the limits of Spark that the update operations are not supported yet in Spark, we use +Data Maintenance performance data update over existed dataset including data INSERT and DELETE. The +update operations cannot be done atomically on raw Parquet/Orc files, so we use [Iceberg](https://iceberg.apache.org/) as dataset metadata manager to overcome the issue. -To enable Iceberg, user must set proper configurations. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/) +Enabling Iceberg requires additional configuration. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/) for details. We also provide a Spark submit template with necessary Iceberg configs: [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template) The data maintenance queries are in [data_maintenance](./data_maintenance) folder. 
`DF_*.sql` are diff --git a/nds/convert_submit_cpu_iceberg.template b/nds/convert_submit_cpu_iceberg.template index d9d649d..9ab13f9 100644 --- a/nds/convert_submit_cpu_iceberg.template +++ b/nds/convert_submit_cpu_iceberg.template @@ -15,6 +15,11 @@ # limitations under the License. # export SPARK_HOME=YOUR_SPARK_HOME + +# 1. The iceberg-spark-runtime-3.2_2.12:0.13.1 only works on Spark 3.2.x +# Please refer to https://iceberg.apache.org/releases/ for other Spark versions. +# 2. The Iceberg catalog/tables is expected to be in current directory. +# see `spark.sql.catalog.spark_catalog.warehouse`. export SPARK_CONF=("--master" "yarn" "--deploy-mode" "cluster" "--driver-memory" "10G" diff --git a/nds/nds_maintenance.py b/nds/nds_maintenance.py index 8716e47..2345fb6 100644 --- a/nds/nds_maintenance.py +++ b/nds/nds_maintenance.py @@ -1,3 +1,34 @@ +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +# obtained from using this file do not comply with the TPC-DS Benchmark. +# + import argparse import csv import time @@ -7,16 +38,18 @@ from check import get_abs_path -INSERT_FUNCS = ['LF_CR', - 'LF_CS', - 'LF_I', - 'LF_SR', - 'LF_SS', - 'LF_WR', - 'LF_WS'] -DELETE_FUNCS = ['DF_CS', - 'DF_SS', - 'DF_WS'] +INSERT_FUNCS = [ + 'LF_CR', + 'LF_CS', + 'LF_I', + 'LF_SR', + 'LF_SS', + 'LF_WR', + 'LF_WS'] +DELETE_FUNCS = [ + 'DF_CS', + 'DF_SS', + 'DF_WS'] INVENTORY_DELETE_FUNC = ['DF_I'] DM_FUNCS = INSERT_FUNCS + DELETE_FUNCS + INVENTORY_DELETE_FUNC diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index a732541..c1accad 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -332,7 +332,7 @@ def get_schemas(use_decimal): StructField("sr_store_sk", IntegerType()), StructField("sr_reason_sk", IntegerType()), # Use LongType due to https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1138379596 - # Databricks is using LongType as well in their accpeted benchmark reports. + # Databricks is using LongType as well in their accepeted benchmark reports. 
# See https://www.tpc.org/results/supporting_files/tpcds/databricks~tpcds~100000~databricks_SQL_8.3~sup-1~2021-11-02~v01.zip StructField("sr_ticket_number", LongType(), nullable=False), StructField("sr_return_quantity", IntegerType()), @@ -738,7 +738,7 @@ def load(session, filename, schema, delimiter="|", header="false", prefix=""): return session.read.option("delimiter", delimiter).option("header", header).csv(data_path, schema=schema) -def store(session, df, filename, output_format, output_mode, use_iceberg, compression, prefix=""): +def store(session, df, filename, output_format, output_mode, iceberg_write_format, compression, prefix=""): """Create Iceberg tables by CTAS Args: @@ -751,7 +751,7 @@ def store(session, df, filename, output_format, output_mode, use_iceberg, compre compression (str): Parquet compression codec when saving Iceberg tables prefix (str): output data path when not using Iceberg. """ - if use_iceberg: + if output_format == "iceberg": if output_mode == 'overwrite': session.sql(f"drop table if exists {filename}") CTAS = f"create table {filename} using iceberg " @@ -762,9 +762,12 @@ def store(session, df, filename, output_format, output_mode, use_iceberg, compre CTAS += f"partitioned by ({TABLE_PARTITIONING[filename]})" else: df.coalesce(1).createOrReplaceTempView("temptbl") - CTAS += f" tblproperties('write.format.default' = '{output_format}'" - # the compression-codec won't panic when output_format is not parquet - CTAS += f", 'write.parquet.compression-codec' = '{compression}')" + CTAS += f" tblproperties('write.format.default' = '{iceberg_write_format}'" + # Iceberg now only support compression codec option for Parquet and Avro write. + if iceberg_write_format == "parquet": + CTAS += f", 'write.parquet.compression-codec' = '{compression}')" + elif iceberg_write_format == "avro": + CTAS += f", 'write.avro.compression-codec' = '{compression}')" CTAS += " as select * from temptbl" session.sql(CTAS) else: @@ -773,10 +776,11 @@ def store(session, df, filename, output_format, output_mode, use_iceberg, compre df = df.repartition( col(TABLE_PARTITIONING[filename])).sortWithinPartitions( TABLE_PARTITIONING[filename]) - df.write.format(output_format).mode(output_mode).partitionBy( - TABLE_PARTITIONING[filename]).save(data_path) + df.write.option('compression', compression).format(output_format).mode( + output_mode).partitionBy(TABLE_PARTITIONING[filename]).save(data_path) else: - df.coalesce(1).write.format(output_format).mode(output_mode).save(data_path) + df.coalesce(1).write.option('compression', compression).format( + output_format).mode(output_mode).save(data_path) def transcode(args): session = pyspark.sql.SparkSession.builder \ @@ -810,7 +814,7 @@ def transcode(args): f"{fn}", args.output_format, args.output_mode, - args.iceberg, + args.iceberg_write_format, args.compression, args.output_prefix), number=1) @@ -853,7 +857,7 @@ def transcode(args): default="errorifexists") parser.add_argument( '--output_format', - choices=['parquet', 'orc', 'avro'], + choices=['parquet', 'orc', 'avro', 'iceberg'], default='parquet', help="output data format when converting CSV data sources." ) @@ -875,15 +879,20 @@ def transcode(args): help='transcode the source data or update data' ) parser.add_argument( - '--iceberg', - action='store_true', - help='Save converted data into Iceberg tables.' 
+ '--iceberg_write_format', + choices=['parquet', 'orc', 'avro'], + default='parquet', + help='File format for the Iceberg table; parquet, avro, or orc' ) parser.add_argument( '--compression', default='snappy', - help='Parquet compression codec. Iceberg is using gzip as default but spark-rapids plugin ' + - 'does not support it yet, so default it to snappy.' + help='Compression codec when saving Parquet Orc or Iceberg data. Iceberg is using gzip as' + + ' default but spark-rapids plugin does not support it yet, so default it to snappy.' + + ' Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties ' + + ' for supported codec for different output format such as Parquet or Avro in Iceberg.' + + ' Please refer to https://spark.apache.org/docs/latest/sql-data-sources.html' + + ' for supported codec when writing Parquet Orc or Avro by Spark. Default is Snappy.' ) args = parser.parse_args() transcode(args) diff --git a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java index ee86e96..2aacc7e 100755 --- a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java +++ b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java @@ -99,12 +99,12 @@ public int run(String[] args) throws Exception { } // use 999999 for default update value to avoid user input conflict. - int update = 999999; + Integer update = null; if(line.hasOption("update")) { update = Integer.parseInt(line.getOptionValue("update")); } - if(update < 0) { + if(update != null && update < 0) { // TPC-DS will error if update is < 0 System.err.println("The update value cannot be less than 0, your input: " + update); } @@ -184,7 +184,7 @@ public Path copyJar(File jar) throws Exception { return dst; } - public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd, int update) throws Exception { + public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd, Integer update) throws Exception { long epoch = System.currentTimeMillis()/1000; Path in = new Path("/tmp/"+table+"_"+scale+"-"+epoch); @@ -197,7 +197,7 @@ public Path genInput(String table, int scale, int parallel, int rangeStart, int } else { cmd += String.format("./dsdgen -dir $DIR -table %s -force Y -scale %d -parallel %d -child %d", table, scale, parallel, i); } - if(update != 999999) { + if(update != null) { cmd += String.format(" -update %d", update); } cmd += "\n"; From 9c3515e6e918a960567efc70043757414857795c Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 16 Jun 2022 11:56:51 +0800 Subject: [PATCH 13/17] refine cmd option Signed-off-by: Allen Xu --- nds/README.md | 4 +-- nds/nds_transcode.py | 32 +++++++++++-------- .../java/org/notmysock/tpcds/GenTable.java | 9 +++--- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/nds/README.md b/nds/README.md index f3ad950..c0de364 100644 --- a/nds/README.md +++ b/nds/README.md @@ -124,8 +124,8 @@ optional arguments: --iceberg_write_format {parquet,orc,avro} File format for the Iceberg table; parquet, avro, or orc --compression COMPRESSION - Compression codec when saving Parquet Orc or Iceberg data. Iceberg is using gzip as default but spark-rapids plugin does not support it yet, so default it to snappy. Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties for - supported codec for different output format such as Parquet or Avro in Iceberg. 
Please refer to https://spark.apache.org/docs/latest/sql-data-sources.html for supported codec when writing Parquet Orc or Avro by Spark. Default is Snappy. + Compression codec when saving Parquet Orc or Iceberg data. Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties for supported codec for different output format such as Parquet or Avro in Iceberg. Please refer to + https://spark.apache.org/docs/latest/sql-data-sources.html for supported codec when writing Parquet Orc or Avro by Spark. When not specified, it will use Spark or Iceberg default ones. ``` diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 968d45e..332fef4 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -332,7 +332,7 @@ def get_schemas(use_decimal): StructField("sr_store_sk", IntegerType()), StructField("sr_reason_sk", IntegerType()), # Use LongType due to https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1138379596 - # Databricks is using LongType as well in their accepeted benchmark reports. + # Databricks is using LongType as well in their accepted benchmark reports. # See https://www.tpc.org/results/supporting_files/tpcds/databricks~tpcds~100000~databricks_SQL_8.3~sup-1~2021-11-02~v01.zip StructField("sr_ticket_number", LongType(), nullable=False), StructField("sr_return_quantity", IntegerType()), @@ -748,7 +748,7 @@ def store(session, df, filename, output_format, output_mode, iceberg_write_forma output_format (str): parquet, orc or avro write_mode (str): save modes as defined by "https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes. use_iceberg (bool): write data into Iceberg tables - compression (str): Parquet compression codec when saving Iceberg tables + compression (str): compression codec for converted data when saving to disk prefix (str): output data path when not using Iceberg. """ if output_format == "iceberg": @@ -764,10 +764,12 @@ def store(session, df, filename, output_format, output_mode, iceberg_write_forma df.coalesce(1).createOrReplaceTempView("temptbl") CTAS += f" tblproperties('write.format.default' = '{iceberg_write_format}'" # Iceberg now only support compression codec option for Parquet and Avro write. 
- if iceberg_write_format == "parquet": - CTAS += f", 'write.parquet.compression-codec' = '{compression}')" - elif iceberg_write_format == "avro": - CTAS += f", 'write.avro.compression-codec' = '{compression}')" + if compression: + if iceberg_write_format == "parquet": + CTAS += f", 'write.parquet.compression-codec' = '{compression}'" + elif iceberg_write_format == "avro": + CTAS += f", 'write.avro.compression-codec' = '{compression}'" + CTAS += ")" CTAS += " as select * from temptbl" session.sql(CTAS) else: @@ -776,11 +778,16 @@ def store(session, df, filename, output_format, output_mode, iceberg_write_forma df = df.repartition( col(TABLE_PARTITIONING[filename])).sortWithinPartitions( TABLE_PARTITIONING[filename]) - df.write.option('compression', compression).format(output_format).mode( + writer = df.write + if compression: + writer = writer.option('compression', compression) + writer.format(output_format).mode( output_mode).partitionBy(TABLE_PARTITIONING[filename]).save(data_path) else: - df.coalesce(1).write.option('compression', compression).format( - output_format).mode(output_mode).save(data_path) + writer = df.coalesce(1).write + if compression: + writer = writer.option('compression', compression) + writer.format(output_format).mode(output_mode).save(data_path) def transcode(args): session = pyspark.sql.SparkSession.builder \ @@ -886,13 +893,12 @@ def transcode(args): ) parser.add_argument( '--compression', - default='snappy', - help='Compression codec when saving Parquet Orc or Iceberg data. Iceberg is using gzip as' + - ' default but spark-rapids plugin does not support it yet, so default it to snappy.' + + help='Compression codec when saving Parquet Orc or Iceberg data.' + ' Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties ' + ' for supported codec for different output format such as Parquet or Avro in Iceberg.' + ' Please refer to https://spark.apache.org/docs/latest/sql-data-sources.html' + - ' for supported codec when writing Parquet Orc or Avro by Spark. Default is Snappy.' + ' for supported codec when writing Parquet Orc or Avro by Spark.' + + ' When not specified, it will use Spark or Iceberg default ones.' 
) args = parser.parse_args() transcode(args) diff --git a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java index 2aacc7e..433ee29 100755 --- a/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java +++ b/nds/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java @@ -102,12 +102,13 @@ public int run(String[] args) throws Exception { Integer update = null; if(line.hasOption("update")) { update = Integer.parseInt(line.getOptionValue("update")); + if(update < 0) { + // TPC-DS will error if update is < 0 + System.err.println("The update value cannot be less than 0, your input: " + update); + } } - if(update != null && update < 0) { - // TPC-DS will error if update is < 0 - System.err.println("The update value cannot be less than 0, your input: " + update); - } + if(parallel == 1 || scale == 1) { System.err.println("The MR task does not work for scale=1 or parallel=1"); From 36876fc6fad725fd4c56a76c8277f60bd7df3cfe Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 16 Jun 2022 23:05:38 +0800 Subject: [PATCH 14/17] Update nds/README.md Co-authored-by: Jason Lowe --- nds/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nds/README.md b/nds/README.md index c0de364..a39ff48 100644 --- a/nds/README.md +++ b/nds/README.md @@ -124,8 +124,8 @@ optional arguments: --iceberg_write_format {parquet,orc,avro} File format for the Iceberg table; parquet, avro, or orc --compression COMPRESSION - Compression codec when saving Parquet Orc or Iceberg data. Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties for supported codec for different output format such as Parquet or Avro in Iceberg. Please refer to - https://spark.apache.org/docs/latest/sql-data-sources.html for supported codec when writing Parquet Orc or Avro by Spark. When not specified, it will use Spark or Iceberg default ones. + Compression codec to use when saving data. See https://iceberg.apache.org/docs/latest/configuration/#write-properties for supported codecs in Iceberg. See + https://spark.apache.org/docs/latest/sql-data-sources.html for supported codecs for Spark built-in formats. When not specified, the default for the requested output format will be used. ``` From 9c37ed81b7455de369e1fb8d208f4a0f81db8939 Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Thu, 16 Jun 2022 23:05:54 +0800 Subject: [PATCH 15/17] Update nds/nds_transcode.py Co-authored-by: Jason Lowe --- nds/nds_transcode.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py index 332fef4..15a5d49 100644 --- a/nds/nds_transcode.py +++ b/nds/nds_transcode.py @@ -893,12 +893,12 @@ def transcode(args): ) parser.add_argument( '--compression', - help='Compression codec when saving Parquet Orc or Iceberg data.' + - ' Please refer to https://iceberg.apache.org/docs/latest/configuration/#write-properties ' + - ' for supported codec for different output format such as Parquet or Avro in Iceberg.' + - ' Please refer to https://spark.apache.org/docs/latest/sql-data-sources.html' + - ' for supported codec when writing Parquet Orc or Avro by Spark.' + - ' When not specified, it will use Spark or Iceberg default ones.' + help='Compression codec to use when saving data.' + + ' See https://iceberg.apache.org/docs/latest/configuration/#write-properties ' + + ' for supported codecs in Iceberg.' 
+ + ' See https://spark.apache.org/docs/latest/sql-data-sources.html' + + ' for supported codecs for Spark built-in formats.' + + ' When not specified, the default for the requested output format will be used.' ) args = parser.parse_args() transcode(args) From 6e82a79cdef1b89b8bebe765e02013865465ccff Mon Sep 17 00:00:00 2001 From: Allen Xu Date: Mon, 20 Jun 2022 14:17:33 +0800 Subject: [PATCH 16/17] add copyright and refine README Signed-off-by: Allen Xu --- nds/README.md | 5 +++++ nds/data_maintenance/DF_CS.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/DF_I.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/DF_SS.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/DF_WS.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_CR.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_CS.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_I.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_SR.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_SS.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_WR.sql | 30 ++++++++++++++++++++++++++++++ nds/data_maintenance/LF_WS.sql | 30 ++++++++++++++++++++++++++++++ 12 files changed, 335 insertions(+) diff --git a/nds/README.md b/nds/README.md index c0de364..2fa67c7 100644 --- a/nds/README.md +++ b/nds/README.md @@ -293,6 +293,11 @@ for details. We also provide a Spark submit template with necessary Iceberg conf The data maintenance queries are in [data_maintenance](./data_maintenance) folder. `DF_*.sql` are DELETE queries while `LF_*.sql` are INSERT queries. +Note: The Delete functions in Data Maintenance cannot run successfully in Spark 3.2.0 and 3.2.1 due +to a known Spark [issue](https://issues.apache.org/jira/browse/SPARK-39454). User can run it in Spark 3.2.2 +or later. More details including work-around for version 3.2.0 and 3.2.1 could be found in this +[link](https://github.com/NVIDIA/spark-rapids-benchmarks/pull/9#issuecomment-1141956487) + Arguments supported for data maintenance: ``` usage: nds_maintenance.py [-h] [--maintenance_queries MAINTENANCE_QUERIES] maintenance_queries_folder time_log diff --git a/nds/data_maintenance/DF_CS.sql b/nds/data_maintenance/DF_CS.sql index 144f94b..3cd05c7 100644 --- a/nds/data_maintenance/DF_CS.sql +++ b/nds/data_maintenance/DF_CS.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). 
+-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + delete from catalog_returns where cr_order_number in (select cs_order_number from catalog_sales, date_dim where cs_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); delete from catalog_sales where cs_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and cs_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_I.sql b/nds/data_maintenance/DF_I.sql index 5b19f7a..b062973 100644 --- a/nds/data_maintenance/DF_I.sql +++ b/nds/data_maintenance/DF_I.sql @@ -1,2 +1,32 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + delete from inventory where inv_date_sk >= ( select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and inv_date_sk <= ( select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_SS.sql b/nds/data_maintenance/DF_SS.sql index 1012c45..b7a605f 100644 --- a/nds/data_maintenance/DF_SS.sql +++ b/nds/data_maintenance/DF_SS.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + delete from store_returns where sr_ticket_number in (select ss_ticket_number from store_sales, date_dim where ss_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); delete from store_sales where ss_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and ss_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/DF_WS.sql b/nds/data_maintenance/DF_WS.sql index bdde5a9..98e0175 100644 --- a/nds/data_maintenance/DF_WS.sql +++ b/nds/data_maintenance/DF_WS.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. 
+-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + delete from web_returns where wr_order_number in (select ws_order_number from web_sales, date_dim where ws_sold_date_sk=d_date_sk and d_date between 'DATE1' and 'DATE2'); delete from web_sales where ws_sold_date_sk >= (select min(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2') and ws_sold_date_sk <= (select max(d_date_sk) from date_dim where d_date between 'DATE1' and 'DATE2'); diff --git a/nds/data_maintenance/LF_CR.sql b/nds/data_maintenance/LF_CR.sql index f4ee1ca..54b7e6f 100644 --- a/nds/data_maintenance/LF_CR.sql +++ b/nds/data_maintenance/LF_CR.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + DROP VIEW IF EXISTS crv; CREATE VIEW crv as SELECT d_date_sk cr_returned_date_sk diff --git a/nds/data_maintenance/LF_CS.sql b/nds/data_maintenance/LF_CS.sql index fac5e55..4a722ec 100644 --- a/nds/data_maintenance/LF_CS.sql +++ b/nds/data_maintenance/LF_CS.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + DROP VIEW IF EXISTS csv; CREATE view csv as SELECT d1.d_date_sk cs_sold_date_sk diff --git a/nds/data_maintenance/LF_I.sql b/nds/data_maintenance/LF_I.sql index 8057eb8..a787c09 100644 --- a/nds/data_maintenance/LF_I.sql +++ b/nds/data_maintenance/LF_I.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +-- ----- +-- +-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +-- +-- You may not use this file except in compliance with the TPC EULA. +-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +-- obtained from using this file do not comply with the TPC-DS Benchmark. +-- + DROP VIEW IF EXISTS iv; CREATE view iv AS SELECT d_date_sk inv_date_sk, diff --git a/nds/data_maintenance/LF_SR.sql b/nds/data_maintenance/LF_SR.sql index d035a37..d7484c2 100644 --- a/nds/data_maintenance/LF_SR.sql +++ b/nds/data_maintenance/LF_SR.sql @@ -1,3 +1,33 @@ +-- +-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. 
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- -----
+--
+-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+--
+-- You may not use this file except in compliance with the TPC EULA.
+-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+-- obtained from using this file do not comply with the TPC-DS Benchmark.
+--
+
 DROP VIEW IF EXISTS srv;
 CREATE view srv as
 SELECT d_date_sk sr_returned_date_sk
diff --git a/nds/data_maintenance/LF_SS.sql b/nds/data_maintenance/LF_SS.sql
index 4d04084..fef2d8a 100644
--- a/nds/data_maintenance/LF_SS.sql
+++ b/nds/data_maintenance/LF_SS.sql
@@ -1,3 +1,33 @@
+--
+-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+-- SPDX-License-Identifier: Apache-2.0
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- -----
+--
+-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+--
+-- You may not use this file except in compliance with the TPC EULA.
+-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+-- obtained from using this file do not comply with the TPC-DS Benchmark.
+--
+
 DROP VIEW IF EXISTS ssv;
 CREATE view ssv as
 SELECT d_date_sk ss_sold_date_sk,
diff --git a/nds/data_maintenance/LF_WR.sql b/nds/data_maintenance/LF_WR.sql
index d0694fd..804b14a 100644
--- a/nds/data_maintenance/LF_WR.sql
+++ b/nds/data_maintenance/LF_WR.sql
@@ -1,3 +1,33 @@
+--
+-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+-- SPDX-License-Identifier: Apache-2.0
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- -----
+--
+-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+--
+-- You may not use this file except in compliance with the TPC EULA.
+-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+-- obtained from using this file do not comply with the TPC-DS Benchmark.
+--
+
 DROP VIEW IF EXISTS wrv;
 CREATE VIEW wrv AS
 SELECT d_date_sk wr_return_date_sk
diff --git a/nds/data_maintenance/LF_WS.sql b/nds/data_maintenance/LF_WS.sql
index acaf674..3c61751 100644
--- a/nds/data_maintenance/LF_WS.sql
+++ b/nds/data_maintenance/LF_WS.sql
@@ -1,3 +1,33 @@
+--
+-- SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+-- SPDX-License-Identifier: Apache-2.0
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- -----
+--
+-- Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+-- (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+-- Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+-- and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+-- available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+--
+-- You may not use this file except in compliance with the TPC EULA.
+-- DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+-- obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+-- obtained from using this file do not comply with the TPC-DS Benchmark.
+--
+
 DROP VIEW IF EXISTS wsv;
 CREATE VIEW wsv AS
 SELECT d1.d_date_sk ws_sold_date_sk,

From 220e3b412aa860519c3cdfa4795c5e7af178bdf8 Mon Sep 17 00:00:00 2001
From: Allen Xu
Date: Tue, 21 Jun 2022 10:48:10 +0800
Subject: [PATCH 17/17] refine readme

Signed-off-by: Allen Xu
---
 nds/README.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/nds/README.md b/nds/README.md
index df9ed20..c911e9b 100644
--- a/nds/README.md
+++ b/nds/README.md
@@ -99,8 +99,7 @@ otherwise DecimalType will be saved.
 arguments for `nds_transcode.py`:
 ```
 python nds_transcode.py -h
-usage: nds_transcode.py [-h] [--output_mode {overwrite,append,ignore,error,errorifexists}] [--output_format {parquet,orc,avro,iceberg}] [--tables TABLES] [--log_level LOG_LEVEL] [--floats] [--update] [--iceberg_write_format {parquet,orc,avro}] [--compression COMPRESSION]
-       input_prefix output_prefix report_file
+usage: nds_transcode.py [-h] [--output_mode {overwrite,append,ignore,error,errorifexists}] [--output_format {parquet,orc,avro,iceberg}] [--tables TABLES] [--log_level LOG_LEVEL] [--floats] [--update] [--iceberg_write_format {parquet,orc,avro}] [--compression COMPRESSION] input_prefix output_prefix report_file

 positional arguments:
   input_prefix  text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"; the