From 8b17f91a52258e4a38bcc9da11aa75ed3c03e97d Mon Sep 17 00:00:00 2001 From: Chamikara Jayalath Date: Mon, 3 Jul 2023 16:31:03 -0700 Subject: [PATCH] Consider beam_services option when determining the expansion service type --- sdks/python/apache_beam/transforms/external.py | 8 ++++++++ sdks/python/apache_beam/utils/subprocess_server.py | 7 ++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py index 5182293ed591..9a4b84c8253f 100644 --- a/sdks/python/apache_beam/transforms/external.py +++ b/sdks/python/apache_beam/transforms/external.py @@ -885,6 +885,9 @@ def __init__( self._service_count = 0 self._append_args = append_args or [] + def is_existing_service(self): + return subprocess_server.is_service_endpoint(self._path_to_jar) + @staticmethod def _expand_jars(jar): if glob.glob(jar): @@ -987,6 +990,11 @@ def _maybe_use_transform_service(provided_service=None, options=None): if not isinstance(provided_service, JavaJarExpansionService): return provided_service + if provided_service.is_existing_service(): + # This is an existing service supported through the 'beam_services' + # PipelineOption. + return provided_service + def is_java_available(): cmd = ['java', '--version'] diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py index f11132ca1643..f566c3ea2914 100644 --- a/sdks/python/apache_beam/utils/subprocess_server.py +++ b/sdks/python/apache_beam/utils/subprocess_server.py @@ -169,7 +169,7 @@ def __init__(self, stub_class, path_to_jar, java_arguments, classpath=None): path_to_jar = self.make_classpath_jar(path_to_jar, classpath) super().__init__( stub_class, ['java', '-jar', path_to_jar] + list(java_arguments)) - self._existing_service = path_to_jar if _is_service_endpoint( + self._existing_service = path_to_jar if is_service_endpoint( path_to_jar) else None def start_process(self): @@ -258,7 +258,7 @@ def local_jar(cls, url, cache_dir=None): if cache_dir is None: cache_dir = cls.JAR_CACHE # TODO: Verify checksum? - if _is_service_endpoint(url): + if is_service_endpoint(url): return url elif os.path.exists(url): return url @@ -330,7 +330,8 @@ def make_classpath_jar(cls, main_jar, extra_jars, cache_dir=None): return composite_jar -def _is_service_endpoint(path): +def is_service_endpoint(path): + """Checks whether the path conforms to the 'beam_services' PipelineOption.""" return re.match(r'^[a-zA-Z0-9.-]+:\d+$', path)