Skip to content

Commit

Permalink
Merge pull request #25 from jkuhn-cuda/jkuhn/make-buffer-sizes-configurable
Browse files Browse the repository at this point in the history

make buffer size, transfer size and concurrency configurable
  • Loading branch information
samansmink committed Dec 1, 2023
2 parents 9d1b95f + 0c4490f commit d2c685a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
53 changes: 48 additions & 5 deletions src/azure_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ static AzureAuthentication ParseAzureAuthSettings(FileOpener *opener) {
return auth;
}

// Builds the AzureReadOptions for a new file handle from the current settings.
//
// Starts from a default-constructed AzureReadOptions and overwrites a field only
// when FileOpener::TryGetCurrentSetting reports success for the matching key, so
// any setting that cannot be resolved keeps its struct default.
//
// NOTE(review): the values are copied through as-is; no range checks (e.g. for
// zero or negative sizes) are performed here.
static AzureReadOptions ParseAzureReadOptions(FileOpener *opener) {
	AzureReadOptions result;

	// Scratch slot for the lookups below; each successful lookup is consumed
	// immediately, before the next one overwrites it.
	Value setting;

	if (FileOpener::TryGetCurrentSetting(opener, "azure_read_transfer_concurrency", setting)) {
		result.transfer_concurrency = setting.GetValue<int32_t>();
	}
	if (FileOpener::TryGetCurrentSetting(opener, "azure_read_transfer_chunk_size", setting)) {
		result.transfer_chunk_size = setting.GetValue<int64_t>();
	}
	if (FileOpener::TryGetCurrentSetting(opener, "azure_read_buffer_size", setting)) {
		result.buffer_size = setting.GetValue<idx_t>();
	}

	return result;
}

