From b0be3757998079841692a4c9d38b4840eca937a5 Mon Sep 17 00:00:00 2001
From: Cong Liu
Date: Tue, 15 Oct 2024 02:57:57 +0000
Subject: [PATCH] Add the ability to benchmark multiple models concurrently.

This is useful for benchmarking multiple LoRA adapters.

- Also fix latency_throughput_curve.sh to parse non-integer request rates
  properly.
- Also add "errors" to the benchmark results.
---
 .../container/benchmark_serving.py            | 276 +++++++++++-------
 .../container/latency_throughput_curve.sh     |  12 +-
 .../benchmark/tools/profile-generator/main.tf |   3 +
 .../modules/latency-profile/main.tf           |   3 +
 .../latency-profile-generator.yaml.tpl        |   6 +
 .../modules/latency-profile/variables.tf      |  20 ++
 .../tools/profile-generator/variables.tf      |  20 ++
 7 files changed, 227 insertions(+), 113 deletions(-)
 mode change 100644 => 100755 benchmarks/benchmark/tools/profile-generator/container/latency_throughput_curve.sh

diff --git a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py
index 29ab4cf00..13e22d439 100644
--- a/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py
+++ b/benchmarks/benchmark/tools/profile-generator/container/benchmark_serving.py
@@ -24,10 +24,6 @@
 from google.protobuf.timestamp_pb2 import Timestamp

-
-# (prompt len, output len, latency)
-REQUEST_LATENCY: List[Tuple[int, int, float]] = []
-
 MIN_SEQ_LEN = 4
 CLIENT_TIMEOUT_SEC = 3 * 60 * 60
 NEW_TEXT_KEY = "\nOutput:\n"
@@ -109,6 +105,16 @@ async def get_request(
     # The next request will be sent after the interval.
     await asyncio.sleep(interval)

+def init_errors_map() -> Dict[str, int]:
+  errors = {
+      "ClientConnectorError": 0,
+      "TimeoutError": 0,
+      "ContentTypeError": 0,
+      "ClientOSError": 0,
+      "ServerDisconnectedError": 0,
+      "unknown_error": 0,
+  }
+  return errors

 async def send_request(
     backend: str,
@@ -122,9 +128,10 @@ async def send_request(
     tokenizer: PreTrainedTokenizerBase,
     sax_model: str,
     model: str,
-) -> None:
+) -> Tuple[Tuple[int, int, float], Dict[str, int]]:
   """Sends request to server."""
   request_start_time = time.time()
+  errors = init_errors_map()

   headers = {"User-Agent": "Benchmark Client"}
   if backend == "vllm":
@@ -195,18 +202,39 @@ async def send_request(

   # Set client timeout to be 3 hrs.
   timeout = aiohttp.ClientTimeout(total=CLIENT_TIMEOUT_SEC)
-  async with aiohttp.ClientSession(timeout=timeout) as session:
+  async with aiohttp.ClientSession(timeout=timeout, trust_env=True) as session:
     while True:
-      async with session.post(api_url, headers=headers, json=pload) as response:
-        chunks = []
-        async for chunk, _ in response.content.iter_chunks():
-          chunks.append(chunk)
-      output = b"".join(chunks).decode("utf-8")
-      output = json.loads(output)
-
-      # Re-send the request if it failed.
-      if "error" not in output:
-        break
+      try:
+        async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
+          output = await response.json()
+
+        # Re-send the request if it failed.
+        if "error" not in output:
+          break
+      except aiohttp.client_exceptions.ClientConnectorError as client_err:
+        errors["ClientConnectorError"] += 1
+        print(f"ClientConnectorError: {client_err}")
+        return None, errors
+      except asyncio.TimeoutError as timeout_err:
+        errors["TimeoutError"] += 1
+        print(f"TimeoutError: {timeout_err}")
+        return None, errors
+      except aiohttp.client_exceptions.ClientOSError as e:
+        errors["ClientOSError"] += 1
+        print(f"ClientOSError: {e}")
+        return None, errors
+      except aiohttp.client_exceptions.ContentTypeError as e:
+        print(f"ContentTypeError: {e}, response: {response}")
+        errors["ContentTypeError"] += 1
+        return None, errors
+      except aiohttp.client_exceptions.ServerDisconnectedError as e:
+        errors["ServerDisconnectedError"] += 1
+        print(f"ServerDisconnectedError: {e}")
+        return None, errors
+      except Exception as e:
+        print(f"Unknown error: {e}")
+        errors["unknown_error"] += 1
+        return None, errors

   request_end_time = time.time()
   # Naive HF transformers generation and TensorRT-LLM generation stops at EOS
@@ -234,46 +262,54 @@ async def send_request(
     output_token_ids = tokenizer(output["response"]).input_ids
     output_len = len(output_token_ids)

-  request_latency = request_end_time - request_start_time
-  REQUEST_LATENCY.append((prompt_len, output_len, request_latency))
-
+  # (prompt len, output len, latency)
+  request_latency = (prompt_len, output_len, (request_end_time - request_start_time))
+  return request_latency, None

 async def benchmark(
-    backend: str,
+    args: argparse.Namespace,
     api_url: str,
     input_requests: List[Tuple[str, int, int]],
-    best_of: int,
-    use_beam_search: bool,
-    request_rate: float,
-    top_k: int,
     tokenizer: PreTrainedTokenizerBase,
-    sax_model: str,
     model: str,
-) -> None:
+) -> Tuple[List[Tuple[int, int, float]], Dict[str, int]]:
   """Runs benchmark with asynchronous requests."""
+  benchmark_start_time = time.time()
   tasks: List[asyncio.Task] = []
-  async for request in get_request(input_requests, request_rate):
+  async for request in get_request(input_requests, args.request_rate):
     prompt, prompt_len, output_len = request
     task = asyncio.create_task(
         send_request(
-            backend,
+            args.backend,
             api_url,
             prompt,
             prompt_len,
             output_len,
-            best_of,
-            use_beam_search,
-            top_k,
+            args.best_of,
+            args.use_beam_search,
+            args.top_k,
             tokenizer,
-            sax_model,
+            args.sax_model,
             model,
         )
     )
     tasks.append(task)
-  await asyncio.gather(*tasks)
+  results = await asyncio.gather(*tasks)
+  combined_latencies = []
+  combined_errors = init_errors_map()
+  for latency, errors in results:
+    if latency:
+      combined_latencies.append(latency)
+    if errors:
+      for err, count in errors.items():
+        combined_errors[err] = combined_errors[err] + count
+
+  benchmark_duration = time.time() - benchmark_start_time
+  print_and_save_result(args, benchmark_duration, len(input_requests), model, combined_latencies, combined_errors)
+  return combined_latencies, combined_errors

-def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics):
+def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics, model, errors):
   # Setup
   start_dt_proto = Timestamp()
   start_dt_proto.FromDatetime(args.start_datetime)

@@ -282,23 +318,26 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
     # metrics values are numerical
     "metrics" : {
       # Traffic
-      "num_prompts": args.num_prompts,
+      "num_prompts_attempted": benchmark_result['num_prompts_attempted'],
+      "num_prompts_succeeded": benchmark_result['num_prompts_succeeded'],
+      "request_rate": args.request_rate,
       'server_metrics': {
         **server_metrics
       },
-      **benchmark_result
+      **benchmark_result,
+      **errors,
     },
     # dimensions values are strings
     "dimensions": {
       "date": args.start_datetime.strftime('%Y%m%d-%H%M%S'),
       "backend": args.backend,
-      "model_id": args.model,
+      "model_id": model,
       "tokenizer_id": args.tokenizer,
       **(json.loads(args.additional_metadata_metrics_to_save) if args.additional_metadata_metrics_to_save else {})
     },
     "config": {
-      "model": args.model,
+      "model": model,
+      "num_models": len(args.models.split(',')),
       "model_server": args.backend,
       "start_time": {
         "seconds" : start_dt_proto.seconds,
@@ -353,9 +392,9 @@ def save_json_results(args: argparse.Namespace, benchmark_result, server_metrics
   }

   # Save to file
-  base_model_id = args.model.split("/")[-1]
+  model_without_slash = model.replace("/", "-")
   file_name = (
-      f"{args.backend}-{args.request_rate}qps-{base_model_id}-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}.json"
+      f"{args.file_prefix}-{args.backend}-{args.request_rate}qps-{args.start_datetime.strftime('%Y%m%d-%H%M%S')}-{model_without_slash}.json"
   )
   with open(file_name, "w", encoding="utf-8") as outfile:
     json.dump(final_json, outfile)
@@ -448,13 +487,13 @@ def print_metrics(metrics: List[str], duration: float, backend: str):
   return server_metrics

 def get_stats_for_set(name, description, points):
-  avg = np.mean(points)
-  median = np.median(points)
-  sd = np.std(points)
-  min = np.min(points)
-  max = np.max(points)
-  p90 = np.percentile(points, 90)
-  p99 = np.percentile(points, 99)
+  avg = np.mean(points) if points else 0
+  median = np.median(points) if points else 0
+  sd = np.std(points) if points else 0
+  min = np.min(points) if points else 0
+  max = np.max(points) if points else 0
+  p90 = np.percentile(points, 90) if points else 0
+  p99 = np.percentile(points, 99) if points else 0

   print(f"Average {description}:"
         f" {avg:.2f}")
@@ -468,58 +507,22 @@ def get_stats_for_set(name, description, points):
     f'p99_{name}': p99,
   }

-def main(args: argparse.Namespace):
-  print(args)
-  random.seed(args.seed)
-  np.random.seed(args.seed)
-
-  endpoint = (
-    "v1/completions"
-    if args.backend == "vllm"
-    else args.endpoint
-)
-
-  api_url = f"http://{args.host}:{args.port}/{endpoint}"
-  tokenizer = AutoTokenizer.from_pretrained(
-      args.tokenizer, trust_remote_code=args.trust_remote_code
-  )
-  input_requests = sample_requests(
-      args.dataset,
-      args.num_prompts,
-      args.max_input_length,
-      args.max_output_length,
-      tokenizer,
-      args.use_dummy_text,
-  )
-
-  benchmark_start_time = time.time()
-  args.start_datetime = datetime.fromtimestamp(benchmark_start_time)
-
-  asyncio.run(
-      benchmark(
-          args.backend,
-          api_url,
-          input_requests,
-          args.best_of,
-          args.use_beam_search,
-          args.request_rate,
-          args.top_k,
-          tokenizer,
-          args.sax_model,
-          args.model,
-      )
-  )
+def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_requests, model, request_latencies, errors):
   benchmark_result = {}
-  benchmark_end_time = time.time()
-  benchmark_time = benchmark_end_time - benchmark_start_time
-  print(f"Total time: {benchmark_time:.2f} s")
-  print(f"Requests/min: {60 * args.num_prompts / benchmark_time:.2f}")
-  benchmark_result['benchmark_time'] = benchmark_time
-  benchmark_result['throughput_rps'] = (args.num_prompts / benchmark_time)
+
+  print(f"====Result for Model: {model}====")
+  print(f"Errors: {errors}")
+  print(f"Total time: {benchmark_duration:.2f} s")
+  print(f"Successful/total requests: {len(request_latencies)}/{total_requests}")
+  print(f"Requests/min: {60 * total_requests / benchmark_duration:.2f}")
+  benchmark_result["num_prompts_attempted"] = total_requests
+  benchmark_result["num_prompts_succeeded"] = len(request_latencies)
+  benchmark_result['benchmark_time'] = benchmark_duration
+  benchmark_result['throughput_rps'] = (total_requests / benchmark_duration)
   total_output_tokens = np.sum([output_len for _, output_len, _ in
-                                REQUEST_LATENCY])
-  output_tokens_per_second = total_output_tokens / benchmark_time
+                                request_latencies])
+  output_tokens_per_second = total_output_tokens / benchmark_duration
   benchmark_result['throughput'] = output_tokens_per_second

   output_tokens_per_min = 60 * output_tokens_per_second
@@ -528,14 +531,14 @@ def main(args: argparse.Namespace):
   benchmark_result['output_tokens_per_min'] = output_tokens_per_min

   total_input_tokens = np.sum([prompt_len for prompt_len, _, _ in
-                               REQUEST_LATENCY])
-  input_tokens_per_min = 60 * total_input_tokens / benchmark_time
+                               request_latencies])
+  input_tokens_per_min = 60 * total_input_tokens / benchmark_duration
   print(f"Input_tokens/min: {input_tokens_per_min:.2f}")
   benchmark_result['total_input_tokens'] = int(total_input_tokens)
   benchmark_result['input_tokens_per_min'] = input_tokens_per_min

   total_tokens = total_input_tokens + total_output_tokens
-  tokens_per_min = 60 * total_tokens / benchmark_time
+  tokens_per_min = 60 * total_tokens / benchmark_duration
   print(f"Tokens/min: {tokens_per_min:.2f}")
   benchmark_result['total_tokens'] = int(total_tokens)
   benchmark_result['tokens_per_min'] = tokens_per_min
@@ -550,23 +553,72 @@ def main(args: argparse.Namespace):
     **benchmark_result,
     **(get_stats_for_set("per_token_latency", "seconds/token (includes waiting time on server)", [
         latency / (prompt_len + output_len)
-        for prompt_len, output_len, latency in REQUEST_LATENCY
+        for prompt_len, output_len, latency in request_latencies
     ])),

     # NOTE: The latency below includes requests awaiting time on server side.
     # It's not comparable with the model inference latency for batch size 1.
-    **(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)" ,[1000 * latency for _, _, latency in REQUEST_LATENCY])),
-    **(get_stats_for_set("per_output_token_latency", "milliseconds/output_token (includes waiting time on server)", [1000 * latency / output_len for _, output_len, latency in REQUEST_LATENCY])),
-    **(get_stats_for_set("input_len", "input length", [float(prompt_len) for prompt_len, _, _ in REQUEST_LATENCY])),
-    **(get_stats_for_set("output_len", "output length", [float(output_len) for _, output_len, _ in REQUEST_LATENCY]))
+    **(get_stats_for_set("latency", "milliseconds/request (includes waiting time on server)", [1000 * latency for _, _, latency in request_latencies])),
+    **(get_stats_for_set("per_output_token_latency", "milliseconds/output_token (includes waiting time on server)", [1000 * latency / output_len for _, output_len, latency in request_latencies])),
+    **(get_stats_for_set("input_len", "input length", [float(prompt_len) for prompt_len, _, _ in request_latencies])),
+    **(get_stats_for_set("output_len", "output length", [float(output_len) for _, output_len, _ in request_latencies]))
   }

   server_metrics = {}
   if args.scrape_server_metrics:
-    server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_time, args.backend)
+    server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.backend)
   if args.save_json_results:
-    save_json_results(args, benchmark_result, server_metrics)
+    save_json_results(args, benchmark_result, server_metrics, model, errors)

+async def main(args: argparse.Namespace):
+  print(args)
+  models = args.models.split(',')
+  print(f"Models to benchmark: {models}")
+  random.seed(args.seed)
+  np.random.seed(args.seed)
+  endpoint = (
+      "v1/completions"
+      if args.backend == "vllm"
+      else args.endpoint
+  )
+
+  api_url = f"http://{args.host}:{args.port}/{endpoint}"
+  tokenizer = AutoTokenizer.from_pretrained(
+      args.tokenizer, trust_remote_code=args.trust_remote_code
+  )
+  input_requests = sample_requests(
+      args.dataset,
+      args.num_prompts,
+      args.max_input_length,
+      args.max_output_length,
+      tokenizer,
+      args.use_dummy_text,
+  )
+  benchmark_start_time = time.time()
+  args.start_datetime = datetime.fromtimestamp(benchmark_start_time)
+
+  results = await asyncio.gather(
+      *[benchmark(args, api_url, input_requests, tokenizer, model) for model in models]
+  )
+
+  # Summarize results
+  combined_latencies = []
+  combined_errors = {
+      "ClientConnectorError": 0,
+      "TimeoutError": 0,
+      "ContentTypeError": 0,
+      "ClientOSError": 0,
+      "unknown_error": 0,
+      "ServerDisconnectedError": 0,
+  }
+  for latencies, errors in results:
+    combined_latencies.extend(latencies)
+    for k, v in errors.items():
+      combined_errors[k] = combined_errors[k] + v
+
+  benchmark_duration_all_models = time.time() - benchmark_start_time
+  if args.save_aggregated_result:
+    print_and_save_result(args, benchmark_duration_all_models, len(models)*len(input_requests), f"ALL-{len(models)}-MODELS", combined_latencies, combined_errors)

 if __name__ == "__main__":
   parser = argparse.ArgumentParser(
@@ -591,14 +643,15 @@ def main(args: argparse.Namespace):
       default="",
       help="Model name to send request to at API server for SAX model server.",
   )
+  parser.add_argument("--file-prefix", type=str, default="benchmark")
   parser.add_argument("--endpoint", type=str, default="generate")
   parser.add_argument("--host", type=str, default="localhost")
   parser.add_argument("--port", type=int, default=7080)
   parser.add_argument("--dataset", type=str, help="Path to the dataset.")
   parser.add_argument(
-      "--model",
+      "--models",
       type=str,
-      help="Name of the model.",
+      help="Comma-separated list of models to benchmark.",
   )
   parser.add_argument(
       "--tokenizer",
@@ -681,6 +734,11 @@ def main(args: argparse.Namespace):
       action="store_true",
       help="Whether to save benchmark results to a json file.",
   )
+  parser.add_argument(
+      "--save-aggregated-result",
+      action="store_true",
+      help="Whether to aggregate the results of all models and save them.",
+  )
   parser.add_argument(
       "--additional-metadata-metrics-to-save",
       type=str,
@@ -695,4 +753,4 @@ def main(args: argparse.Namespace):
       help="Whether to scrape server metrics.",
   )
   cmd_args = parser.parse_args()
-  main(cmd_args)
\ No newline at end of file
+  asyncio.run(main(cmd_args))
\ No newline at end of file
diff --git a/benchmarks/benchmark/tools/profile-generator/container/latency_throughput_curve.sh b/benchmarks/benchmark/tools/profile-generator/container/latency_throughput_curve.sh
old mode 100644
new mode 100755
index 53e9da2d0..7c8f638e3
--- a/benchmarks/benchmark/tools/profile-generator/container/latency_throughput_curve.sh
+++ b/benchmarks/benchmark/tools/profile-generator/container/latency_throughput_curve.sh
@@ -26,20 +26,24 @@ fi
 PYTHON="python3"
 PYTHON_OPTS="benchmark_serving.py "
 for request_rate in $(echo $REQUEST_RATES | tr ',' ' '); do
+  echo "Benchmarking request rate: ${request_rate}"
   # TODO: Check if profile already exists, if so then skip
   timestamp=$(date +"%Y-%m-%d_%H-%M-%S")
   output_file="latency-profile-${timestamp}.txt"
   if [ ${request_rate} == 0 ]; then
     request_rate="inf"
-    NUM_PROMPTS=$MAX_NUM_PROMPTS
+    num_prompts=$MAX_NUM_PROMPTS
   else
-    NUM_PROMPTS=$((${request_rate} * $BENCHMARK_TIME_SECONDS))
+    num_prompts=$(awk "BEGIN {print int($request_rate * $BENCHMARK_TIME_SECONDS)}")
   fi
-
-  PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --model=$TOKENIZER --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$NUM_PROMPTS --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH"
+  echo "Total prompts: $num_prompts"
+  PYTHON_OPTS="$PYTHON_OPTS --save-json-results --host=$IP --port=$PORT --dataset=$PROMPT_DATASET_FILE --tokenizer=$TOKENIZER --request-rate=$request_rate --backend=$BACKEND --num-prompts=$num_prompts --max-input-length=$INPUT_LENGTH --max-output-length=$OUTPUT_LENGTH --file-prefix=$FILE_PREFIX --models=$MODELS"
   if [[ "$SCRAPE_SERVER_METRICS" = "true" ]]; then
     PYTHON_OPTS="$PYTHON_OPTS --scrape-server-metrics"
   fi
+  if [[ "$SAVE_AGGREGATED_RESULT" = "true" ]]; then
+    PYTHON_OPTS="$PYTHON_OPTS --save-aggregated-result"
+  fi
   $PYTHON $PYTHON_OPTS > $output_file
   cat $output_file
   sleep 5 # wait 5 seconds before next run
diff --git a/benchmarks/benchmark/tools/profile-generator/main.tf b/benchmarks/benchmark/tools/profile-generator/main.tf
index 495cfae84..766004b62 100644
--- a/benchmarks/benchmark/tools/profile-generator/main.tf
+++ b/benchmarks/benchmark/tools/profile-generator/main.tf
@@ -80,4 +80,7 @@ module "latency-profile" {
   hugging_face_secret         = var.hugging_face_secret
   hugging_face_secret_version = var.hugging_face_secret_version
   scrape_server_metrics       = var.scrape_server_metrics
+  file_prefix                 = var.file_prefix
+  save_aggregated_result      = var.save_aggregated_result
+  models                      = var.models
 }
\ No newline at end of file
diff --git a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/main.tf b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/main.tf
index 79b050e3e..5d9d9baea 100644
--- a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/main.tf
+++ b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/main.tf
@@ -64,5 +64,8 @@ resource "kubernetes_manifest" "latency-profile-generator" {
     k8s_hf_secret_list     = var.k8s_hf_secret == null ? [] : [var.k8s_hf_secret]
     output_bucket          = var.output_bucket
     scrape_server_metrics  = var.scrape_server_metrics
+    file_prefix            = var.file_prefix
+    save_aggregated_result = var.save_aggregated_result
+    models                 = var.models
   }))
 }
\ No newline at end of file
diff --git a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/manifest-templates/latency-profile-generator.yaml.tpl b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/manifest-templates/latency-profile-generator.yaml.tpl
index 520d83379..60eacf4c7 100644
--- a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/manifest-templates/latency-profile-generator.yaml.tpl
+++ b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/manifest-templates/latency-profile-generator.yaml.tpl
@@ -20,6 +20,8 @@ spec:
         image: ${artifact_registry}/latency-profile:latest
         command: ["bash", "-c", "./latency_throughput_curve.sh"]
         env:
+        - name: MODELS
+          value: ${models}
         - name: TOKENIZER
           value: ${tokenizer}
         - name: IP
@@ -44,6 +46,10 @@ spec:
           value: ${scrape_server_metrics}
         - name: MAX_NUM_PROMPTS
           value: ${max_num_prompts}
+        - name: FILE_PREFIX
+          value: ${file_prefix}
+        - name: SAVE_AGGREGATED_RESULT
+          value: ${save_aggregated_result}
 %{ for hugging_face_token_secret in hugging_face_token_secret_list ~}
         - name: HF_TOKEN
           valueFrom:
diff --git a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/variables.tf b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/variables.tf
index 6e7702e7b..3489b63ad 100644
--- a/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/variables.tf
+++ b/benchmarks/benchmark/tools/profile-generator/modules/latency-profile/variables.tf
@@ -132,6 +132,13 @@ variable "tokenizer" {
   default     = "tiiuae/falcon-7b"
 }

+variable "models" {
+  description = "A comma-separated list of models to benchmark."
+  type        = string
+  nullable    = false
+  default     = "tiiuae/falcon-7b"
+}
+
 variable "output_bucket" {
   description = "Bucket name for storing results"
   type        = string
@@ -176,4 +183,17 @@ variable "benchmark_time_seconds" {
   type        = number
   default     = 120
   nullable    = false
+}
+
+variable "file_prefix" {
+  description = "A prefix for the saved JSON results file; useful for adding context to the benchmark."
+  type        = string
+  nullable    = false
+  default     = "benchmark"
+}
+
+variable "save_aggregated_result" {
+  description = "Whether to save the aggregated result across all models; useful when benchmarking multiple models."
+  type        = bool
+  default     = false
 }
\ No newline at end of file
diff --git a/benchmarks/benchmark/tools/profile-generator/variables.tf b/benchmarks/benchmark/tools/profile-generator/variables.tf
index a9ab4b7d4..1e7864bad 100644
--- a/benchmarks/benchmark/tools/profile-generator/variables.tf
+++ b/benchmarks/benchmark/tools/profile-generator/variables.tf
@@ -157,6 +157,13 @@ variable "targets" {
   })
 }

+variable "models" {
+  description = "A comma-separated list of models to benchmark."
+  type        = string
+  nullable    = false
+  default     = "tiiuae/falcon-7b"
+}
+
 variable "scrape_server_metrics" {
   description = "Whether to scrape server metrics."
   type        = bool
@@ -168,4 +175,17 @@ variable "benchmark_time_seconds" {
   type        = number
   default     = 120
   nullable    = false
+}
+
+variable "file_prefix" {
+  description = "A prefix for the saved JSON results file; useful for adding context to the benchmark."
+  type        = string
+  nullable    = false
+  default     = "benchmark"
+}
+
+variable "save_aggregated_result" {
+  description = "Whether to save the aggregated result across all models; useful when benchmarking multiple models."
+  type        = bool
+  default     = false
 }
\ No newline at end of file
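
Reviewer note: a hypothetical invocation exercising the new flags (the host,
dataset path, and model names below are placeholders, not part of this patch):

  python3 benchmark_serving.py \
    --backend=vllm --host=localhost --port=7080 \
    --dataset=/data/prompts.json --tokenizer=tiiuae/falcon-7b \
    --models=tiiuae/falcon-7b,my-org/falcon-7b-lora-a,my-org/falcon-7b-lora-b \
    --request-rate=2.5 --num-prompts=300 \
    --max-input-length=1024 --max-output-length=1024 \
    --save-json-results --save-aggregated-result --file-prefix=lora-sweep

This benchmarks the three models concurrently against the same server and
writes one JSON result file per model; because --save-aggregated-result and
--save-json-results are both set, a combined "ALL-3-MODELS" summary is also
printed and saved. The non-integer --request-rate exercises the awk-based
prompt-count fix in latency_throughput_curve.sh.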
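
Likewise, a sketch of how the new Terraform inputs might be wired from a root
module (the source path and values are illustrative; the variable names match
those added in variables.tf above):

  module "latency-profile" {
    source                 = "./modules/latency-profile"
    # ...existing inputs unchanged...
    models                 = "tiiuae/falcon-7b,my-org/falcon-7b-lora"
    file_prefix            = "lora-sweep"
    save_aggregated_result = true
  }

These inputs reach the container as the MODELS, FILE_PREFIX, and
SAVE_AGGREGATED_RESULT environment variables via the manifest template.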