Skip to content

Commit

Permalink
Merge pull request #26 from chrisiou/add-http-stats
Browse files Browse the repository at this point in the history
include http stats in the explain analyze statement
  • Loading branch information
samansmink authored Dec 8, 2023
2 parents d2c685a + 45df687 commit 6b1f93f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
63 changes: 59 additions & 4 deletions src/azure_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

#include "duckdb.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/http_state.hpp"
#include "duckdb/common/file_opener.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/function/scalar/string_functions.hpp"
#include "duckdb/function/scalar_function.hpp"
#include "duckdb/main/extension_util.hpp"
#include "duckdb/main/client_data.hpp"
#include <azure/storage/blobs.hpp>
#include <azure/core/diagnostics/logger.hpp>
#include <azure/identity/default_azure_credential.hpp>
#include <azure/identity/chained_token_credential.hpp>
#include <azure/identity/environment_credential.hpp>
Expand All @@ -20,6 +23,32 @@

namespace duckdb {

using namespace Azure::Core::Diagnostics;

// Definitions of the static members declared in azure_extension.hpp.
// azure_log_lock guards registration of the Azure SDK log listener and the
// assignment of the shared http_state (see OpenFile). http_state is a weak_ptr
// so the listener never extends the lifetime of a client context's HTTPState.
// listener_set records whether the SDK log listener has been registered; it is
// set once and never reset.
mutex AzureStorageFileSystem::azure_log_lock = {};
weak_ptr<HTTPState> AzureStorageFileSystem::http_state = std::weak_ptr<HTTPState>();
bool AzureStorageFileSystem::listener_set = false;

// TODO: extract received/sent bytes information
// Azure SDK log listener: classifies HTTP requests from the SDK's verbose log
// messages and accumulates per-verb counters in the shared HTTPState so they
// show up in EXPLAIN ANALYZE.
// TODO: extract received/sent bytes information
static void Log(Logger::Level level, std::string const &message) {
	shared_ptr<HTTPState> http_state;
	{
		// OpenFile re-assigns the static weak_ptr under azure_log_lock; take the
		// same lock here so the read does not race with that write.
		lock_guard<mutex> guard(AzureStorageFileSystem::azure_log_lock);
		http_state = AzureStorageFileSystem::http_state.lock();
	}
	if (!http_state) {
		// The listener stays registered for the lifetime of the process, but the
		// HTTPState belongs to a client context that may already be gone. Ignore
		// the message instead of throwing through the Azure SDK's HTTP pipeline.
		return;
	}
	if (message.find("Request") != std::string::npos) {
		if (message.find("Request : HEAD") != std::string::npos) {
			http_state->head_count++;
		} else if (message.find("Request : GET") != std::string::npos) {
			http_state->get_count++;
		} else if (message.find("Request : POST") != std::string::npos) {
			http_state->post_count++;
		} else if (message.find("Request : PUT") != std::string::npos) {
			http_state->put_count++;
		}
	}
}

static Azure::Identity::ChainedTokenCredential::Sources
CreateCredentialChainFromSetting(const string &credential_chain) {
auto chain_list = StringUtil::Split(credential_chain, ';');
Expand Down Expand Up @@ -164,6 +193,27 @@ unique_ptr<FileHandle> AzureStorageFileSystem::OpenFile(const string &path, uint
FileCompressionType compression, FileOpener *opener) {
D_ASSERT(compression == FileCompressionType::UNCOMPRESSED);

Value value;
bool enable_http_stats = false;
auto context = FileOpener::TryGetClientContext(opener);
if (FileOpener::TryGetCurrentSetting(opener, "azure_http_stats", value)) {
enable_http_stats = value.GetValue<bool>();
}

if (context && enable_http_stats) {
unique_lock<mutex> lck(AzureStorageFileSystem::azure_log_lock);
if (!context->client_data->http_state) {
context->client_data->http_state = make_shared<HTTPState>();
}
AzureStorageFileSystem::http_state = context->client_data->http_state;

if (!AzureStorageFileSystem::listener_set) {
Logger::SetListener(std::bind(&Log, std::placeholders::_1, std::placeholders::_2));
Logger::SetLevel(Logger::Level::Verbose);
AzureStorageFileSystem::listener_set = true;
}
}

if (flags & FileFlags::FILE_FLAGS_WRITE) {
throw NotImplementedException("Writing to Azure containers is currently not supported");
}
Expand Down Expand Up @@ -216,22 +266,27 @@ static void LoadInternal(DatabaseInstance &instance) {
config.AddExtensionOption("azure_endpoint",
"Override the azure endpoint for when the Azure credential providers are used.",
LogicalType::VARCHAR, "blob.core.windows.net");
config.AddExtensionOption("azure_http_stats",
"Include http info from the Azure Storage in the explain analyze statement. "
"Notice that the result may be incorrect for more than one active DuckDB connection "
"and the calculation of total received and sent bytes is not yet implemented.",
LogicalType::BOOLEAN, false);

AzureReadOptions default_read_options;
config.AddExtensionOption("azure_read_transfer_concurrency",
"Maximum number of threads the Azure client can use for a single parallel read. "
"If azure_read_transfer_chunk_size is less than azure_read_buffer_size then setting "
"this > 1 will allow the Azure client to do concurrent requests to fill the buffer.",
"If azure_read_transfer_chunk_size is less than azure_read_buffer_size then setting "
"this > 1 will allow the Azure client to do concurrent requests to fill the buffer.",
LogicalType::INTEGER, Value::INTEGER(default_read_options.transfer_concurrency));

config.AddExtensionOption("azure_read_transfer_chunk_size",
"Maximum size in bytes that the Azure client will read in a single request. "
"It is recommended that this is a factor of azure_read_buffer_size.",
"It is recommended that this is a factor of azure_read_buffer_size.",
LogicalType::BIGINT, Value::BIGINT(default_read_options.transfer_chunk_size));

config.AddExtensionOption("azure_read_buffer_size",
"Size of the read buffer. It is recommended that this is evenly divisible by "
"azure_read_transfer_chunk_size.",
"azure_read_transfer_chunk_size.",
LogicalType::UBIGINT, Value::UBIGINT(default_read_options.buffer_size));
}

Expand Down
8 changes: 7 additions & 1 deletion src/include/azure_extension.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class BlobClient;
} // namespace Azure

namespace duckdb {
class HTTPState;

class AzureExtension : public Extension {
public:
Expand Down Expand Up @@ -78,7 +79,6 @@ class AzureStorageFileHandle : public FileHandle {
BlobClientWrapper blob_client;

AzureReadOptions read_options;

};

class AzureStorageFileSystem : public FileSystem {
Expand Down Expand Up @@ -113,6 +113,12 @@ class AzureStorageFileSystem : public FileSystem {

static void Verify();

public:
// guarded global variables are used here to share the http_state when parsing multiple files
static mutex azure_log_lock;
static weak_ptr<HTTPState> http_state;
static bool listener_set;

protected:
static AzureParsedUrl ParseUrl(const string &url);
static void ReadRange(FileHandle &handle, idx_t file_offset, char *buffer_out, idx_t buffer_out_len);
Expand Down
28 changes: 25 additions & 3 deletions test/sql/azure.test
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ require-env AZURE_STORAGE_CONNECTION_STRING
# We need a connection string to do requests
foreach prefix azure:// az://

# Unset the connection string var
statement ok
SET azure_storage_connection_string = '';

statement error
SELECT sum(l_orderkey) FROM '${prefix}testing-private/l.parquet';
----
Expand All @@ -38,8 +42,26 @@ SELECT count(*) FROM '${prefix}testing-private/l.csv';
----
60175

# Unset the connection string var
endloop

# Enable http info for the explain analyze statement
statement ok
SET azure_storage_connection_string = '';
SET azure_http_stats = true;

query II
EXPLAIN ANALYZE SELECT sum(l_orderkey) FROM 'az://testing-private/l.parquet';
----
analyzed_plan <REGEX>:.*HTTP Stats.*\#HEAD\: 2.*GET\: 3.*PUT\: 0.*\#POST\: 0.*

# Redoing query should still result in same request count
query II
EXPLAIN ANALYZE SELECT sum(l_orderkey) FROM 'az://testing-private/l.parquet';
----
analyzed_plan <REGEX>:.*HTTP Stats.*\#HEAD\: 2.*GET\: 3.*PUT\: 0.*\#POST\: 0.*

# Testing public blobs
query II
EXPLAIN ANALYZE SELECT COUNT(*) FROM "azure://testing-public/l.parquet";
----
analyzed_plan <REGEX>:.*HTTP Stats.*\#HEAD\: 2.*GET\: 2.*PUT\: 0.*\#POST\: 0.*

endloop

0 comments on commit 6b1f93f

Please sign in to comment.