correct comparison of the job name
* re-enabled some tests
* remove delay between retries
* appropriate timeout value
Takashi Matsuo committed Jun 3, 2020
1 parent e931091 commit 7954b18
Showing 4 changed files with 76 additions and 51 deletions.
18 changes: 15 additions & 3 deletions dlp/inspect_content.py
@@ -486,7 +486,11 @@ def inspect_gcs_file(

def callback(message):
try:
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -650,7 +654,11 @@ def inspect_datastore(

def callback(message):
try:
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -817,7 +825,11 @@ def inspect_bigquery(

def callback(message):
try:
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

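For reference, the normalization applied in each callback above (and again in risk.py below) can be tried in isolation. A minimal sketch, with made-up project and job IDs for illustration:

def job_name_matches(attributes, operation_name):
    # The DlpJobName attribute carries a location indicator
    # (e.g. "/locations/global") that operation.name lacks, so strip
    # it before comparing.
    dlp_job_name = attributes["DlpJobName"].replace(
        "/locations/global", "")
    return dlp_job_name == operation_name


# Hypothetical values, for illustration only.
attributes = {
    "DlpJobName": "projects/my-project/locations/global/dlpJobs/i-12345"
}
print(job_name_matches(attributes, "projects/my-project/dlpJobs/i-12345"))
# -> True
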
37 changes: 23 additions & 14 deletions dlp/inspect_content_test.py
@@ -40,6 +40,8 @@
BIGQUERY_DATASET_ID = "dlp_test_dataset" + UNIQUE_STRING
BIGQUERY_TABLE_ID = "dlp_test_table" + UNIQUE_STRING

TIMEOUT = 180 # 3 minutes


@pytest.fixture(scope="module")
def bucket():
@@ -298,6 +300,7 @@ def cancel_operation(out):
client.cancel_dlp_job(operation_id)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys):
try:
inspect_content.inspect_gcs_file(
@@ -307,15 +310,16 @@ def test_inspect_gcs_file(bucket, topic_id, subscription_id, capsys):
topic_id,
subscription_id,
["EMAIL_ADDRESS", "PHONE_NUMBER"],
timeout=1
timeout=TIMEOUT
)

out, _ = capsys.readouterr()
assert "Inspection operation started" in out
assert "Info type: EMAIL_ADDRESS" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_gcs_file_with_custom_info_types(
bucket, topic_id, subscription_id, capsys):
try:
@@ -331,15 +335,16 @@ def test_inspect_gcs_file_with_custom_info_types(
[],
custom_dictionaries=dictionaries,
custom_regexes=regexes,
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()

assert "Inspection operation started" in out
assert "Info type: EMAIL_ADDRESS" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_gcs_file_no_results(
bucket, topic_id, subscription_id, capsys):
try:
@@ -350,15 +355,16 @@ def test_inspect_gcs_file_no_results(
topic_id,
subscription_id,
["EMAIL_ADDRESS", "PHONE_NUMBER"],
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()

assert "Inspection operation started" in out
assert "No findings" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_gcs_image_file(bucket, topic_id, subscription_id, capsys):
try:
inspect_content.inspect_gcs_file(
@@ -368,14 +374,15 @@ def test_inspect_gcs_image_file(bucket, topic_id, subscription_id, capsys):
topic_id,
subscription_id,
["EMAIL_ADDRESS", "PHONE_NUMBER"],
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()
assert "Inspection operation started" in out
assert "Info type: EMAIL_ADDRESS" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_gcs_multiple_files(bucket, topic_id, subscription_id, capsys):
try:
inspect_content.inspect_gcs_file(
@@ -385,15 +392,16 @@ def test_inspect_gcs_multiple_files(bucket, topic_id, subscription_id, capsys):
topic_id,
subscription_id,
["EMAIL_ADDRESS", "PHONE_NUMBER"],
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()

assert "Inspection operation started" in out
assert "Info type: EMAIL_ADDRESS" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_datastore(
datastore_project, topic_id, subscription_id, capsys):
try:
@@ -404,14 +412,15 @@ def test_inspect_datastore(
topic_id,
subscription_id,
["FIRST_NAME", "EMAIL_ADDRESS", "PHONE_NUMBER"],
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()
assert "Inspection operation started" in out
assert "Info type: EMAIL_ADDRESS" in out
finally:
cancel_operation(out)


@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_inspect_datastore_no_results(
datastore_project, topic_id, subscription_id, capsys):
try:
@@ -422,10 +431,10 @@ def test_inspect_datastore_no_results(
topic_id,
subscription_id,
["PHONE_NUMBER"],
timeout=1)
timeout=TIMEOUT)

out, _ = capsys.readouterr()
assert "Inspection operation started" in out
assert "No findings" in out
finally:
cancel_operation(out)

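The timeout passed to each test above bounds how long the sample waits for the job's Pub/Sub notification before giving up, which is why the one-second value had to grow. A minimal sketch of that wait pattern, assuming a threading.Event set by the callback and placeholder project and subscription IDs (this mirrors, but is not copied from, the samples):

import threading

from google.cloud import pubsub

job_done = threading.Event()


def callback(message):
    # The real samples first match the message against the job name.
    message.ack()
    job_done.set()


subscriber = pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    "my-project", "my-subscription")  # placeholder IDs
subscriber.subscribe(subscription_path, callback=callback)

# TIMEOUT = 180 gives the DLP job up to three minutes to finish,
# rather than the single second the tests passed before this change.
if not job_done.wait(timeout=180):
    print("No completion message received before the timeout.")
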
30 changes: 25 additions & 5 deletions dlp/risk.py
@@ -86,7 +86,11 @@ def numerical_risk_analysis(
operation = dlp.create_dlp_job(parent, risk_job=risk_job)

def callback(message):
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -196,7 +200,11 @@ def categorical_risk_analysis(
operation = dlp.create_dlp_job(parent, risk_job=risk_job)

def callback(message):
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -324,7 +332,11 @@ def map_fields(field):
operation = dlp.create_dlp_job(parent, risk_job=risk_job)

def callback(message):
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -460,7 +472,11 @@ def map_fields(field):
operation = dlp.create_dlp_job(parent, risk_job=risk_job)

def callback(message):
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

@@ -617,7 +633,11 @@ def map_fields(quasi_id, info_type):
operation = dlp.create_dlp_job(parent, risk_job=risk_job)

def callback(message):
if message.attributes["DlpJobName"] == operation.name:
# The DlpJobName in the Pub/Sub message has the location indicator
# and we need to remove that part for comparison.
dlp_job_name = message.attributes["DlpJobName"].replace(
'/locations/global', '')
if dlp_job_name == operation.name:
# This is the message we're looking for, so acknowledge it.
message.ack()

42 changes: 13 additions & 29 deletions dlp/risk_test.py
@@ -13,7 +13,6 @@
# limitations under the License.

import os
import time
import uuid

import google.cloud.bigquery
@@ -37,14 +36,14 @@
BIGQUERY_TABLE_ID = "dlp_test_table" + UNIQUE_STRING
BIGQUERY_HARMFUL_TABLE_ID = "harmful" + UNIQUE_STRING

TIMEOUT = 180 # 3 minutes
TIMEOUT = 60  # 1 minute


# Create new custom topic/subscription.
# We sometimes observe all the tests in this file failing. On the
# hypothesis that the DLP service somehow loses its connection to the
# topic, the Pub/Sub fixtures previously used function scope; they are
# now module scoped.
@pytest.fixture(scope="function")
@pytest.fixture(scope="module")
def topic_id():
# Creates a pubsub topic, and tears it down.
publisher = google.cloud.pubsub.PublisherClient()
@@ -59,7 +58,7 @@ def topic_id():
publisher.delete_topic(topic_path)


@pytest.fixture(scope="function")
@pytest.fixture(scope="module")
def subscription_id(topic_id):
# Subscribes to a topic.
subscriber = google.cloud.pubsub.SubscriberClient()
@@ -166,22 +165,7 @@ def bigquery_project():
bigquery_client.delete_dataset(dataset_ref, delete_contents=True)


def delay(err, *args):
# 20 mins of delay. This sounds like too long a delay, but we
# occasionally observe consecutive time blocks where operations are
# slow, which leads to test failures. These situations tend to
# self-heal in 20 minutes or so, so I'm trying this strategy.
#
# There are 10 tests, so we don't want the retry delay happening
# for all the tests. When we exhaust the MAX_FLAKY_WAIT, we retry
# the test immediately.
wait_time = min(pytest.MAX_FLAKY_WAIT, 60*20)
pytest.MAX_FLAKY_WAIT -= wait_time
time.sleep(wait_time)
return True


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_numerical_risk_analysis(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -200,7 +184,7 @@ def test_numerical_risk_analysis(
assert "Value Range:" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_categorical_risk_analysis_on_string_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -219,7 +203,7 @@ def test_categorical_risk_analysis_on_string_field(
assert "Most common value occurs" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_categorical_risk_analysis_on_number_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -238,7 +222,7 @@ def test_categorical_risk_analysis_on_number_field(
assert "Most common value occurs" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_k_anonymity_analysis_single_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -258,7 +242,7 @@ def test_k_anonymity_analysis_single_field(
assert "Class size:" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_k_anonymity_analysis_multiple_fields(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -278,7 +262,7 @@ def test_k_anonymity_analysis_multiple_fields(
assert "Class size:" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_l_diversity_analysis_single_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -300,7 +284,7 @@ def test_l_diversity_analysis_single_field(
assert "Sensitive value" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_l_diversity_analysis_multiple_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -322,7 +306,7 @@ def test_l_diversity_analysis_multiple_field(
assert "Sensitive value" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_k_map_estimate_analysis_single_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -344,7 +328,7 @@ def test_k_map_estimate_analysis_single_field(
assert "Values" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_k_map_estimate_analysis_multiple_field(
topic_id, subscription_id, bigquery_project, capsys
):
@@ -366,7 +350,7 @@ def test_k_map_estimate_analysis_multiple_field(
assert "Values" in out


@pytest.mark.flaky(max_runs=2, min_passes=1, rerun_filter=delay)
@pytest.mark.flaky(max_runs=2, min_passes=1)
def test_k_map_estimate_analysis_quasi_ids_info_types_equal(
topic_id, subscription_id, bigquery_project
):
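With the rerun_filter gone, flaky retries a failed test once, immediately, and the Pub/Sub fixtures are now created once per module rather than once per test. A minimal sketch of a module-scoped topic fixture in the style of this file, assuming the pre-2.0 google-cloud-pubsub API these samples use and a placeholder project ID:

import uuid

import google.cloud.pubsub
import pytest

GCLOUD_PROJECT = "my-project"  # placeholder
TOPIC_ID = "dlp-test-" + str(uuid.uuid4())


@pytest.fixture(scope="module")
def topic_id():
    # Created once for the whole module and shared by every test in it.
    publisher = google.cloud.pubsub.PublisherClient()
    topic_path = publisher.topic_path(GCLOUD_PROJECT, TOPIC_ID)
    publisher.create_topic(topic_path)

    yield TOPIC_ID

    publisher.delete_topic(topic_path)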
