Skip to content

Commit

Permalink
GH-39968: [Python][FS][Azure] Minimal Python bindings for `AzureFileS…
Browse files Browse the repository at this point in the history
…ystem` (#40021)

### Rationale for this change
We want to use the new `AzureFileSystem` in `pyarrow`. 

### What changes are included in this PR?
- Add minimal python bindings for `AzureFileSystem`. This includes just enough to run the python tests against azurite plus default credential auth to enable real use of this once this PR merges. 
- Adding additional configuration options and remaining authentication options can be done as a follow up. 
- I tried to copy the existing pybinds for GCS and S3
- Explicitly set `ARROW_AZURE=OFF` rather than relying on defaults. The defaults are different for builds vs tests so this was causing tests to be enabled while Azure was disabled during the build.

### Are these changes tested?
Enabled the the python filesystem tests for the new filesystem. I had to skip azure in a couple of the tests though because they are not yet working on the C++ side. I created Github issues to resolve these #40025 and #40026 and added TODO comments where relevant, that reference these Github issues. 

### Are there any user-facing changes?
`pyarrow` users can now use the native `AzureFileSystem` to get much better reliability and performance compared to `adlfs` based options. 

* Closes: #39968
* GitHub Issue: #39968

Lead-authored-by: Thomas Newton <thomas.w.newton@gmail.com>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
  • Loading branch information
3 people authored Mar 13, 2024
1 parent dd6d728 commit 9f6dc1f
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 7 deletions.
1 change: 1 addition & 0 deletions ci/docker/alpine-linux-3.16-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ COPY ci/scripts/install_gcs_testbench.sh /arrow/ci/scripts/
RUN /arrow/ci/scripts/install_gcs_testbench.sh default

ENV ARROW_ACERO=ON \
ARROW_AZURE=OFF \
ARROW_BUILD_TESTS=ON \
ARROW_DATASET=ON \
ARROW_DEPENDENCY_SOURCE=SYSTEM \
Expand Down
1 change: 1 addition & 0 deletions ci/docker/fedora-39-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin
# Python process explicitly if we use LLVM 17 or later.
ENV absl_SOURCE=BUNDLED \
ARROW_ACERO=ON \
ARROW_AZURE=OFF \
ARROW_BUILD_TESTS=ON \
ARROW_DEPENDENCY_SOURCE=SYSTEM \
ARROW_DATASET=ON \
Expand Down
1 change: 1 addition & 0 deletions ci/docker/linux-apt-docs.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ RUN /arrow/ci/scripts/r_deps.sh /arrow && \
R -e "install.packages('pkgdown')"

ENV ARROW_ACERO=ON \
ARROW_AZURE=OFF \
ARROW_BUILD_STATIC=OFF \
ARROW_BUILD_TESTS=OFF \
ARROW_BUILD_UTILITIES=OFF \
Expand Down
1 change: 1 addition & 0 deletions ci/docker/ubuntu-20.04-cpp-minimal.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/
RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin

ENV ARROW_ACERO=ON \
ARROW_AZURE=OFF \
ARROW_BUILD_TESTS=ON \
ARROW_DATASET=ON \
ARROW_FLIGHT=ON \
Expand Down
1 change: 1 addition & 0 deletions ci/docker/ubuntu-22.04-cpp-minimal.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ COPY ci/scripts/install_sccache.sh /arrow/ci/scripts/
RUN /arrow/ci/scripts/install_sccache.sh unknown-linux-musl /usr/local/bin

ENV ARROW_ACERO=ON \
ARROW_AZURE=OFF \
ARROW_BUILD_TESTS=ON \
ARROW_DATASET=ON \
ARROW_FLIGHT=ON \
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/filesystem/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
#include "arrow/util/config.h" // IWYU pragma: export

#include "arrow/filesystem/filesystem.h" // IWYU pragma: export
#include "arrow/filesystem/hdfs.h" // IWYU pragma: export
#ifdef ARROW_AZURE
#include "arrow/filesystem/azurefs.h" // IWYU pragma: export
#endif
#ifdef ARROW_GCS
#include "arrow/filesystem/gcsfs.h" // IWYU pragma: export
#endif
#include "arrow/filesystem/hdfs.h" // IWYU pragma: export
#include "arrow/filesystem/localfs.h" // IWYU pragma: export
#include "arrow/filesystem/mockfs.h" // IWYU pragma: export
#ifdef ARROW_S3
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2458,6 +2458,7 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
ASSERT_OK(output->Close());

// Verify the metadata has been set.
// TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
auto blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
Expand All @@ -2470,6 +2471,7 @@ TEST_F(TestAzuriteFileSystem, WriteMetadata) {
full_path, /*metadata=*/arrow::key_value_metadata({{"bar", "foo"}})));
ASSERT_OK(output->Write(expected));
ASSERT_OK(output->Close());
// TODO(GH-40025): Use `AzureFileSystem` to fetch metadata for this assertion.
blob_metadata = blob_service_client_->GetBlobContainerClient(data.container_name)
.GetBlockBlobClient(blob_path)
.GetProperties()
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ struct FileInfo;
struct FileSelector;

