Skip to content

Commit

Permalink
Unit test for downloading whl when default container image is used
Browse files Browse the repository at this point in the history
  • Loading branch information
AnandInguva committed Jan 10, 2022
1 parent e50d40f commit 84a7241
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
9 changes: 4 additions & 5 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ def create_job_resources(options, # type: PipelineOptions
# downloads the binary distributions
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache_with_bdists)(
Stager._populate_requirements_cache_with_whl)(
setup_options.requirements_file, requirements_cache_path)
else:
(
Expand All @@ -244,7 +244,7 @@ def create_job_resources(options, # type: PipelineOptions
# downloads the binary distributions
(
populate_requirements_cache if populate_requirements_cache else
Stager._populate_requirements_cache_with_bdists)(
Stager._populate_requirements_cache_with_whl)(
setup_options.requirements_file, requirements_cache_path)
else:
(
Expand Down Expand Up @@ -366,7 +366,6 @@ def create_job_resources(options, # type: PipelineOptions
resources.append(
Stager._create_file_stage_to_artifact(
dataflow_worker_jar, jar_staged_filename))

return resources

def stage_job_resources(self,
Expand Down Expand Up @@ -714,12 +713,12 @@ def _populate_requirements_cache(requirements_file, cache_dir):
@staticmethod
@retry.with_exponential_backoff(
num_retries=4, retry_filter=retry_on_non_zero_exit)
def _populate_requirements_cache_with_bdists(
def _populate_requirements_cache_with_whl(
requirements_file,
cache_dir,
language_implementation_tag='cp',
):
"""Downloads bdists if available. If not, sources would be downloaded.
"""Downloads whl if available. If not, source would be downloaded.
"""
language_version_tag = '%d%d' % (
sys.version_info[0], sys.version_info[1]) # Python version
Expand Down
34 changes: 34 additions & 0 deletions sdks/python/apache_beam/runners/portability/stager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.internal import names
from apache_beam.runners.portability import stager

Expand Down Expand Up @@ -708,6 +709,39 @@ def test_remove_dependency_from_requirements(self):
self.assertEqual(['apache_beam\n', 'avro-python3\n', 'numpy\n'],
sorted(lines))

def test_download_whl_with_default_container_image(self):
staging_dir = self.make_temp_dir()
requirements_cache_dir = self.make_temp_dir()
source_dir = self.make_temp_dir()

options = PipelineOptions()
self.update_options(options)

options.view_as(SetupOptions).requirements_cache = requirements_cache_dir
options.view_as(SetupOptions).requirements_file = os.path.join(
source_dir, stager.REQUIREMENTS_FILE)
self.create_temp_file(
os.path.join(source_dir, stager.REQUIREMENTS_FILE), 'nothing')
# for default container image, the sdk_container_image option would be none
options.view_as(
WorkerOptions).sdk_container_image = None # default value is None

def _create_file(temp_dir, fetch_binary=True, **unused_args):
if fetch_binary:
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
else:
self.create_temp_file(
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')

with mock.patch('apache_beam.runners.portability.stager_test'
'.stager.Stager._download_pypi_package',
staticmethod(_create_file)):
resources = self.stager.create_and_stage_job_resources(
options, staging_location=staging_dir)[1]
# in the resources, only a whl should be present
for f in resources:
self.assertTrue('.tar.gz' not in f)


class TestStager(stager.Stager):
def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
Expand Down

0 comments on commit 84a7241

Please sign in to comment.