Set up integration testing and plug in some statistics #31
@@ -2,21 +2,34 @@
import re
import logging
from textwrap import dedent
from typing import Optional
from functools import partial
import pandas

import attr
import google.cloud.bigquery.client
import google.cloud.bigquery.dataset
import google.cloud.bigquery.job
import google.cloud.bigquery.table
import mozanalysis
from typing import Callable, Any, List, Optional
from mozanalysis.experiment import TimeLimits
import mozanalysis.metrics.desktop as mmd
from mozanalysis.utils import add_days
import mozanalysis.bayesian_stats.bayesian_bootstrap as mabsbb

from . import experimenter


# todo: this should be moved somewhere else and might change
# depending on how configuration is implemented
@attr.s(auto_attribs=True)
class Statistic:
    name: str
    function: Callable[..., Any]
    metrics: List[str]
    branches: Optional[List[str]]


@attr.s(auto_attribs=True)
class Analysis:
    """
@@ -35,6 +48,17 @@ class Analysis:
        mmd.search_count,
    ]

    STANDARD_STATISTICS = [
        Statistic(
            name="bootstrap_one_branch",
            function=partial(
                mabsbb.bootstrap_one_branch, num_samples=100, summary_quantiles=(0.5, 0.61)
            ),
            metrics=["active_hours"],
            branches=["branch1", "branch2"],
        )
    ]

    def __attrs_post_init__(self):
        self.logger = logging.getLogger(__name__)
@@ -115,30 +139,13 @@ def _publish_view(self, window_period: str):
        )
        self.bigquery.execute(sql)

    def run(self, current_date: datetime, dry_run: bool):
    def _calculate_metrics(
        self, exp: mozanalysis.experiment.Experiment, time_limits: TimeLimits, dry_run: bool,
    ):
        """
        Run analysis using mozanalysis for a specific experiment.
        Calculate metrics for a specific experiment.
        Returns the BigQuery table results are written to.
        """
        self.logger.info("Analysis.run invoked for experiment %s", self.experiment.slug)

        if self.experiment.normandy_slug is None:
            self.logger.info("Skipping %s; no normandy_slug", self.experiment.slug)
            return  # some experiments do not have a normandy slug

        if self.experiment.start_date is None:
            self.logger.info("Skipping %s; no start_date", self.experiment.slug)
            return

        time_limits = self._get_timelimits_if_ready(current_date)
        if time_limits is None:
            self.logger.info("Skipping %s; not ready", self.experiment.slug)
            return

        exp = mozanalysis.experiment.Experiment(
            experiment_slug=self.experiment.normandy_slug,
            start_date=self.experiment.start_date.strftime("%Y-%m-%d"),
        )

        window = len(time_limits.analysis_windows)
        last_analysis_window = time_limits.analysis_windows[-1]
        # TODO: Add this functionality to TimeLimits.
@@ -160,10 +167,93 @@ def run(self, current_date: datetime, dry_run: bool):
            return

        self.logger.info("Executing query for %s", self.experiment.slug)
        self.bigquery.execute(sql, res_table_name)
        result = self.bigquery.execute(sql, res_table_name)
        self._publish_view("week")
        self.logger.info("Finished running query for %s", self.experiment.slug)

        return res_table_name

    def _calculate_statistics(self, result_table: str):
        """
        Run statistics on metrics.
        """

        statistics_results = []

        metrics_data = self.bigquery.table_to_dataframe(result_table)

        for statistic in self.STANDARD_STATISTICS:
            result_dict = {}
            result_dict["statistic"] = statistic.name

            # calculate statistics for specified branches
            if statistic.branches is not None:
                results_per_branch = metrics_data.groupby("branch")

                for branch in statistic.branches:
                    data = results_per_branch.get_group(branch)

                    for metric in statistic.metrics:
                        if metric in data:
                            key_value_results = []

                            for key, value in statistic.function(data[metric]).to_dict().items():
                                key_value_results.append({"key": key, "value": value})

                            statistics_results.append(
                                {
                                    "name": statistic.name,
                                    "branch": branch,
                                    "map_key_value": key_value_results,
                                }
                            )
            else:
                # otherwise pass entire dataframe to statistics function
                key_value_results = []

                for key, value in statistic.function(metrics_data).to_dict().items():
                    key_value_results.append({"key": key, "value": value})

                statistics_results.append(
                    {"name": statistic.name, "branch": None, "map_key_value": key_value_results}
                )

        df_statistics_results = pandas.DataFrame.from_dict(statistics_results)

        print(df_statistics_results)

        table_id = f"{self.project}.{self.dataset}.statistics_{result_table}"

        job = self.bigquery.client.load_table_from_dataframe(df_statistics_results, table_id)
Review comment: Currently, uploading STRUCT / RECORD fields from load_table_from_dataframe is only supported with …

Review comment: We should put together a proposal for what these tables should look like.
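If the dataframe upload turns out to be the blocker for the nested map_key_value column, one possible workaround is to skip the dataframe entirely and load the already dict-shaped statistics_results rows with load_table_from_json plus an explicit schema. This is only a sketch, not part of this diff; the field types (e.g. FLOAT for the values) are assumptions.

```python
import google.cloud.bigquery

# Hypothetical schema for the rows built in _calculate_statistics above;
# the FLOAT value type is an assumption, not taken from this PR.
schema = [
    google.cloud.bigquery.SchemaField("name", "STRING"),
    google.cloud.bigquery.SchemaField("branch", "STRING"),
    google.cloud.bigquery.SchemaField(
        "map_key_value",
        "RECORD",
        mode="REPEATED",
        fields=[
            google.cloud.bigquery.SchemaField("key", "STRING"),
            google.cloud.bigquery.SchemaField("value", "FLOAT"),
        ],
    ),
]

job_config = google.cloud.bigquery.LoadJobConfig(schema=schema)
# statistics_results is already a list of dicts, so it can be loaded directly
# without the intermediate dataframe.
job = self.bigquery.client.load_table_from_json(
    statistics_results, table_id, job_config=job_config
)
job.result()  # wait for the load job to finish
```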
    def run(self, current_date: datetime, dry_run: bool):
        """
        Run analysis using mozanalysis for a specific experiment.
        """
        self.logger.info("Analysis.run invoked for experiment %s", self.experiment.slug)

        if self.experiment.normandy_slug is None:
            self.logger.info("Skipping %s; no normandy_slug", self.experiment.slug)
            return  # some experiments do not have a normandy slug

        if self.experiment.start_date is None:
            self.logger.info("Skipping %s; no start_date", self.experiment.slug)
            return

        time_limits = self._get_timelimits_if_ready(current_date)
        if time_limits is None:
            self.logger.info("Skipping %s; not ready", self.experiment.slug)
            return

        exp = mozanalysis.experiment.Experiment(
            experiment_slug=self.experiment.normandy_slug,
            start_date=self.experiment.start_date.strftime("%Y-%m-%d"),
        )

        result_table = self._calculate_metrics(exp, time_limits, dry_run)

        self._calculate_statistics(result_table)


@attr.s(auto_attribs=True, slots=True)
class BigQueryClient:
@@ -176,7 +266,13 @@ def client(self):
        self._client = self._client or google.cloud.bigquery.client.Client(self.project)
        return self._client

    def execute(self, query: str, destination_table: Optional[str] = None) -> None:
    def table_to_dataframe(self, table: str):
        """Return all rows of the specified table as a dataframe."""
        table_ref = self.client.get_table(f"{self.project}.{self.dataset}.{table}")
        rows = self.client.list_rows(table_ref)
        return rows.to_dataframe()

    def execute(self, query: str, destination_table: Optional[str] = None):
        dataset = google.cloud.bigquery.dataset.DatasetReference.from_string(
            self.dataset, default_project=self.project,
        )
@@ -187,4 +283,4 @@ def execute(self, query: str, destination_table: Optional[str] = None) -> None:
        config = google.cloud.bigquery.job.QueryJobConfig(default_dataset=dataset, **kwargs)
        job = self.client.query(query, config)
        # block on result
        job.result(max_results=1)
        return job.result(max_results=1)
@@ -0,0 +1,8 @@
/*
Fetch the value associated with a given key from an array of key/value structs.
Because map types aren't available in BigQuery, we model maps as arrays
of structs instead, and this function provides map-like access to such fields.
*/
CREATE OR REPLACE FUNCTION udf.get_key(map ANY TYPE, k ANY TYPE) AS (
    (SELECT key_value.value FROM UNNEST(map) AS key_value WHERE key_value.key = k LIMIT 1)
);
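To make the intent concrete, here is a rough usage sketch (not part of this diff): a query built in Python that pulls a single entry back out of the map_key_value column written by _calculate_statistics. The table name and the "mean" key below are illustrative assumptions, not names defined by this PR.

```python
# Illustrative only: project, dataset, table, and the "mean" key are assumptions.
sql = """
SELECT
    name,
    branch,
    udf.get_key(map_key_value, "mean") AS mean_estimate
FROM `my-project.my_dataset.statistics_test_experiment_week_1`
"""

# Could be run with the BigQueryClient added in this PR, e.g.:
# client = BigQueryClient(project="my-project", dataset="my_dataset")
# client.execute(sql)
```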
@@ -0,0 +1,14 @@
{"submission_date": "2020-04-06","client_id": "aaaa","active_hours_sum": 0.6,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-06","client_id": "bbbb","active_hours_sum": 0.1,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-06","client_id": "cccc","active_hours_sum": 0.2,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-06","client_id": "dddd","active_hours_sum": 0.2}
{"submission_date": "2020-04-06","client_id": "eeee","active_hours_sum": 0.2}
{"submission_date": "2020-04-05","client_id": "aaaa","active_hours_sum": 0.1,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-05","client_id": "bbbb","active_hours_sum": 0.1,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-05","client_id": "cccc","active_hours_sum": 0.9,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-05","client_id": "dddd","active_hours_sum": 1}
{"submission_date": "2020-04-05","client_id": "eeee","active_hours_sum": 0.2}
{"submission_date": "2020-04-04","client_id": "aaaa","active_hours_sum": 0.1,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-04","client_id": "bbbb","active_hours_sum": 0.1,"experiments": [{"key": "test-experiment","value": "rollout"}]}
{"submission_date": "2020-04-04","client_id": "dddd","active_hours_sum": 1}
{"submission_date": "2020-04-04","client_id": "eeee","active_hours_sum": 0.2}
@@ -0,0 +1,5 @@
{"submission_date": "2020-04-02", "client_id": "aaaa", "event_category": "normandy", "event_method": "enroll", "event_string_value": "test-experiment", "event_map_values": [{"key": "branch", "value": "branch1"}]}
{"submission_date": "2020-04-03", "client_id": "bbbb", "event_category": "normandy", "event_method": "enroll", "event_string_value": "test-experiment", "event_map_values": [{"key": "branch", "value": "branch2"}]}
{"submission_date": "2020-04-03", "client_id": "bbbb", "event_category": "foo"}
{"submission_date": "2020-04-03", "client_id": "cccc", "event_category": "foo", "event_method": "enroll", "event_string_value": "test-experiment", "event_map_values": [{"key": "branch", "value": "branch1"}]}
{"submission_date": "2020-04-03", "client_id": "dddd", "event_category": "foo", "event_method": "enroll", "event_string_value": "test-experiment", "event_map_values": [{"key": "branch", "value": "branch2"}]}
I have tried using a pytest marker for this, but for some reason it gets ignored. As a workaround, I moved all tests that require GCP authentication to a separate directory. This makes it easier for folks to skip these tests locally if they are not logged in to GCP.

How do you mean that it gets ignored? Markers don't change what executes unless you specifically include or exclude them.

Right, it should work by adding the option -m integration or -m "not integration", but even when I set these options, all tests still get executed.
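For reference, a minimal sketch of how the marker-based approach might be wired up, assuming a marker named "integration" and a tests/integration/ directory (both are assumptions, not taken from this PR). With the marker registered and applied as below, pytest -m "not integration" should deselect those tests.

```python
# conftest.py -- register the marker so pytest knows about it
def pytest_configure(config):
    config.addinivalue_line(
        "markers", "integration: tests that need GCP credentials"
    )


# tests/integration/test_analysis_integration.py -- apply it to the GCP-backed tests
import pytest


@pytest.mark.integration
def test_metrics_query_runs_against_bigquery():
    # ... exercise Analysis against a real BigQuery project ...
    ...
```

One common reason -m appears to have no effect is that the marker ends up on a helper or fixture rather than on the test functions themselves; pytest --markers and pytest --collect-only -m integration can help confirm which tests are actually being selected.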