chore: update local benchmark, 10t config and kokoro project. by Genesis929 · Pull Request #995 · googleapis/python-bigquery-dataframes · GitHub
Merged
2 changes: 1 addition & 1 deletion .kokoro/load/benchmark.cfg
@@ -13,7 +13,7 @@ env_vars: {

env_vars: {
key: "GOOGLE_CLOUD_PROJECT"
value: "bigframes-load-testing"
value: "bigframes-benchmarking"
}

env_vars: {
56 changes: 52 additions & 4 deletions noxfile.py
@@ -16,6 +16,7 @@

from __future__ import absolute_import

import argparse
import multiprocessing
import os
import pathlib
@@ -804,7 +805,7 @@ def notebook(session: nox.Session):
processes = []
for notebook, regions in notebooks_reg.items():
for region in regions:
args = (
region_args = (
"python",
"scripts/run_and_publish_benchmark.py",
"--notebook",
@@ -814,15 +815,15 @@
if multi_process_mode:
process = multiprocessing.Process(
target=_run_process,
args=(session, args, error_flag),
args=(session, region_args, error_flag),
)
process.start()
processes.append(process)
# Adding a small delay between starting each
# process to avoid potential race conditions.
time.sleep(1)
else:
session.run(*args)
session.run(*region_args)

for process in processes:
process.join()
@@ -861,7 +862,51 @@ def benchmark(session: nox.Session):
session.install("-e", ".[all]")
base_path = os.path.join("tests", "benchmark")

benchmark_script_list = list(pathlib.Path(base_path).rglob("*.py"))
parser = argparse.ArgumentParser()
parser.add_argument(
"-i",
"--iterations",
type=int,
default=1,
help="Number of iterations to run each benchmark.",
)
parser.add_argument(
"-o",
"--output-csv",
nargs="?",
const=True,
default=False,
help=(
"Determines whether to output results to a CSV file. If no location is provided, "
"a temporary location is automatically generated."
),
)
parser.add_argument(
"-b",
"--benchmark-filter",
nargs="+",
help=(
"List of file or directory names to include in the benchmarks. If not provided, "
"all benchmarks are run."
),
)

args = parser.parse_args(session.posargs)

benchmark_script_list: List[pathlib.Path] = []
if args.benchmark_filter:
for filter_item in args.benchmark_filter:
full_path = os.path.join(base_path, filter_item)
if os.path.isdir(full_path):
benchmark_script_list.extend(pathlib.Path(full_path).rglob("*.py"))
elif os.path.isfile(full_path) and full_path.endswith(".py"):
benchmark_script_list.append(pathlib.Path(full_path))
else:
raise ValueError(
f"Item {filter_item} does not match any valid file or directory"
)
else:
benchmark_script_list = list(pathlib.Path(base_path).rglob("*.py"))

try:
for benchmark in benchmark_script_list:
@@ -871,12 +916,15 @@
"python",
"scripts/run_and_publish_benchmark.py",
f"--benchmark-path={benchmark}",
f"--iterations={args.iterations}",
)
finally:
session.run(
"python",
"scripts/run_and_publish_benchmark.py",
f"--publish-benchmarks={base_path}",
f"--iterations={args.iterations}",
f"--output-csv={args.output_csv}",
)


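Note on the new benchmark session flags: they are read from session.posargs, so they go after "--" on the nox command line, e.g. nox -s benchmark -- -i 3 -o -b tpch. A minimal standalone sketch (not part of this PR) of how the three options behave under argparse; the option names come from the diff above, the example values are illustrative:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-i", "--iterations", type=int, default=1)
# nargs="?" with const=True: a bare `-o` yields True (auto-named CSV),
# `-o results.csv` yields that path, and omitting the flag leaves False.
parser.add_argument("-o", "--output-csv", nargs="?", const=True, default=False)
# nargs="+": one or more file or directory names under tests/benchmark.
parser.add_argument("-b", "--benchmark-filter", nargs="+")

args = parser.parse_args(["-i", "3", "-o", "-b", "tpch"])
print(args.iterations, args.output_csv, args.benchmark_filter)
# -> 3 True ['tpch']
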
138 changes: 92 additions & 46 deletions scripts/run_and_publish_benchmark.py
@@ -19,6 +19,7 @@
import pathlib
import subprocess
import sys
import tempfile
from typing import Dict, List, Union

import numpy as np
@@ -50,7 +51,7 @@ def run_benchmark_subprocess(args, log_env_name_var, filename=None, region=None)
subprocess.run(args, env=env, check=True)


def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:
def collect_benchmark_result(benchmark_path: str, iterations: int) -> pd.DataFrame:
"""Generate a DataFrame report on HTTP queries, bytes processed, slot time and execution time from log files."""
path = pathlib.Path(benchmark_path)
try:
@@ -100,28 +101,23 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:

with open(bytes_file, "r") as file:
lines = file.read().splitlines()
query_count = len(lines)
total_bytes = sum(int(line) for line in lines)
query_count = len(lines) / iterations
total_bytes = sum(int(line) for line in lines) / iterations

with open(millis_file, "r") as file:
lines = file.read().splitlines()
total_slot_millis = sum(int(line) for line in lines)
total_slot_millis = sum(int(line) for line in lines) / iterations

if has_local_seconds:
# 'local_seconds' captures the total execution time for a benchmark as it
# starts timing immediately before the benchmark code begins and stops
# immediately after it ends. Unlike other metrics that might accumulate
# values proportional to the number of queries executed, 'local_seconds' is
# a singular measure of the time taken for the complete execution of the
# benchmark, from start to finish.
with open(local_seconds_file, "r") as file:
local_seconds = float(file.readline().strip())
lines = file.read().splitlines()
local_seconds = sum(float(line) for line in lines) / iterations
else:
local_seconds = None

with open(bq_seconds_file, "r") as file:
lines = file.read().splitlines()
bq_seconds = sum(float(line) for line in lines)
bq_seconds = sum(float(line) for line in lines) / iterations

results_dict[str(filename)] = [
query_count,
@@ -154,7 +150,12 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame:
columns=columns,
)

print("---BIGQUERY USAGE REPORT---")
report_title = (
"---BIGQUERY USAGE REPORT---"
if iterations == 1
else f"---BIGQUERY USAGE REPORT (Averages over {iterations} Iterations)---"
)
print(report_title)
for index, row in benchmark_metrics.iterrows():
formatted_local_exec_time = (
f"{round(row['Local_Execution_Time_Sec'], 1)} seconds"
@@ -259,32 +260,53 @@ def find_config(start_path):
return None


def run_benchmark_from_config(benchmark: str):
def publish_to_bigquery(dataframe, notebook, project_name="bigframes-metrics"):
bigquery_table = (
f"{project_name}.benchmark_report.notebook_benchmark"
if notebook
else f"{project_name}.benchmark_report.benchmark"
)

repo_status = get_repository_status()
for idx, col in enumerate(repo_status.keys()):
dataframe.insert(idx, col, repo_status[col])

pandas_gbq.to_gbq(
dataframe=dataframe,
destination_table=bigquery_table,
if_exists="append",
)
print(f"Results have been successfully uploaded to {bigquery_table}.")


def run_benchmark_from_config(benchmark: str, iterations: int):
print(benchmark)
config_path = find_config(benchmark)

if config_path:
benchmark_configs = []
with open(config_path, "r") as f:
for line in f:
config = json.loads(line)
python_args = [f"--{key}={value}" for key, value in config.items()]
suffix = (
config["benchmark_suffix"]
if "benchmark_suffix" in config
else "_".join(f"{key}_{value}" for key, value in config.items())
)
benchmark_configs.append((suffix, python_args))
if line.strip():
config = json.loads(line)
python_args = [f"--{key}={value}" for key, value in config.items()]
suffix = (
config["benchmark_suffix"]
if "benchmark_suffix" in config
else "_".join(f"{key}_{value}" for key, value in config.items())
)
benchmark_configs.append((suffix, python_args))
else:
benchmark_configs = [(None, [])]

for benchmark_config in benchmark_configs:
args = ["python", str(benchmark)]
args.extend(benchmark_config[1])
log_env_name_var = str(benchmark)
if benchmark_config[0] is not None:
log_env_name_var += f"_{benchmark_config[0]}"
run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var)
for _ in range(iterations):
for benchmark_config in benchmark_configs:
args = ["python", str(benchmark)]
args.extend(benchmark_config[1])
log_env_name_var = str(benchmark)
if benchmark_config[0] is not None:
log_env_name_var += f"_{benchmark_config[0]}"
run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var)


def run_notebook_benchmark(benchmark_file: str, region: str):
@@ -341,35 +363,59 @@ def parse_arguments():
help="Set the benchmarks to be published to BigQuery.",
)

parser.add_argument(
"--iterations",
type=int,
default=1,
help="Number of iterations to run each benchmark.",
)
parser.add_argument(
"--output-csv",
type=str,
default=None,
help="Determines whether to output results to a CSV file. If no location is provided, a temporary location is automatically generated.",
)

return parser.parse_args()


def main():
args = parse_arguments()

if args.publish_benchmarks:
bigquery_table = (
"bigframes-metrics.benchmark_report.notebook_benchmark"
if args.notebook
else "bigframes-metrics.benchmark_report.benchmark"
benchmark_metrics = collect_benchmark_result(
args.publish_benchmarks, args.iterations
)
benchmark_metrics = collect_benchmark_result(args.publish_benchmarks)

if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true":
repo_status = get_repository_status()
for idx, col in enumerate(repo_status.keys()):
benchmark_metrics.insert(idx, col, repo_status[col])

pandas_gbq.to_gbq(
dataframe=benchmark_metrics,
destination_table=bigquery_table,
if_exists="append",
# Output results to CSV without specifying a location
if args.output_csv == "True":
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
temp_file = tempfile.NamedTemporaryFile(
prefix=f"benchmark_{current_time}_", delete=False, suffix=".csv"
)
print("Results have been successfully uploaded to BigQuery.")
benchmark_metrics.to_csv(temp_file.name, index=False)
print(
f"Benchmark result is saved to a temporary location: {temp_file.name}"
)
temp_file.close()
# Output results to CSV at a specified custom location
elif args.output_csv != "False":
benchmark_metrics.to_csv(args.output_csv, index=False)
print(f"Benchmark result is saved to: {args.output_csv}")

# Publish the benchmark metrics to BigQuery under the 'bigframes-metrics' project.
# The 'BENCHMARK_AND_PUBLISH' environment variable should be set to 'true' only
# in specific Kokoro sessions.
if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true":
publish_to_bigquery(benchmark_metrics, args.notebook)
# If the 'GCLOUD_BENCH_PUBLISH_PROJECT' environment variable is set, publish the
# benchmark metrics to a specified BigQuery table in the provided project. This is
# intended for local testing where the default behavior is not to publish results.
elif project := os.getenv("GCLOUD_BENCH_PUBLISH_PROJECT", ""):
publish_to_bigquery(benchmark_metrics, args.notebook, project)
elif args.notebook:
run_notebook_benchmark(args.benchmark_path, args.region)
else:
run_benchmark_from_config(args.benchmark_path)
run_benchmark_from_config(args.benchmark_path, args.iterations)


if __name__ == "__main__":
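For context on the averaging in collect_benchmark_result above: run_benchmark_from_config repeats each configuration `iterations` times and every run appends to the same per-benchmark log files, so dividing the accumulated totals by `iterations` yields per-run averages. A small worked example with hypothetical values:

# Hypothetical log contents for one benchmark run 3 times (--iterations=3),
# where each run issues 2 BigQuery queries.
iterations = 3
bytes_lines = ["1000", "2000", "1000", "2000", "1000", "2000"]

query_count = len(bytes_lines) / iterations                        # 2.0 queries per run
total_bytes = sum(int(line) for line in bytes_lines) / iterations  # 3000.0 bytes per run
print(query_count, total_bytes)  # -> 2.0 3000.0
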
18 changes: 10 additions & 8 deletions tests/benchmark/tpch/config.jsonl
@@ -1,8 +1,10 @@
{"benchmark_suffix": "1g_ordered", "dataset_id": "tpch_0001g", "ordered": true}
{"benchmark_suffix": "1g_unordered", "dataset_id": "tpch_0001g", "ordered": false}
{"benchmark_suffix": "10g_ordered", "dataset_id": "tpch_0010g", "ordered": true}
{"benchmark_suffix": "10g_unordered", "dataset_id": "tpch_0010g", "ordered": false}
{"benchmark_suffix": "100g_ordered", "dataset_id": "tpch_0100g", "ordered": true}
{"benchmark_suffix": "100g_unordered", "dataset_id": "tpch_0100g", "ordered": false}
{"benchmark_suffix": "1t_ordered", "dataset_id": "tpch_0001t", "ordered": true}
{"benchmark_suffix": "1t_unordered", "dataset_id": "tpch_0001t", "ordered": false}
{"benchmark_suffix": "1g_ordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0001g", "ordered": true}
{"benchmark_suffix": "1g_unordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0001g", "ordered": false}
{"benchmark_suffix": "10g_ordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0010g", "ordered": true}
{"benchmark_suffix": "10g_unordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0010g", "ordered": false}
{"benchmark_suffix": "100g_ordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0100g", "ordered": true}
{"benchmark_suffix": "100g_unordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0100g", "ordered": false}
{"benchmark_suffix": "1t_ordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0001t", "ordered": true}
{"benchmark_suffix": "1t_unordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0001t", "ordered": false}
{"benchmark_suffix": "10t_ordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0010t", "ordered": true}
{"benchmark_suffix": "10t_unordered", "project_id": "bigframes-dev-perf", "dataset_id": "tpch_0010t", "ordered": false}
4 changes: 2 additions & 2 deletions tests/benchmark/tpch/q1.py
@@ -17,9 +17,9 @@
import bigframes_vendored.tpch.queries.q1 as vendored_tpch_q1

if __name__ == "__main__":
dataset_id, session, suffix = utils.get_tpch_configuration()
project_id, dataset_id, session, suffix = utils.get_tpch_configuration()
current_path = pathlib.Path(__file__).absolute()

utils.get_execution_time(
vendored_tpch_q1.q, current_path, suffix, dataset_id, session
vendored_tpch_q1.q, current_path, suffix, project_id, dataset_id, session
)
4 changes: 2 additions & 2 deletions tests/benchmark/tpch/q10.py
@@ -17,9 +17,9 @@
import bigframes_vendored.tpch.queries.q10 as vendored_tpch_q10

if __name__ == "__main__":
dataset_id, session, suffix = utils.get_tpch_configuration()
project_id, dataset_id, session, suffix = utils.get_tpch_configuration()
current_path = pathlib.Path(__file__).absolute()

utils.get_execution_time(
vendored_tpch_q10.q, current_path, suffix, dataset_id, session
vendored_tpch_q10.q, current_path, suffix, project_id, dataset_id, session
)
4 changes: 2 additions & 2 deletions tests/benchmark/tpch/q11.py
@@ -17,9 +17,9 @@
import bigframes_vendored.tpch.queries.q11 as vendored_tpch_q11

if __name__ == "__main__":
dataset_id, session, suffix = utils.get_tpch_configuration()
project_id, dataset_id, session, suffix = utils.get_tpch_configuration()
current_path = pathlib.Path(__file__).absolute()

utils.get_execution_time(
vendored_tpch_q11.q, current_path, suffix, dataset_id, session
vendored_tpch_q11.q, current_path, suffix, project_id, dataset_id, session
)
4 changes: 2 additions & 2 deletions tests/benchmark/tpch/q12.py
@@ -17,9 +17,9 @@
import bigframes_vendored.tpch.queries.q12 as vendored_tpch_q12

if __name__ == "__main__":
dataset_id, session, suffix = utils.get_tpch_configuration()
project_id, dataset_id, session, suffix = utils.get_tpch_configuration()
current_path = pathlib.Path(__file__).absolute()

utils.get_execution_time(
vendored_tpch_q12.q, current_path, suffix, dataset_id, session
vendored_tpch_q12.q, current_path, suffix, project_id, dataset_id, session
)