static Azure::Storage::Blobs::BlobContainerClient GetContainerClient(AzureAuthentication &auth, AzureParsedUrl &url) {
if (!auth.connection_string.empty()) {
return Azure::Storage::Blobs::BlobContainerClient::CreateFromConnectionString(auth.connection_string,
Expand Down Expand Up @@ -108,9 +129,10 @@ Azure::Storage::Blobs::BlobClient *BlobClientWrapper::GetClient() {
};

AzureStorageFileHandle::AzureStorageFileHandle(FileSystem &fs, string path_p, uint8_t flags, AzureAuthentication &auth,
AzureParsedUrl parsed_url)
const AzureReadOptions &read_options, AzureParsedUrl parsed_url)
: FileHandle(fs, std::move(path_p)), flags(flags), length(0), last_modified(time_t()), buffer_available(0),
buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0), blob_client(auth, parsed_url) {
buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0), blob_client(auth, parsed_url),
read_options(read_options) {
try {
auto client = *blob_client.GetClient();
auto res = client.GetProperties();
Expand All @@ -121,7 +143,7 @@ AzureStorageFileHandle::AzureStorageFileHandle(FileSystem &fs, string path_p, ui
}

if (flags & FileFlags::FILE_FLAGS_READ) {
read_buffer = duckdb::unique_ptr<data_t[]>(new data_t[READ_BUFFER_LEN]);
read_buffer = duckdb::unique_ptr<data_t[]>(new data_t[read_options.buffer_size]);
}
}

Expand All @@ -133,8 +155,9 @@ unique_ptr<AzureStorageFileHandle> AzureStorageFileSystem::CreateHandle(const st

auto parsed_url = ParseUrl(path);
auto azure_auth = ParseAzureAuthSettings(opener);
auto read_options = ParseAzureReadOptions(opener);

return make_uniq<AzureStorageFileHandle>(*this, path, flags, azure_auth, parsed_url);
return make_uniq<AzureStorageFileHandle>(*this, path, flags, azure_auth, read_options, parsed_url);
}

unique_ptr<FileHandle> AzureStorageFileSystem::OpenFile(const string &path, uint8_t flags, FileLockType lock,
Expand Down Expand Up @@ -193,6 +216,23 @@ static void LoadInternal(DatabaseInstance &instance) {
config.AddExtensionOption("azure_endpoint",
"Override the azure endpoint for when the Azure credential providers are used.",
LogicalType::VARCHAR, "blob.core.windows.net");

AzureReadOptions default_read_options;
config.AddExtensionOption("azure_read_transfer_concurrency",
"Maximum number of threads the Azure client can use for a single parallel read. "
"If azure_read_transfer_chunk_size is less than azure_read_buffer_size then setting "
"this > 1 will allow the Azure client to do concurrent requests to fill the buffer.",
LogicalType::INTEGER, Value::INTEGER(default_read_options.transfer_concurrency));

config.AddExtensionOption("azure_read_transfer_chunk_size",
"Maximum size in bytes that the Azure client will read in a single request. "
"It is recommended that this is a factor of azure_read_buffer_size.",
LogicalType::BIGINT, Value::BIGINT(default_read_options.transfer_chunk_size));

config.AddExtensionOption("azure_read_buffer_size",
"Size of the read buffer. It is recommended that this is evenly divisible by "
"azure_read_transfer_chunk_size.",
LogicalType::UBIGINT, Value::UBIGINT(default_read_options.buffer_size));
}

int64_t AzureStorageFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) {
Expand Down Expand Up @@ -321,7 +361,7 @@ void AzureStorageFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_b
}

if (to_read > 0 && hfh.buffer_available == 0) {
auto new_buffer_available = MinValue<idx_t>(hfh.READ_BUFFER_LEN, hfh.length - hfh.file_offset);
auto new_buffer_available = MinValue<idx_t>(hfh.read_options.buffer_size, hfh.length - hfh.file_offset);

// Bypass buffer if we read more than buffer size
if (to_read > new_buffer_available) {
Expand Down Expand Up @@ -366,6 +406,9 @@ void AzureStorageFileSystem::ReadRange(FileHandle &handle, idx_t file_offset, ch
range.Length = buffer_out_len;
Azure::Storage::Blobs::DownloadBlobToOptions options;
options.Range = range;
options.TransferOptions.Concurrency = afh.read_options.transfer_concurrency;
options.TransferOptions.InitialChunkSize = afh.read_options.transfer_chunk_size;
options.TransferOptions.ChunkSize = afh.read_options.transfer_chunk_size;
auto res = blob_client.DownloadTo((uint8_t *)buffer_out, buffer_out_len, options);

} catch (Azure::Storage::StorageException &e) {
Expand Down
12 changes: 10 additions & 2 deletions src/include/azure_extension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ struct AzureAuthentication {
string endpoint;
};

// Read tunables for Azure blob access. A copy is kept on every
// AzureStorageFileHandle and consulted when allocating the read buffer and
// when issuing range downloads.
struct AzureReadOptions {
	// Forwarded to the Azure client as TransferOptions.Concurrency.
	int32_t transfer_concurrency {5};
	// Forwarded as both TransferOptions.InitialChunkSize and
	// TransferOptions.ChunkSize; size in bytes (default 1 MiB).
	int64_t transfer_chunk_size {1024 * 1024};
	// Number of data_t elements allocated for a handle's read buffer
	// (default 1024 * 1024).
	idx_t buffer_size {1024 * 1024};
};

struct AzureParsedUrl {
string container;
string prefix;
Expand All @@ -47,7 +53,7 @@ class BlobClientWrapper {
class AzureStorageFileHandle : public FileHandle {
public:
AzureStorageFileHandle(FileSystem &fs, string path, uint8_t flags, AzureAuthentication &auth,
AzureParsedUrl parsed_url);
const AzureReadOptions &read_options, AzureParsedUrl parsed_url);
~AzureStorageFileHandle() override = default;

public:
Expand All @@ -67,10 +73,12 @@ class AzureStorageFileHandle : public FileHandle {

// Read buffer
duckdb::unique_ptr<data_t[]> read_buffer;
constexpr static idx_t READ_BUFFER_LEN = 1000000;

// Azure Blob Client
BlobClientWrapper blob_client;

AzureReadOptions read_options;

};

class AzureStorageFileSystem : public FileSystem {
Expand Down

0 comments on commit d2c685a

Please sign in to comment.