class FileSystem;
class SubTreeFileSystem;
class SlowFileSystem;
class AzureFileSystem;
class GcsFileSystem;
class LocalFileSystem;
class S3FileSystem;
class GcsFileSystem;
class SlowFileSystem;
class SubTreeFileSystem;

} // namespace fs
} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/util/config.h.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#cmakedefine ARROW_PARQUET
#cmakedefine ARROW_SUBSTRAIT

#cmakedefine ARROW_AZURE
#cmakedefine ARROW_ENABLE_THREADING
#cmakedefine ARROW_GCS
#cmakedefine ARROW_HDFS
Expand Down
4 changes: 4 additions & 0 deletions python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ set_source_files_properties(pyarrow/lib.pyx PROPERTIES CYTHON_API TRUE)

set(LINK_LIBS arrow_python)

if(PYARROW_BUILD_AZURE)
list(APPEND CYTHON_EXTENSIONS _azurefs)
endif()

if(PYARROW_BUILD_GCS)
list(APPEND CYTHON_EXTENSIONS _gcsfs)
endif()
Expand Down
3 changes: 2 additions & 1 deletion python/pyarrow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ def print_entry(label, value):
print(f" {module: <20}: {status: <8}")

print("\nFilesystems:")
filesystems = ["GcsFileSystem", "HadoopFileSystem", "S3FileSystem"]
filesystems = ["AzureFileSystem", "GcsFileSystem",
"HadoopFileSystem", "S3FileSystem"]
for fs in filesystems:
status = "Enabled" if _filesystem_is_available(fs) else "-"
print(f" {fs: <20}: {status: <8}")
Expand Down
134 changes: 134 additions & 0 deletions python/pyarrow/_azurefs.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from cython cimport binding


from pyarrow.lib import frombytes, tobytes
from pyarrow.includes.libarrow_fs cimport *
from pyarrow._fs cimport FileSystem


cdef class AzureFileSystem(FileSystem):
"""
Azure Blob Storage backed FileSystem implementation
This implementation supports flat namespace and hierarchical namespace (HNS) a.k.a.
Data Lake Gen2 storage accounts. HNS will be automatically detected and HNS specific
features will be used when they provide a performance advantage. Azurite emulator is
also supported. Note: `/` is the only supported delimiter.
The storage account is considered the root of the filesystem. When enabled, containers
will be created or deleted during relevant directory operations. Obviously, this also
requires authentication with the additional permissions.
By default `DefaultAzureCredential <https://github.com/Azure/azure-sdk-for-cpp/blob/main/sdk/identity/azure-identity/README.md#defaultazurecredential>`__
is used for authentication. This means it will try several types of authentication
and go with the first one that works. If any authentication parameters are provided when
initialising the FileSystem, they will be used instead of the default credential.
Parameters
----------
account_name : str
Azure Blob Storage account name. This is the globally unique identifier for the
storage account.
account_key : str, default None
Account key of the storage account. Pass None to use default credential.
blob_storage_authority : str, default None
hostname[:port] of the Blob Service. Defaults to `.blob.core.windows.net`. Useful
for connecting to a local emulator, like Azurite.
dfs_storage_authority : str, default None
hostname[:port] of the Data Lake Gen 2 Service. Defaults to
`.dfs.core.windows.net`. Useful for connecting to a local emulator, like Azurite.
blob_storage_scheme : str, default None
Either `http` or `https`. Defaults to `https`. Useful for connecting to a local
emulator, like Azurite.
dfs_storage_scheme : str, default None
Either `http` or `https`. Defaults to `https`. Useful for connecting to a local
emulator, like Azurite.
Examples
--------
>>> from pyarrow import fs
>>> azure_fs = fs.AzureFileSystem(account_name='myaccount')
>>> azurite_fs = fs.AzureFileSystem(
... account_name='devstoreaccount1',
... account_key='Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==',
... blob_storage_authority='127.0.0.1:10000',
... dfs_storage_authority='127.0.0.1:10000',
... blob_storage_scheme='http',
... dfs_storage_scheme='http',
... )
For usage of the methods see examples for :func:`~pyarrow.fs.LocalFileSystem`.
"""
cdef:
CAzureFileSystem* azurefs
c_string account_key

def __init__(self, account_name, *, account_key=None, blob_storage_authority=None,
dfs_storage_authority=None, blob_storage_scheme=None,
dfs_storage_scheme=None):
cdef:
CAzureOptions options
shared_ptr[CAzureFileSystem] wrapped

options.account_name = tobytes(account_name)
if blob_storage_authority:
options.blob_storage_authority = tobytes(blob_storage_authority)
if dfs_storage_authority:
options.dfs_storage_authority = tobytes(dfs_storage_authority)
if blob_storage_scheme:
options.blob_storage_scheme = tobytes(blob_storage_scheme)
if dfs_storage_scheme:
options.dfs_storage_scheme = tobytes(dfs_storage_scheme)

