
Commit

Adding keep_sc support to nds-h
Signed-off-by: Sayed Bilal Bari <sbari@nvidia.com>
bilalbari committed Jul 11, 2024
1 parent a97870f commit cd7700e
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions nds-h/nds_h_power.py
@@ -171,27 +171,28 @@ def run_query_stream(input_prefix,
time_log_output_path,
input_format="parquet",
output_path=None,
keep_sc=False,
output_format="parquet"):
"""run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
for easy accesibility. TempView Creation time is also recorded.
Args:
input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True.
query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark
time_log_output_path (str): path of the log that contains query execution time, both local
:param input_prefix : path of input data or warehouse if input_format is "iceberg" or hive_external=True.
:param property_file: property file for Spark configuration.
:param query_dict : ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark
:param time_log_output_path : path of the log that contains query execution time, both local
and HDFS path are supported.
input_format (str, optional): type of input data source.
use_deciaml(bool, optional): use decimal type for certain columns when loading data of text type.
output_path (str, optional): path of query output, optional. If not specified, collect()
:param input_format : type of input data source.
:param output_path : path of query output, optional. If not specified, collect()
action will be applied to each query. Defaults to None.
output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults
to "parquet".
:param output_format : query output format, choices are csv, orc, parquet. Defaults to "parquet".
:param keep_sc : Databricks-specific flag to keep the Spark context alive. Defaults to False.
"""
queries_reports = []
execution_time_list = []
total_time_start = time.time()
# check if it's running specific query or Power Run
app_name = "NDS - Power Run"
app_name = "NDS-H - Power Run"
# Execute Power Run or Specific query in Spark
# build Spark Session
session_builder = SparkSession.builder
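
The new keep_sc parameter is keyword-defaulted to False, so existing callers of run_query_stream keep the old behavior. A hypothetical opt-in call is sketched below; the full signature is collapsed in this hunk, so the parameter names come from the docstring and are passed as keywords, and the paths are invented examples:

from collections import OrderedDict

# Hypothetical caller; parameter names are taken from the docstring above,
# not from the collapsed signature, so treat this as a sketch.
query_dict = OrderedDict([("query1", "SELECT 1")])       # stand-in for the real NDS-H queries
run_query_stream(input_prefix="/data/nds-h/parquet",      # assumed example path
                 query_dict=query_dict,
                 time_log_output_path="./power_time.csv", # assumed example path
                 input_format="parquet",
                 keep_sc=True)                            # leave the SparkContext running (Databricks)
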
@@ -223,7 +224,8 @@ def run_query_stream(input_prefix,
queries_reports.append(q_report)
power_end = int(time.time())
power_elapse = int((power_end - power_start)*1000)
spark_session.sparkContext.stop()
if not keep_sc:
spark_session.sparkContext.stop()
total_time_end = time.time()
total_elapse = int((total_time_end - total_time_start)*1000)
print("====== Power Test Time: {} milliseconds ======".format(power_elapse))
@@ -278,6 +280,11 @@ def load_properties(filename):
'absolute. Issue: https://github.com/delta-io/delta/issues/555')
parser.add_argument('query_stream_file',
help='query stream file that contains NDS queries in specific order')
parser.add_argument('--keep_sc',
action='store_true',
help='Keep SparkContext alive after running all queries. This works around a ' +
'limitation of the Databricks runtime environment. Users should always '
'pass this flag when running on Databricks.')
parser.add_argument('time_log',
help='path to execution time log, only support local path.',
default="")
@@ -303,4 +310,5 @@ def load_properties(filename):
args.time_log,
args.input_format,
args.output_prefix,
args.keep_sc,
args.output_format)
