diff --git a/sdks/python/apache_beam/io/hadoopfilesystem.py b/sdks/python/apache_beam/io/hadoopfilesystem.py
index da5c57d7ad67..c34deadec87d 100644
--- a/sdks/python/apache_beam/io/hadoopfilesystem.py
+++ b/sdks/python/apache_beam/io/hadoopfilesystem.py
@@ -44,6 +44,7 @@
 
 _HDFS_PREFIX = 'hdfs:/'
 _URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
+_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')
 _COPY_BUFFER_SIZE = 2**16
 _DEFAULT_BUFFER_SIZE = 20 * 1024 * 1024
 
@@ -116,10 +117,12 @@ def __init__(self, pipeline_options):
       hdfs_host = hdfs_options.hdfs_host
       hdfs_port = hdfs_options.hdfs_port
       hdfs_user = hdfs_options.hdfs_user
+      self._full_urls = hdfs_options.hdfs_full_urls
     else:
       hdfs_host = pipeline_options.get('hdfs_host')
       hdfs_port = pipeline_options.get('hdfs_port')
       hdfs_user = pipeline_options.get('hdfs_user')
+      self._full_urls = pipeline_options.get('hdfs_full_urls', False)
 
     if hdfs_host is None:
       raise ValueError('hdfs_host is not set')
@@ -127,6 +130,9 @@ def __init__(self, pipeline_options):
       raise ValueError('hdfs_port is not set')
     if hdfs_user is None:
       raise ValueError('hdfs_user is not set')
+    if not isinstance(self._full_urls, bool):
+      raise ValueError(
+          'hdfs_full_urls should be bool, got: %s', self._full_urls)
     self._hdfs_client = hdfs.InsecureClient(
         'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
 
@@ -134,24 +140,35 @@ def scheme(cls):
     return 'hdfs'
 
-  @staticmethod
-  def _parse_url(url):
+  def _parse_url(self, url):
     """Verifies that url begins with hdfs:// prefix, strips it and adds a
     leading /.
 
-    Raises:
-      ValueError if url doesn't begin with hdfs://.
+    Parsing behavior is determined by HadoopFileSystemOptions.hdfs_full_urls.
 
     Args:
-      url: A URL in the form hdfs://path/...
+      url: (str) A URL in the form hdfs://path/...
+        or in the form hdfs://server/path/...
+
+    Raises:
+      ValueError if the URL doesn't match the expected format.
 
     Returns:
-      For an input of 'hdfs://path/...', will return '/path/...'.
+      (str, str) If using hdfs_full_urls, for an input of
+      'hdfs://server/path/...' will return (server, '/path/...').
+      Otherwise, for an input of 'hdfs://path/...', will return
+      ('', '/path/...').
     """
-    m = _URL_RE.match(url)
-    if m is None:
-      raise ValueError('Could not parse url: %s' % url)
-    return m.group(1)
+    if not self._full_urls:
+      m = _URL_RE.match(url)
+      if m is None:
+        raise ValueError('Could not parse url: %s' % url)
+      return '', m.group(1)
+    else:
+      m = _FULL_URL_RE.match(url)
+      if m is None:
+        raise ValueError('Could not parse url: %s' % url)
+      return m.group(1), m.group(2) or '/'
 
   def join(self, base_url, *paths):
     """Join two or more pathname components.
@@ -164,19 +181,24 @@ def join(self, base_url, *paths):
     Returns:
       Full url after combining all the passed components.
""" - basepath = self._parse_url(base_url) - return _HDFS_PREFIX + self._join(basepath, *paths) + server, basepath = self._parse_url(base_url) + return _HDFS_PREFIX + self._join(server, basepath, *paths) - def _join(self, basepath, *paths): - return posixpath.join(basepath, *paths) + def _join(self, server, basepath, *paths): + res = posixpath.join(basepath, *paths) + if server: + server = '/' + server + return server + res def split(self, url): - rel_path = self._parse_url(url) + server, rel_path = self._parse_url(url) + if server: + server = '/' + server head, tail = posixpath.split(rel_path) - return _HDFS_PREFIX + head, tail + return _HDFS_PREFIX + server + head, tail def mkdirs(self, url): - path = self._parse_url(url) + _, path = self._parse_url(url) if self._exists(path): raise BeamIOError('Path already exists: %s' % path) return self._mkdirs(path) @@ -189,10 +211,10 @@ def has_dirs(self): def _list(self, url): try: - path = self._parse_url(url) + server, path = self._parse_url(url) for res in self._hdfs_client.list(path, status=True): yield FileMetadata( - _HDFS_PREFIX + self._join(path, res[0]), + _HDFS_PREFIX + self._join(server, path, res[0]), res[1][_FILE_STATUS_LENGTH]) except Exception as e: # pylint: disable=broad-except raise BeamIOError('List operation failed', {url: e}) @@ -222,7 +244,7 @@ def create( Returns: A Python File-like object. """ - path = self._parse_url(url) + _, path = self._parse_url(url) return self._create(path, mime_type, compression_type) def _create( @@ -246,7 +268,7 @@ def open( Returns: A Python File-like object. """ - path = self._parse_url(url) + _, path = self._parse_url(url) return self._open(path, mime_type, compression_type) def _open( @@ -293,7 +315,7 @@ def _copy_path(source, destination): for path, dirs, files in self._hdfs_client.walk(source): for dir in dirs: - new_dir = self._join(destination, dir) + new_dir = self._join('', destination, dir) if not self._exists(new_dir): self._mkdirs(new_dir) @@ -302,13 +324,14 @@ def _copy_path(source, destination): rel_path = '' for file in files: _copy_file( - self._join(path, file), self._join(destination, rel_path, file)) + self._join('', path, file), + self._join('', destination, rel_path, file)) exceptions = {} for source, destination in zip(source_file_names, destination_file_names): try: - rel_source = self._parse_url(source) - rel_destination = self._parse_url(destination) + _, rel_source = self._parse_url(source) + _, rel_destination = self._parse_url(destination) _copy_path(rel_source, rel_destination) except Exception as e: # pylint: disable=broad-except exceptions[(source, destination)] = e @@ -320,8 +343,8 @@ def rename(self, source_file_names, destination_file_names): exceptions = {} for source, destination in zip(source_file_names, destination_file_names): try: - rel_source = self._parse_url(source) - rel_destination = self._parse_url(destination) + _, rel_source = self._parse_url(source) + _, rel_destination = self._parse_url(destination) try: self._hdfs_client.rename(rel_source, rel_destination) except hdfs.HdfsError as e: @@ -344,7 +367,7 @@ def exists(self, url): Returns: True if url exists as a file or directory in HDFS. 
""" - path = self._parse_url(url) + _, path = self._parse_url(url) return self._exists(path) def _exists(self, path): @@ -356,7 +379,7 @@ def _exists(self, path): return self._hdfs_client.status(path, strict=False) is not None def size(self, url): - path = self._parse_url(url) + _, path = self._parse_url(url) status = self._hdfs_client.status(path, strict=False) if status is None: raise BeamIOError('File not found: %s' % url) @@ -371,7 +394,7 @@ def checksum(self, url): Returns: String describing the checksum. """ - path = self._parse_url(url) + _, path = self._parse_url(url) file_checksum = self._hdfs_client.checksum(path) return '%s-%d-%s' % ( file_checksum[_FILE_CHECKSUM_ALGORITHM], @@ -383,7 +406,7 @@ def delete(self, urls): exceptions = {} for url in urls: try: - path = self._parse_url(url) + _, path = self._parse_url(url) self._hdfs_client.delete(path, recursive=True) except Exception as e: # pylint: disable=broad-except exceptions[url] = e diff --git a/sdks/python/apache_beam/io/hadoopfilesystem_test.py b/sdks/python/apache_beam/io/hadoopfilesystem_test.py index aa99fa678b9c..9a1b1387ad47 100644 --- a/sdks/python/apache_beam/io/hadoopfilesystem_test.py +++ b/sdks/python/apache_beam/io/hadoopfilesystem_test.py @@ -31,6 +31,7 @@ # patches unittest.TestCase to be python3 compatible import future.tests.base # pylint: disable=unused-import from future.utils import itervalues +from parameterized import parameterized_class from apache_beam.io import hadoopfilesystem as hdfs from apache_beam.io.filesystem import BeamIOError @@ -203,6 +204,7 @@ def checksum(self, path): return f.get_file_checksum() +@parameterized_class(('full_urls', ), [(False, ), (True, )]) class HadoopFileSystemTest(unittest.TestCase): @classmethod def setUpClass(cls): @@ -220,7 +222,11 @@ def setUp(self): hdfs_options.hdfs_user = '' self.fs = hdfs.HadoopFileSystem(pipeline_options) - self.tmpdir = 'hdfs://test_dir' + self.fs._full_urls = self.full_urls + if self.full_urls: + self.tmpdir = 'hdfs://test_dir' + else: + self.tmpdir = 'hdfs://server/test_dir' for filename in ['old_file1', 'old_file2']: url = self.fs.join(self.tmpdir, filename) @@ -230,6 +236,32 @@ def test_scheme(self): self.assertEqual(self.fs.scheme(), 'hdfs') self.assertEqual(hdfs.HadoopFileSystem.scheme(), 'hdfs') + def test_parse_url(self): + cases = [ + ('hdfs://', ('', '/'), False), + ('hdfs://', None, True), + ('hdfs://a', ('', '/a'), False), + ('hdfs://a', ('a', '/'), True), + ('hdfs://a/', ('', '/a/'), False), + ('hdfs://a/', ('a', '/'), True), + ('hdfs://a/b', ('', '/a/b'), False), + ('hdfs://a/b', ('a', '/b'), True), + ('hdfs://a/b/', ('', '/a/b/'), False), + ('hdfs://a/b/', ('a', '/b/'), True), + ('hdfs:/a/b', None, False), + ('hdfs:/a/b', None, True), + ('invalid', None, False), + ('invalid', None, True), + ] + for url, expected, full_urls in cases: + try: + self.fs._full_urls = full_urls + result = self.fs._parse_url(url) + except ValueError: + self.assertIsNone(expected, msg=(url, expected, full_urls)) + continue + self.assertEqual(expected, result, msg=(url, expected, full_urls)) + def test_url_join(self): self.assertEqual( 'hdfs://tmp/path/to/file', @@ -237,15 +269,29 @@ def test_url_join(self): self.assertEqual( 'hdfs://tmp/path/to/file', self.fs.join('hdfs://tmp/path', 'to/file')) self.assertEqual('hdfs://tmp/path/', self.fs.join('hdfs://tmp/path/', '')) - self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo', '/bar')) - with self.assertRaises(ValueError): - self.fs.join('/no/scheme', 'file') + + if not self.full_urls: + 
+      self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo', '/bar'))
+      self.assertEqual('hdfs://bar', self.fs.join('hdfs://foo/', '/bar'))
+      with self.assertRaises(ValueError):
+        self.fs.join('/no/scheme', 'file')
+    else:
+      self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo', '/bar'))
+      self.assertEqual('hdfs://foo/bar', self.fs.join('hdfs://foo/', '/bar'))
 
   def test_url_split(self):
     self.assertEqual(('hdfs://tmp/path/to', 'file'),
                      self.fs.split('hdfs://tmp/path/to/file'))
-    self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
-    self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
+    if not self.full_urls:
+      self.assertEqual(('hdfs://', 'tmp'), self.fs.split('hdfs://tmp'))
+      self.assertEqual(('hdfs://tmp', ''), self.fs.split('hdfs://tmp/'))
+      self.assertEqual(('hdfs://tmp', 'a'), self.fs.split('hdfs://tmp/a'))
+    else:
+      self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp'))
+      self.assertEqual(('hdfs://tmp/', ''), self.fs.split('hdfs://tmp/'))
+      self.assertEqual(('hdfs://tmp/', 'a'), self.fs.split('hdfs://tmp/a'))
+
+    self.assertEqual(('hdfs://tmp/a', ''), self.fs.split('hdfs://tmp/a/'))
     with self.assertRaisesRegex(ValueError, r'parse'):
       self.fs.split('tmp')
 
@@ -329,7 +375,7 @@ def test_create_success(self):
     url = self.fs.join(self.tmpdir, 'new_file')
     handle = self.fs.create(url)
     self.assertIsNotNone(handle)
-    url = self.fs._parse_url(url)
+    _, url = self.fs._parse_url(url)
     expected_file = FakeFile(url, 'wb')
     self.assertEqual(self._fake_hdfs.files[url], expected_file)
 
@@ -338,7 +384,7 @@ def test_create_write_read_compressed(self):
     handle = self.fs.create(url)
     self.assertIsNotNone(handle)
 
-    path = self.fs._parse_url(url)
+    _, path = self.fs._parse_url(url)
     expected_file = FakeFile(path, 'wb')
     self.assertEqual(self._fake_hdfs.files[path], expected_file)
     data = b'abc' * 10
@@ -535,7 +581,7 @@ def test_delete_error(self):
     url2 = self.fs.join(self.tmpdir, 'old_file1')
     self.assertTrue(self.fs.exists(url2))
 
-    path1 = self.fs._parse_url(url1)
+    _, path1 = self.fs._parse_url(url1)
     with self.assertRaisesRegex(BeamIOError,
                                 r'^Delete operation failed .* %s' % path1):
       self.fs.delete([url1, url2])
@@ -545,9 +591,11 @@
 class HadoopFileSystemRuntimeValueProviderTest(unittest.TestCase):
   """Tests pipeline_options, in the form of a
   RuntimeValueProvider.runtime_options object."""
-  def test_dict_options(self):
+  def setUp(self):
     self._fake_hdfs = FakeHdfs()
     hdfs.hdfs.InsecureClient = (lambda *args, **kwargs: self._fake_hdfs)
+
+  def test_dict_options(self):
     pipeline_options = {
         'hdfs_host': '',
         'hdfs_port': 0,
@@ -555,11 +603,9 @@ def test_dict_options(self):
     }
 
     self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertFalse(self.fs._full_urls)
 
   def test_dict_options_missing(self):
-    self._fake_hdfs = FakeHdfs()
-    hdfs.hdfs.InsecureClient = (lambda *args, **kwargs: self._fake_hdfs)
-
     with self.assertRaisesRegex(ValueError, r'hdfs_host'):
       self.fs = hdfs.HadoopFileSystem(
           pipeline_options={
@@ -581,6 +627,21 @@ def test_dict_options_missing(self):
              'hdfs_port': 0,
          })
 
+  def test_dict_options_full_urls(self):
+    pipeline_options = {
+        'hdfs_host': '',
+        'hdfs_port': 0,
+        'hdfs_user': '',
+        'hdfs_full_urls': 'invalid',
+    }
+
+    with self.assertRaisesRegex(ValueError, r'hdfs_full_urls'):
+      self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+
+    pipeline_options['hdfs_full_urls'] = True
+    self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
+    self.assertTrue(self.fs._full_urls)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index befd93b31e77..2812cb69d43c 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -633,6 +633,14 @@ def _add_argparse_args(cls, parser):
         '--hdfs_port', default=None, help=('Port of the HDFS namenode.'))
     parser.add_argument(
         '--hdfs_user', default=None, help=('HDFS username to use.'))
+    parser.add_argument(
+        '--hdfs_full_urls',
+        default=False,
+        action='store_true',
+        help=(
+            'If set, URLs will be parsed as "hdfs://server/path/...", instead '
+            'of "hdfs://path/...". The "server" part will be unused (use '
+            '--hdfs_host and --hdfs_port).'))
 
   def validate(self, validator):
     errors = []
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d32a7509e18d..51812031094f 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -180,7 +180,7 @@ def get_version():
     'nose>=1.3.7',
     'nose_xunitmp>=0.4.1',
     'pandas>=0.23.4,<0.25',
-    'parameterized>=0.6.0,<0.8.0',
+    'parameterized>=0.7.1,<0.8.0',
     # pyhamcrest==1.10.0 doesn't work on Py2. Beam still supports Py2.
     # See: https://github.com/hamcrest/PyHamcrest/issues/131.
     'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index cb16df8337d6..62fd65e47ef3 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -339,6 +339,10 @@ commands =
     --input hdfs://kinglear* \
     --output hdfs://py-wordcount-integration \
     --hdfs_host namenode --hdfs_port 50070 --hdfs_user root
+  python -m apache_beam.examples.wordcount \
+    --input hdfs://unused_server/kinglear* \
+    --output hdfs://unused_server/py-wordcount-integration \
+    --hdfs_host namenode --hdfs_port 50070 --hdfs_user root --hdfs_full_urls
 # Disable pip check. TODO: remove this once gsutil does not conflict with
 # apache_beam (oauth2client).
 commands_pre =
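
Editorial note (not part of the patch): the sketch below mirrors the two parsing modes introduced above by _FULL_URL_RE and the reworked _parse_url, so the behaviour can be tried outside of Beam. The standalone function name parse_url is illustrative only; inputs and expected outputs are taken from test_parse_url in the patch.

# Standalone illustration of the URL parsing behaviour added by this patch.
# Mirrors HadoopFileSystem._parse_url; not part of the patch itself.
import re

_HDFS_PREFIX = 'hdfs:/'
_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'(/.*)')
_FULL_URL_RE = re.compile(r'^' + _HDFS_PREFIX + r'/([^/]+)(/.*)*')


def parse_url(url, full_urls=False):
  """Returns (server, path); server is '' when full_urls is False."""
  if not full_urls:
    m = _URL_RE.match(url)
    if m is None:
      raise ValueError('Could not parse url: %s' % url)
    return '', m.group(1)
  m = _FULL_URL_RE.match(url)
  if m is None:
    raise ValueError('Could not parse url: %s' % url)
  return m.group(1), m.group(2) or '/'


# Same URL, two interpretations (cases taken from test_parse_url above):
print(parse_url('hdfs://a/b', full_urls=False))  # ('', '/a/b')
print(parse_url('hdfs://a/b', full_urls=True))   # ('a', '/b')

The "or '/'" fallback is what makes a bare 'hdfs://server' resolve to that server's root path, matching the ('a', '/') expectations in the test table.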
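A hedged usage sketch, also editorial: it shows the new flag being consumed through PipelineOptions, the same path exercised by test_dict_options_full_urls and the tox.ini wordcount invocation above. The host, port, and user values are placeholders copied from the integration-test config; reading or writing real files additionally requires a reachable WebHDFS endpoint and a Beam build that includes this change.

# Usage sketch for --hdfs_full_urls; option values are placeholders.
from apache_beam.options.pipeline_options import HadoopFileSystemOptions
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--hdfs_host=namenode',
    '--hdfs_port=50070',
    '--hdfs_user=root',
    '--hdfs_full_urls',
])
hdfs_options = options.view_as(HadoopFileSystemOptions)
print(hdfs_options.hdfs_full_urls)  # True

# With the flag set, file URLs are written as 'hdfs://<server>/<path>'; the
# server component is parsed but not used for the connection, which still
# comes from --hdfs_host and --hdfs_port (see the option's help text above).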