Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Give corpus pruning enough time to complete #4396

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/base/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def is_done_collecting_messages():
def get_postprocess_task():
"""Gets a postprocess task if one exists."""
# This should only be run on non-preemptible bots.
if not (task_utils.is_remotely_executing_utasks() or
if not (task_utils.is_remotely_executing_utasks() and
task_utils.get_opted_in_tasks()):
return None
# Postprocess is platform-agnostic, so we run all such tasks on our
Expand Down
33 changes: 25 additions & 8 deletions src/clusterfuzz/_internal/google_cloud_utils/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud import batch_v1 as batch

from clusterfuzz._internal.base import retry
from clusterfuzz._internal.base import tasks
from clusterfuzz._internal.base import utils
from clusterfuzz._internal.base.tasks import task_utils
from clusterfuzz._internal.config import local_config
Expand All @@ -33,7 +34,6 @@

_local = threading.local()

MAX_DURATION = f'{60 * 60 * 6}s'
RETRY_COUNT = 0

TASK_BUNCH_SIZE = 20
Expand All @@ -46,9 +46,20 @@
MAX_CONCURRENT_VMS_PER_JOB = 1000

BatchWorkloadSpec = collections.namedtuple('BatchWorkloadSpec', [
'clusterfuzz_release', 'disk_size_gb', 'disk_type', 'docker_image',
'user_data', 'service_account_email', 'subnetwork', 'preemptible',
'project', 'gce_zone', 'machine_type', 'network', 'gce_region'
'clusterfuzz_release',
'disk_size_gb',
'disk_type',
'docker_image',
'user_data',
'service_account_email',
'subnetwork',
'preemptible',
'project',
'gce_zone',
'machine_type',
'network',
'gce_region',
'max_run_duration',
])


Expand Down Expand Up @@ -158,7 +169,7 @@ def _get_task_spec(batch_workload_spec):
task_spec = batch.TaskSpec()
task_spec.runnables = [runnable]
task_spec.max_retry_count = RETRY_COUNT
task_spec.max_run_duration = MAX_DURATION
task_spec.max_run_duration = batch_workload_spec.max_duration
return task_spec


Expand Down Expand Up @@ -219,8 +230,7 @@ def _create_job(spec, input_urls):
create_request.job_id = job_name
# The job's parent is the region in which the job will run
project_id = spec.project
create_request.parent = (
f'projects/{project_id}/locations/{spec.gce_region}')
create_request.parent = f'projects/{project_id}/locations/{spec.gce_region}'
job_result = _send_create_job_request(create_request)
logs.info(f'Created batch job id={job_name}.', spec=spec)
return job_result
Expand Down Expand Up @@ -274,6 +284,11 @@ def _get_config_name(command, job_name):
return config_name


def _get_task_duration(command):
return tasks.TASK_LEASE_SECONDS_BY_COMMAND.get(command,
tasks.TASK_LEASE_SECONDS)


def _get_spec_from_config(command, job_name):
"""Gets the configured specifications for a batch workload."""
config_name = _get_config_name(command, job_name)
Expand All @@ -285,6 +300,7 @@ def _get_spec_from_config(command, job_name):
docker_image = instance_spec['docker_image']
user_data = instance_spec['user_data']
clusterfuzz_release = instance_spec.get('clusterfuzz_release', 'prod')
max_run_duration = f'{_get_task_duration(command)}s'
spec = BatchWorkloadSpec(
clusterfuzz_release=clusterfuzz_release,
docker_image=docker_image,
Expand All @@ -298,5 +314,6 @@ def _get_spec_from_config(command, job_name):
network=instance_spec['network'],
subnetwork=instance_spec['subnetwork'],
preemptible=instance_spec['preemptible'],
machine_type=instance_spec['machine_type'])
machine_type=instance_spec['machine_type'],
max_run_duration=max_run_duration)
return spec
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
from clusterfuzz._internal.google_cloud_utils import batch
from clusterfuzz._internal.tests.test_libs import test_utils

# pylint: disable=protected-access


@test_utils.with_cloud_emulators('datastore')
class GetSpecFromConfigTest(unittest.TestCase):
"""Tests for get_spec_from_config."""

def setUp(self):
self.maxDiff = None
self.job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
self.job.put()

def test_nonpreemptible_get_spec_from_config(self):
def test_nonpreemptible(self):
"""Tests that get_spec_from_config works for non-preemptibles as
expected."""
job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
spec = batch._get_spec_from_config('corpus_pruning', job.name) # pylint: disable=protected-access
spec = batch._get_spec_from_config('analyze', self.job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -47,15 +49,14 @@ def test_nonpreemptible_get_spec_from_config(self):
gce_zone='gce-zone',
project='test-clusterfuzz',
preemptible=False,
machine_type='n1-standard-1')
machine_type='n1-standard-1',
max_run_duration='21600s')

self.assertCountEqual(spec, expected_spec)

def test_preemptible_get_spec_from_config(self):
def test_preemptible(self):
"""Tests that get_spec_from_config works for preemptibles as expected."""
job = data_types.Job(name='libfuzzer_chrome_asan', platform='LINUX')
job.put()
spec = batch._get_spec_from_config('fuzz', job.name) # pylint: disable=protected-access
spec = batch._get_spec_from_config('fuzz', self.job.name)
expected_spec = batch.BatchWorkloadSpec(
clusterfuzz_release='prod',
docker_image='gcr.io/clusterfuzz-images/base:a2f4dd6-202202070654',
Expand All @@ -70,6 +71,21 @@ def test_preemptible_get_spec_from_config(self):
gce_region='gce-region',
project='test-clusterfuzz',
preemptible=True,
machine_type='n1-standard-1')
machine_type='n1-standard-1',
max_run_duration='21600s')

self.assertCountEqual(spec, expected_spec)

def test_corpus_pruning(self):
"""Tests that corpus pruning uses a spec of 24 hours and a different one
than normal."""
pruning_spec = batch._get_spec_from_config('corpus_pruning', self.job.name)
self.assertEqual(pruning_spec.max_run_duration, f'{24 * 60 * 60}s')
normal_spec = batch._get_spec_from_config('analyze', self.job.name)
self.assertNotEqual(pruning_spec, normal_spec)
job = data_types.Job(name='libfuzzer_chrome_msan', platform='LINUX')
job.put()
# This behavior is important for grouping batch alike tasks into a single
# batch job.
pruning_spec2 = batch._get_spec_from_config('corpus_pruning', job.name)
self.assertEqual(pruning_spec, pruning_spec2)
Loading