diff --git a/.github/workflows/test_pr.yml b/.github/workflows/test_pr.yml index 9e17a369..56bdd096 100644 --- a/.github/workflows/test_pr.yml +++ b/.github/workflows/test_pr.yml @@ -63,4 +63,6 @@ jobs: - name: Test working-directory: ${{github.workspace}}/build - run: ctest -C ${{env.BUILD_TYPE}} -L acquire-driver-zarr --output-on-failure + run: | + ctest -C ${{env.BUILD_TYPE}} -L acquire-driver-zarr --output-on-failure + ctest -C ${{env.BUILD_TYPE}} -L acquire-zarr --output-on-failure diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 30e1cc2c..55be93dc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -8,6 +8,8 @@ add_library(${tgt} STATIC internal/logger.cpp internal/stream.settings.hh internal/stream.settings.cpp + internal/zarr.stream.hh + internal/zarr.stream.cpp ) target_include_directories(${tgt} diff --git a/src/internal/zarr.stream.cpp b/src/internal/zarr.stream.cpp new file mode 100644 index 00000000..ee099b52 --- /dev/null +++ b/src/internal/zarr.stream.cpp @@ -0,0 +1,368 @@ +#include "zarr.stream.hh" +#include "logger.hh" + +#include "zarr.h" + +#include + +#define EXPECT_VALID_ARGUMENT(e, ...) \ + do { \ + if (!(e)) { \ + LOG_ERROR(__VA_ARGS__); \ + return ZarrError_InvalidArgument; \ + } \ + } while (0) + +#define STREAM_GET_STRING(stream, member) \ + do { \ + if (!stream) { \ + LOG_ERROR("Null pointer: %s", #stream); \ + return nullptr; \ + } \ + return stream->settings().member.c_str(); \ + } while (0) + +namespace fs = std::filesystem; + +namespace { +bool +is_s3_acquisition(const struct ZarrStreamSettings_s* settings) +{ + return !settings->s3_endpoint.empty() && + !settings->s3_bucket_name.empty() && + !settings->s3_access_key_id.empty() && + !settings->s3_secret_access_key.empty(); +} + +bool +validate_settings(const struct ZarrStreamSettings_s* settings, + ZarrVersion version) +{ + if (!settings) { + LOG_ERROR("Null pointer: settings"); + return false; + } + if (version < ZarrVersion_2 || version >= ZarrVersionCount) { + LOG_ERROR("Invalid Zarr version: %d", version); + return false; + } + + std::string store_path(settings->store_path); + std::string s3_endpoint(settings->s3_endpoint); + std::string s3_bucket_name(settings->s3_bucket_name); + std::string s3_access_key_id(settings->s3_access_key_id); + std::string s3_secret_access_key(settings->s3_secret_access_key); + + // we require the store_path to be nonempty + if (store_path.empty()) { + LOG_ERROR("Store path is empty"); + return false; + } + + // if all S3 settings are nonempty, we consider this an S3 store + if (is_s3_acquisition(settings)) { + // check that the S3 endpoint is a valid URL + if (s3_endpoint.find("http://") != 0 && + s3_endpoint.find("https://") != 0) { + LOG_ERROR("Invalid S3 endpoint: %s", s3_endpoint.c_str()); + return false; + } + + // test the S3 connection + // todo (aliddell): implement this + } else { + // if any S3 setting is nonempty, this is a filesystem store + fs::path path(store_path); + fs::path parent_path = path.parent_path(); + if (parent_path.empty()) { + parent_path = "."; + } + + // parent path must exist and be a directory + if (!fs::exists(parent_path) || !fs::is_directory(parent_path)) { + LOG_ERROR("Parent path '%s' does not exist or is not a directory", + parent_path.c_str()); + return false; + } + + // parent path must be writable + const auto perms = fs::status(parent_path).permissions(); + const bool is_writable = + (perms & (fs::perms::owner_write | fs::perms::group_write | + fs::perms::others_write)) != fs::perms::none; + + if (!is_writable) { + LOG_ERROR("Parent path '%s' is not writable", parent_path.c_str()); + return false; + } + } + + if (settings->dtype >= ZarrDataTypeCount) { + LOG_ERROR("Invalid data type: %d", settings->dtype); + return false; + } + + if (settings->compressor >= ZarrCompressorCount) { + LOG_ERROR("Invalid compressor: %d", settings->compressor); + return false; + } + + if (settings->compression_codec >= ZarrCompressionCodecCount) { + LOG_ERROR("Invalid compression codec: %d", settings->compression_codec); + return false; + } + + // if compressing, we require a compression codec + if (settings->compressor != ZarrCompressor_None && + settings->compression_codec == ZarrCompressionCodec_None) { + LOG_ERROR("Compression codec must be set when using a compressor"); + return false; + } + + // validate the dimensions individually + for (size_t i = 0; i < settings->dimensions.size(); ++i) { + if (!validate_dimension(settings->dimensions[i])) { + LOG_ERROR("Invalid dimension at index %d", i); + return false; + } + + if (i > 0 && settings->dimensions[i].array_size_px == 0) { + LOG_ERROR("Only the first dimension can have an array size of 0"); + return false; + } + } + + // if version 3, we require shard size to be positive + if (version == ZarrVersion_3) { + for (const auto& dim : settings->dimensions) { + if (dim.shard_size_chunks == 0) { + LOG_ERROR("Shard sizes must be positive"); + return false; + } + } + } + + return true; +} +} // namespace + +extern "C" +{ + ZarrStream* ZarrStream_create(struct ZarrStreamSettings_s* settings, + ZarrVersion version) + { + if (!validate_settings(settings, version)) { + return nullptr; + } + + // initialize the stream + ZarrStream_s* stream; + + try { + stream = new ZarrStream(settings, version); + } catch (const std::bad_alloc&) { + LOG_ERROR("Failed to allocate memory for Zarr stream"); + return nullptr; + } catch (const std::exception& e) { + LOG_ERROR("Error creating Zarr stream: %s", e.what()); + return nullptr; + } + ZarrStreamSettings_destroy(settings); + + return stream; + } + + void ZarrStream_destroy(ZarrStream* stream) + { + delete stream; + } + + /* Appending data */ + + ZarrError ZarrStream_append(ZarrStream* stream, + const void* data, + size_t bytes_in, + size_t* bytes_out) + { + EXPECT_VALID_ARGUMENT(stream, "Null pointer: stream"); + EXPECT_VALID_ARGUMENT(data, "Null pointer: data"); + EXPECT_VALID_ARGUMENT(bytes_out, "Null pointer: bytes_out"); + + try { + *bytes_out = stream->append(data, bytes_in); + } catch (const std::exception& e) { + LOG_ERROR("Error appending data: %s", e.what()); + return ZarrError_InternalError; + } + + return ZarrError_Success; + } + + /* Getters */ + + ZarrVersion ZarrStream_get_version(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning ZarrVersion_2"); + return ZarrVersion_2; + } + return static_cast(stream->version()); + } + + const char* ZarrStream_get_store_path(const ZarrStream* stream) + { + STREAM_GET_STRING(stream, store_path); + } + + const char* ZarrStream_get_s3_endpoint(const ZarrStream* stream) + { + STREAM_GET_STRING(stream, s3_endpoint); + } + + const char* ZarrStream_get_s3_bucket_name(const ZarrStream* stream) + { + STREAM_GET_STRING(stream, s3_bucket_name); + } + + const char* ZarrStream_get_s3_access_key_id(const ZarrStream* stream) + { + STREAM_GET_STRING(stream, s3_access_key_id); + } + + const char* ZarrStream_get_s3_secret_access_key(const ZarrStream* stream) + { + STREAM_GET_STRING(stream, s3_secret_access_key); + } + + ZarrCompressor ZarrStream_get_compressor(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning ZarrCompressor_None"); + return ZarrCompressor_None; + } + return ZarrStreamSettings_get_compressor(&stream->settings()); + } + + ZarrCompressionCodec ZarrStream_get_compression_codec( + const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING( + "Null pointer: stream. Returning ZarrCompressionCodec_None"); + return ZarrCompressionCodec_None; + } + return ZarrStreamSettings_get_compression_codec(&stream->settings()); + } + + uint8_t ZarrStream_get_compression_level(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning 0"); + return 0; + } + return ZarrStreamSettings_get_compression_level(&stream->settings()); + } + + uint8_t ZarrStream_get_compression_shuffle(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning 0"); + return 0; + } + return ZarrStreamSettings_get_compression_shuffle(&stream->settings()); + } + + size_t ZarrStream_get_dimension_count(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning 0"); + return 0; + } + return ZarrStreamSettings_get_dimension_count(&stream->settings()); + } + + ZarrError ZarrStream_get_dimension(const ZarrStream* stream, + size_t index, + char* name, + size_t bytes_of_name, + ZarrDimensionType* kind, + size_t* array_size_px, + size_t* chunk_size_px, + size_t* shard_size_chunks) + { + EXPECT_VALID_ARGUMENT(stream, "Null pointer: stream"); + return ZarrStreamSettings_get_dimension(&stream->settings(), + index, + name, + bytes_of_name, + kind, + array_size_px, + chunk_size_px, + shard_size_chunks); + } + + uint8_t ZarrStream_get_multiscale(const ZarrStream* stream) + { + if (!stream) { + LOG_WARNING("Null pointer: stream. Returning 0"); + return 0; + } + return ZarrStreamSettings_get_multiscale(&stream->settings()); + } + + /* Logging */ + + ZarrError Zarr_set_log_level(LogLevel level) + { + if (level < LogLevel_Debug || level >= LogLevelCount) { + return ZarrError_InvalidArgument; + } + + Logger::set_log_level(level); + return ZarrError_Success; + } + + LogLevel Zarr_get_log_level() + { + return Logger::get_log_level(); + } + + /* Error handling */ + + const char* Zarr_get_error_message(ZarrError error) + { + switch (error) { + case ZarrError_Success: + return "Success"; + case ZarrError_InvalidArgument: + return "Invalid argument"; + case ZarrError_Overflow: + return "Overflow"; + case ZarrError_InvalidIndex: + return "Invalid index"; + case ZarrError_NotYetImplemented: + return "Not yet implemented"; + case ZarrError_InternalError: + return "Internal error"; + default: + return "Unknown error"; + } + } +} + +/* ZarrStream_s implementation */ + +ZarrStream::ZarrStream_s(struct ZarrStreamSettings_s* settings, uint8_t version) + : settings_(*settings) + , version_(version) + , error_() +{ + settings_.dimensions = std::move(settings->dimensions); +} + +size_t + ZarrStream::append(const void* data, size_t nbytes) +{ + // todo (aliddell): implement this + return 0; +} \ No newline at end of file diff --git a/src/internal/zarr.stream.hh b/src/internal/zarr.stream.hh new file mode 100644 index 00000000..18a7ff1b --- /dev/null +++ b/src/internal/zarr.stream.hh @@ -0,0 +1,29 @@ +#pragma once + +#include "stream.settings.hh" + +#include // size_t +#include // unique_ptr + +struct ZarrStream_s +{ + public: + ZarrStream_s(struct ZarrStreamSettings_s* settings, uint8_t version); + ~ZarrStream_s() = default; + + /** + * @brief Append data to the stream. + * @param data The data to append. + * @param nbytes The number of bytes to append. + * @return The number of bytes appended. + */ + size_t append(const void* data, size_t nbytes); + + size_t version() const { return version_; } + const ZarrStreamSettings_s& settings() const { return settings_; } + + private: + struct ZarrStreamSettings_s settings_; + uint8_t version_; // Zarr version. 2 or 3. + std::string error_; // error message. If nonempty, an error occurred. +}; diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index d17d0baa..e600dccc 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -2,6 +2,7 @@ set(project acquire-zarr) set(tests set-and-get-params + create-and-destroy-stream ) foreach (name ${tests}) diff --git a/tests/integration/create-and-destroy-stream.cpp b/tests/integration/create-and-destroy-stream.cpp new file mode 100644 index 00000000..d79d581d --- /dev/null +++ b/tests/integration/create-and-destroy-stream.cpp @@ -0,0 +1,181 @@ +#include "zarr.h" + +#include +#include +#include +#include + +// to be used in functions returning bool +#define CHECK(cond) \ + do { \ + if (!(cond)) { \ + fprintf(stderr, "Assertion failed: %s\n", #cond); \ + goto Error; \ + } \ + } while (0) + +#define CHECK_EQ(a, b) CHECK((a) == (b)) + +#define SIZED(name) name, sizeof(name) + +namespace fs = std::filesystem; + +bool +try_with_invalid_settings() +{ + ZarrStreamSettings* settings; + ZarrStream* stream; + + settings = ZarrStreamSettings_create(); + CHECK(settings); + + // reserve 3 dimensions, but only set 2 of them + CHECK_EQ(ZarrStreamSettings_reserve_dimensions(settings, 3), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_dimension( + settings, 1, SIZED("y"), ZarrDimensionType_Space, 12, 3, 4), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_dimension( + settings, 2, SIZED("x"), ZarrDimensionType_Space, 1, 1, 1), + ZarrError_Success); + + stream = ZarrStream_create(settings, ZarrVersion_2); + CHECK(!stream); + + return true; + +Error: + return false; +} + +bool +try_with_valid_settings() +{ + ZarrStreamSettings* settings; + ZarrStream* stream; + const std::string store_path = TEST ".zarr"; + + settings = ZarrStreamSettings_create(); + CHECK(settings); + + CHECK_EQ(ZarrStreamSettings_reserve_dimensions(settings, 3), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_dimension( + settings, 2, SIZED("x"), ZarrDimensionType_Space, 10, 5, 1), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_dimension( + settings, 1, SIZED("y"), ZarrDimensionType_Space, 12, 3, 4), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_dimension( + settings, 0, SIZED("t"), ZarrDimensionType_Time, 1, 1, 0), + ZarrError_Success); + + CHECK_EQ(ZarrStreamSettings_set_store_path( + settings, store_path.c_str(), store_path.size()), + ZarrError_Success); + + stream = ZarrStream_create(settings, ZarrVersion_2); + CHECK(stream); + + // check the stream's settings are correct + CHECK_EQ(ZarrStream_get_version(stream), ZarrVersion_2); + + CHECK_EQ(std::string(ZarrStream_get_store_path(stream)), store_path); + CHECK_EQ(strlen(ZarrStream_get_s3_endpoint(stream)), 0); + CHECK_EQ(strlen(ZarrStream_get_s3_bucket_name(stream)), 0); + CHECK_EQ(strlen(ZarrStream_get_s3_access_key_id(stream)), 0); + CHECK_EQ(strlen(ZarrStream_get_s3_secret_access_key(stream)), 0); + + CHECK_EQ(ZarrStream_get_compressor(stream), ZarrCompressor_None); + CHECK_EQ(ZarrStream_get_compression_codec(stream), + ZarrCompressionCodec_None); + + CHECK_EQ(ZarrStream_get_dimension_count(stream), 3); + + char name[64]; + ZarrDimensionType kind; + size_t array_size_px, chunk_size_px, shard_size_chunks; + CHECK_EQ(ZarrStream_get_dimension(stream, + 0, + name, + sizeof(name), + &kind, + &array_size_px, + &chunk_size_px, + &shard_size_chunks), + ZarrError_Success); + CHECK_EQ(std::string(name), "t"); + CHECK_EQ(kind, ZarrDimensionType_Time); + CHECK_EQ(array_size_px, 1); + CHECK_EQ(chunk_size_px, 1); + CHECK_EQ(shard_size_chunks, 0); + + CHECK_EQ(ZarrStream_get_dimension(stream, + 1, + name, + sizeof(name), + &kind, + &array_size_px, + &chunk_size_px, + &shard_size_chunks), + ZarrError_Success); + + CHECK_EQ(std::string(name), "y"); + CHECK_EQ(kind, ZarrDimensionType_Space); + CHECK_EQ(array_size_px, 12); + CHECK_EQ(chunk_size_px, 3); + CHECK_EQ(shard_size_chunks, 4); + + CHECK_EQ(ZarrStream_get_dimension(stream, + 2, + name, + sizeof(name), + &kind, + &array_size_px, + &chunk_size_px, + &shard_size_chunks), + ZarrError_Success); + + CHECK_EQ(std::string(name), "x"); + CHECK_EQ(kind, ZarrDimensionType_Space); + CHECK_EQ(array_size_px, 10); + CHECK_EQ(chunk_size_px, 5); + CHECK_EQ(shard_size_chunks, 1); + + ZarrStream_destroy(stream); + + // cleanup + try { + fs::remove_all(store_path); + } catch (const fs::filesystem_error& e) { + fprintf( + stderr, "Failed to remove %s: %s\n", store_path.c_str(), e.what()); + return false; + } + + return true; + +Error: + return false; +} + +int +main() +{ + int retval = 0; + + CHECK(try_with_invalid_settings()); + CHECK(try_with_valid_settings()); + +Finalize: + return retval; + +Error: + retval = 1; + goto Finalize; +} \ No newline at end of file diff --git a/tests/integration/test.logger.cpp b/tests/integration/test.logger.cpp new file mode 100644 index 00000000..16861601 --- /dev/null +++ b/tests/integration/test.logger.cpp @@ -0,0 +1,81 @@ +#include "test.logger.hh" + +#include +#include +#include +#include +#include + +LogLevel Logger::current_level = LogLevel_Info; + +void +Logger::set_log_level(LogLevel level) +{ + current_level = level; +} + +LogLevel +Logger::get_log_level() +{ + return current_level; +} + +std::string +Logger::log(LogLevel level, + const char* file, + int line, + const char* func, + const char* format, + ...) +{ + if (current_level == LogLevel_None || level < current_level) { + return {}; // Suppress logs + } + + va_list args; + va_start(args, format); + + std::string prefix; + std::ostream* stream = &std::cout; + + switch (level) { + case LogLevel_Debug: + prefix = "[DEBUG] "; + break; + case LogLevel_Info: + prefix = "[INFO] "; + break; + case LogLevel_Warning: + prefix = "[WARNING] "; + stream = &std::cerr; + break; + case LogLevel_Error: + prefix = "[ERROR] "; + stream = &std::cerr; + break; + } + + // Get current time + auto now = std::chrono::system_clock::now(); + auto time = std::chrono::system_clock::to_time_t(now); + auto ms = std::chrono::duration_cast( + now.time_since_epoch()) % + 1000; + + // Get filename without path + std::filesystem::path filepath(file); + std::string filename = filepath.filename().string(); + + // Output timestamp, log level, filename + *stream << std::put_time(std::localtime(&time), "%Y-%m-%d %H:%M:%S") << '.' + << std::setfill('0') << std::setw(3) << ms.count() << " " << prefix + << filename << ":" << line << " " << func << ": "; + + char buffer[1024]; + vsnprintf(buffer, sizeof(buffer), format, args); + *stream << buffer << std::endl; + + va_end(args); + + return buffer; +} \ No newline at end of file diff --git a/tests/integration/test.logger.hh b/tests/integration/test.logger.hh new file mode 100644 index 00000000..faf5cde7 --- /dev/null +++ b/tests/integration/test.logger.hh @@ -0,0 +1,55 @@ +#include "zarr.h" + +#include + +class Logger +{ + public: + static void set_log_level(LogLevel level); + static LogLevel get_log_level(); + + static std::string log(LogLevel level, + const char* file, + int line, + const char* func, + const char* format, + ...); + + private: + static LogLevel current_level; +}; + +#define LOG_DEBUG(...) \ + Logger::log(LogLevel_Debug, __FILE__, __LINE__, __func__, __VA_ARGS__) +#define LOG_INFO(...) \ + Logger::log(LogLevel_Info, __FILE__, __LINE__, __func__, __VA_ARGS__) +#define LOG_WARNING(...) \ + Logger::log(LogLevel_Warning, __FILE__, __LINE__, __func__, __VA_ARGS__) +#define LOG_ERROR(...) \ + Logger::log(LogLevel_Error, __FILE__, __LINE__, __func__, __VA_ARGS__) + +#define EXPECT(e, ...) \ + do { \ + if (!(e)) { \ + const std::string __err = LOG_ERROR(__VA_ARGS__); \ + throw std::runtime_error(__err); \ + } \ + } while (0) +#define CHECK(e) EXPECT(e, "Expression evaluated as false:\n\t%s", #e) + + +/// Check that a==b +/// example: `ASSERT_EQ(int,"%d",42,meaning_of_life())` +#define EXPECT_EQ(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ == b_, "Expected %s==%s but " fmt "!=" fmt, #a, #b, a_, b_); \ + } while (0) + +#define EXPECT_LT(T, fmt, a, b) \ + do { \ + T a_ = (T)(a); \ + T b_ = (T)(b); \ + EXPECT(a_ < b_, "Expected %s<%s but " fmt ">=" fmt, #a, #b, a_, b_); \ + } while (0)