
Initial support for Data Maintenance #9

Merged (21 commits, Jun 23, 2022)
Changes from 3 commits
1 change: 1 addition & 0 deletions nds/README.md
@@ -72,6 +72,7 @@ optional arguments:
--overwrite_output overwrite if there is already existing data in the path provided.
--replication REPLICATION
the replication factor when generating data to HDFS. If not set, the Hadoop job will use the setting in the Hadoop cluster.
--update UPDATE generate update dataset <n>. <n> is identical to the number of streams used in the Throughput Tests of the benchmark
```

Example command:
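A hedged usage sketch of the new flag (the `...` stands in for the script's existing required arguments, which are elided from this excerpt): passing --update 4 generates the update data set matching four Throughput Test streams.

```
python nds_gen_data.py ... --update 4
```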
10 changes: 9 additions & 1 deletion nds/convert_submit_cpu.template
@@ -21,5 +21,13 @@ export SPARK_CONF=("--master" "yarn"
"--num-executors" "8"
"--executor-memory" "40G"
"--executor-cores" "12"
"--conf" "spark.task.cpus=1")
"--conf" "spark.task.cpus=1"
"--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.1"
Collaborator:
nit: the first character seems not aligned.

Collaborator:
Shall we consider the case of not using iceberg?
In that case, the parameters for iceberg should be enabled by some options.

Collaborator Author (@wjxiz1992, May 27, 2022):

@jlowe What's our plan/strategy to do the WHOLE NDS test? Are we going to do them all based on Iceberg? For #8 I've made the transcode step save the data ONLY to Iceberg; should we keep our old way of saving them just to a folder? The old way may be more friendly to users who don't know Iceberg, but eventually, if they want to perform the whole NDS test including Data Maintenance, they will come back to do Iceberg writing again... Any suggestions for this?

Member:
To do the entire NDS suite one would need to use a format that supports the entire set of operations required by the entire suite. That would mean using Iceberg, Delta Lake, or some other format that allows incremental table update.

> I've made the transcode step to save the data ONLY to Iceberg

That's not desired. We want to support transcoding to a bunch of different formats, because we're not always going to run the entire suite. We get a lot of useful information from running the significant portion of NDS that works on raw Parquet and ORC files, and we do not want to lose the ability to set up those benchmarks. The transcode needs to be flexible, ideally allowing output to every major format that we want to bench. For now that definitely includes raw Parquet and ORC along with Iceberg (and the ability to control settings for these formats such as compression codec, probably via separate configs spec'd either inline or sideband in the Spark instance to use).

"--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
"--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog"
"--conf" "spark.sql.catalog.spark_catalog.type=hadoop"
"--conf" "spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog"
"--conf" "spark.sql.catalog.local.type=hadoop"
"--conf" "spark.sql.catalog.local.warehouse=$PWD/local-warehouse"
"--conf" "spark.sql.catalog.spark_catalog.warehouse=$PWD/spark-warehouse")

31 changes: 29 additions & 2 deletions nds/nds_gen_data.py
@@ -38,7 +38,7 @@

check_version()

table_names = [
source_table_names = [
'call_center',
'catalog_page',
'catalog_returns',
@@ -66,21 +66,37 @@
'web_site',
]

maintenance_table_names = [
's_catalog_order',
's_catalog_order_lineitem',
's_catalog_returns',
's_inventory',
's_purchase',
's_purchase_lineitem',
's_store_returns',
's_web_order',
's_web_order_lineitem',
's_web_returns'
]

def clean_temp_data(temp_data_path):
cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path]
print(" ".join(cmd))
subprocess.run(cmd)


def merge_temp_tables(temp_data_path, parent_data_path):
def merge_temp_tables(temp_data_path, parent_data_path, update):
"""Helper functions for incremental data generation. Move data in temporary child range path to
parent directory.

Args:
temp_data_path (str): temorary child range data path
parent_data_path (str): parent data path
"""
if update:
table_names = maintenance_table_names
else:
table_names = source_table_names
for table_name in table_names:
# manually create table sub-folders
# redundant step if it's not the first range part.
@@ -123,6 +139,8 @@ def generate_data_hdfs(args, jar_path):
tpcds_gen_path = jar_path.parent.parent.absolute()
if args.overwrite_output:
cmd += ['-o']
if args.update:
cmd += ["-u", args.update]
if args.range:
# use a temp folder to save the specific range data.
# will move the content to parent folder afterwards.
@@ -179,6 +197,8 @@ def generate_data_local(args, range_start, range_end, tool_path):
"-verbose", "Y"]
if args.overwrite_output:
dsdgen_args += ["-force", "Y"]
if args.update:
dsdgen_args += ["-update", args.update]
procs.append(subprocess.Popen(
["./dsdgen"] + dsdgen_args, cwd=str(work_dir)))
# wait for data generation to complete
@@ -188,6 +208,10 @@ def generate_data_local(args, range_start, range_end, tool_path):
print("dsdgen failed with return code {}".format(p.returncode))
raise Exception("dsdgen failed")
# move multi-partition files into table folders
if args.update:
table_names = maintenance_table_names
else:
table_names = source_table_names
for table in table_names:
print('mkdir -p {}/{}'.format(data_dir, table))
subprocess.run(['mkdir', '-p', data_dir + '/' + table])
@@ -235,6 +259,9 @@ def generate_data(args):
parser.add_argument("--replication",
help="the number of replication factor when generating data to HDFS. " +
"if not set, the Hadoop job will use the setting in the Hadoop cluster.")
parser.add_argument("--update",
help="generate update dataset <n>. <n> is identical to the number of " +
"streams used in the Throughput Tests of the benchmark")


args = parser.parse_args()
102 changes: 102 additions & 0 deletions nds/nds_maintenance.py
@@ -0,0 +1,102 @@
import argparse
import csv
import time

from pyspark.sql import SparkSession
from PysparkBenchReport import PysparkBenchReport

from check import get_abs_path

def get_maintenance_queries(folder, spec_queries):
"""get query content from DM query files

Args:
folder (str): folder containing Data Maintenance query files
spec_queries (list[str]): specific target Data Maintenance queries
Returns:
dict{str: list[str]}: a dict mapping each Data Maintenance query name to its content.
"""
#TODO: Add delete functions
DM_FUNCS = ['LF_CR',
'LF_CS',
'LF_I',
'LF_SR',
'LF_SS',
'LF_WR',
'LF_WS']
if spec_queries:
for q in spec_queries:
if q not in DM_FUNCS:
raise Exception(f"invalid Data Maintenance query: {q}. Valid are: {DM_FUNCS}")
DM_FUNCS = [q for q in spec_queries if q in DM_FUNCS]
folder_abs_path = get_abs_path(folder)
q_dict = {}
for q in DM_FUNCS:
with open(folder_abs_path + '/' + q + '.sql', 'r') as f:
# file content e.g.
# " CREATE view ..... ; INSERT into .... ;"
q_content = [ c + ';' for c in f.read().split(';')[:-1]]
q_dict[q] = q_content
return q_dict

def run_query(query_dict, time_log_output_path):
# TODO: Duplicate code in nds_power.py. Refactor this part, make it general.
execution_time_list = []
total_time_start = time.time()
if len(query_dict) == 1:
app_name = "NDS - Data Maintenance - " + list(query_dict.keys())[0]
else:
app_name = "NDS - Data Maintenance"

spark_session = SparkSession.builder.appName(
app_name).getOrCreate()
spark_app_id = spark_session.sparkContext.applicationId
DM_start = time.time()
for query_name, q_content in query_dict.items():
# show query name in Spark web UI
spark_session.sparkContext.setJobGroup(query_name, query_name)
print(f"====== Run {query_name} ======")
q_report = PysparkBenchReport(spark_session)
for q in q_content:
summary = q_report.report_on(spark_session.sql,
q)
print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
execution_time_list.append((spark_app_id, query_name, summary['queryTimes']))
q_report.write_summary(query_name, prefix="")
spark_session.sparkContext.stop()
DM_end = time.time()
DM_elapse = DM_end - DM_start
total_elapse = DM_end - total_time_start
print("====== Data Maintenance Time: {} s ======".format(DM_elapse))
print("====== Total Time: {} s ======".format(total_elapse))
execution_time_list.append(
(spark_app_id, "Data Maintenance Time", DM_elapse))
execution_time_list.append(
(spark_app_id, "Total Time", total_elapse))

# write to local csv file
header = ["application_id", "query", "time/s"]
with open(time_log_output_path, 'w', encoding='UTF8') as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerows(execution_time_list)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('maintenance_queries_folder',
help='folder containing all NDS Data Maintenance queries. If ' +
'"--maintenance_queries" is not set, all queries under the folder will be ' +
'executed.')
parser.add_argument('time_log',
help='path to execution time log; only a local path is supported.',
default="")
parser.add_argument('--maintenance_queries',
type=lambda s: s.split(','),
help='specify Data Maintenance query names as a comma-separated string.' +
' e.g. "LF_CR,LF_CS"')

args = parser.parse_args()
query_dict = get_maintenance_queries(args.maintenance_queries_folder,
args.maintenance_queries)
run_query(query_dict, args.time_log)
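A usage sketch based on the arguments defined above (the folder and CSV paths are placeholders): run two specific maintenance functions and record their timings to a local CSV file.

```
python nds_maintenance.py <maintenance_queries_folder> maintenance_time.csv --maintenance_queries LF_CR,LF_CS
```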