if account_key:
options.ConfigureAccountKeyCredential(tobytes(account_key))
self.account_key = tobytes(account_key)
else:
options.ConfigureDefaultCredential()

with nogil:
wrapped = GetResultValue(CAzureFileSystem.Make(options))

self.init(<shared_ptr[CFileSystem]> wrapped)

cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.azurefs = <CAzureFileSystem*> wrapped.get()

@staticmethod
@binding(True) # Required for cython < 3
def _reconstruct(kwargs):
# __reduce__ doesn't allow passing named arguments directly to the
# reconstructor, hence this wrapper.
return AzureFileSystem(**kwargs)

def __reduce__(self):
cdef CAzureOptions opts = self.azurefs.options()
return (
AzureFileSystem._reconstruct, (dict(
account_name=frombytes(opts.account_name),
account_key=frombytes(self.account_key),
blob_storage_authority=frombytes(opts.blob_storage_authority),
dfs_storage_authority=frombytes(opts.dfs_storage_authority),
blob_storage_scheme=frombytes(opts.blob_storage_scheme),
dfs_storage_scheme=frombytes(opts.dfs_storage_scheme)
),))
3 changes: 3 additions & 0 deletions python/pyarrow/_fs.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,9 @@ cdef class FileSystem(_Weakrefable):
elif typ == 'gcs':
from pyarrow._gcsfs import GcsFileSystem
self = GcsFileSystem.__new__(GcsFileSystem)
elif typ == 'abfs':
from pyarrow._azurefs import AzureFileSystem
self = AzureFileSystem.__new__(AzureFileSystem)
elif typ == 'hdfs':
from pyarrow._hdfs import HadoopFileSystem
self = HadoopFileSystem.__new__(HadoopFileSystem)
Expand Down
9 changes: 8 additions & 1 deletion python/pyarrow/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

groups = [
'acero',
'azure',
'brotli',
'bz2',
'cython',
Expand Down Expand Up @@ -54,6 +55,7 @@

defaults = {
'acero': False,
'azure': False,
'brotli': Codec.is_available('brotli'),
'bz2': Codec.is_available('bz2'),
'cython': False,
Expand Down Expand Up @@ -142,13 +144,18 @@
except ImportError:
pass

try:
from pyarrow.fs import AzureFileSystem # noqa
defaults['azure'] = True
except ImportError:
pass

try:
from pyarrow.fs import GcsFileSystem # noqa
defaults['gcs'] = True
except ImportError:
pass


try:
from pyarrow.fs import S3FileSystem # noqa
defaults['s3'] = True
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
FileStats = FileInfo

_not_imported = []
try:
from pyarrow._azurefs import AzureFileSystem # noqa
except ImportError:
_not_imported.append("AzureFileSystem")

try:
from pyarrow._hdfs import HadoopFileSystem # noqa
Expand Down
16 changes: 16 additions & 0 deletions python/pyarrow/includes/libarrow_fs.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,22 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
CResult[shared_ptr[CGcsFileSystem]] Make(const CGcsOptions& options)
CGcsOptions options()

cdef cppclass CAzureOptions "arrow::fs::AzureOptions":
c_string account_name
c_string blob_storage_authority
c_string dfs_storage_authority
c_string blob_storage_scheme
c_string dfs_storage_scheme

c_bool Equals(const CAzureOptions& other)
CStatus ConfigureDefaultCredential()
CStatus ConfigureAccountKeyCredential(c_string account_key)

cdef cppclass CAzureFileSystem "arrow::fs::AzureFileSystem":
@staticmethod
CResult[shared_ptr[CAzureFileSystem]] Make(const CAzureOptions& options)
CAzureOptions options()

cdef cppclass CHdfsOptions "arrow::fs::HdfsOptions":
HdfsConnectionConfig connection_config
int32_t buffer_size
Expand Down
31 changes: 31 additions & 0 deletions python/pyarrow/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,37 @@ def gcs_server():
proc.wait()


@pytest.fixture(scope='session')
def azure_server(tmpdir_factory):
port = find_free_port()
env = os.environ.copy()
tmpdir = tmpdir_factory.getbasetemp()
# We only need blob service emulator, not queue or table.
args = ['azurite-blob', "--location", tmpdir, "--blobPort", str(port)]
proc = None
try:
proc = subprocess.Popen(args, env=env)
# Make sure the server is alive.
if proc.poll() is not None:
pytest.skip(f"Command {args} did not start server successfully!")
except (ModuleNotFoundError, OSError) as e:
pytest.skip(f"Command {args} failed to execute: {e}")
else:
yield {
# Use the standard azurite account_name and account_key.
# https://learn.microsoft.com/en-us/azure/storage/common/storage-use-emulator#authorize-with-shared-key-credentials
'connection': ('127.0.0.1', port, 'devstoreaccount1',
'Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2'
'UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=='),
'process': proc,
'tempdir': tmpdir,
}
finally:
if proc is not None:
proc.kill()
proc.wait()


@pytest.fixture(
params=[
'builtin_pickle',
Expand Down
Loading

0 comments on commit 9f6dc1f

Please sign in to comment.