Skip to content

Commit

Permalink
Standalone 11/N: Implement and test ZarrV2ArrayWriter (#303)
Browse files Browse the repository at this point in the history
Depends on #302.
  • Loading branch information
aliddell authored Oct 4, 2024
1 parent 97cc823 commit 41f71a7
Show file tree
Hide file tree
Showing 17 changed files with 749 additions and 652 deletions.
2 changes: 2 additions & 0 deletions src/streaming/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ add_library(${tgt}
sink.creator.cpp
array.writer.hh
array.writer.cpp
zarrv2.array.writer.hh
zarrv2.array.writer.cpp
)

target_include_directories(${tgt}
Expand Down
15 changes: 15 additions & 0 deletions src/streaming/array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,18 @@ zarr::ArrayWriter::rollover_()
close_sinks_();
++append_chunk_index_;
}

/**
 * @brief Flush any data still held by an array writer, then destroy it.
 *
 * Marks the writer as finalizing, calls its flush_() and, if that does not
 * throw, resets the owning pointer.
 *
 * @param writer Owning pointer to the writer to finalize. On success it is
 *        reset (null on return). On failure it is left untouched, so the
 *        caller still owns the writer. A null pointer is accepted and treated
 *        as "nothing to finalize".
 * @return true if the writer was null or was flushed and destroyed; false if
 *         flush_() threw a std::exception (the error is logged).
 */
bool
zarr::finalize_array(std::unique_ptr<ArrayWriter>&& writer)
{
    // Dereferencing a null unique_ptr is undefined behavior; there is
    // nothing to flush in that case, so report success.
    if (!writer) {
        return true;
    }

    writer->is_finalizing_ = true;
    try {
        writer->flush_();
    } catch (const std::exception& exc) {
        LOG_ERROR("Failed to finalize array writer: ", exc.what());
        return false;
    }

    writer.reset();
    return true;
}
4 changes: 4 additions & 0 deletions src/streaming/array.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,9 @@ class ArrayWriter
[[nodiscard]] virtual bool write_array_metadata_() = 0;

void close_sinks_();

friend bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
};

/**
 * @brief Flush an array writer's remaining data and destroy it.
 * @param writer Owning pointer to the writer. It is reset on success and
 *        left untouched on failure.
 * @return true on success, false if flushing threw a std::exception.
 */
bool finalize_array(std::unique_ptr<ArrayWriter>&& writer);
} // namespace zarr
8 changes: 4 additions & 4 deletions src/streaming/zarr.dimension.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ struct ZarrDimension
}

std::string name;
ZarrDimensionType type;
ZarrDimensionType type{ ZarrDimensionType_Space };

uint32_t array_size_px;
uint32_t chunk_size_px;
uint32_t shard_size_chunks;
uint32_t array_size_px{ 0 };
uint32_t chunk_size_px{ 0 };
uint32_t shard_size_chunks{ 0 };
};

class ArrayDimensions
Expand Down
185 changes: 185 additions & 0 deletions src/streaming/zarrv2.array.writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#include "macros.hh"
#include "zarrv2.array.writer.hh"
#include "sink.creator.hh"
#include "zarr.common.hh"

#include <nlohmann/json.hpp>

#include <atomic>
#include <cstddef>
#include <latch>
#include <stdexcept>

namespace {
/**
 * @brief Translate a ZarrDataType into a dtype string of the form
 *        "<byte-order><kind><size>", e.g. "<u2".
 *
 * The byte-order character is ">" when the native byte order is big-endian
 * and "<" otherwise.
 *
 * @param t The sample type to translate.
 * @param[out] t_str Receives the dtype string. Left unmodified on failure.
 * @return true if @p t is a supported type, false otherwise (an error is
 *         logged).
 */
[[nodiscard]]
bool
sample_type_to_dtype(ZarrDataType t, std::string& t_str)
{
    // Kind letter plus element size in bytes.
    const char* type_code = nullptr;

    switch (t) {
        case ZarrDataType_uint8:
            type_code = "u1";
            break;
        case ZarrDataType_uint16:
            type_code = "u2";
            break;
        case ZarrDataType_uint32:
            type_code = "u4";
            break;
        case ZarrDataType_uint64:
            type_code = "u8";
            break;
        case ZarrDataType_int8:
            type_code = "i1";
            break;
        case ZarrDataType_int16:
            type_code = "i2";
            break;
        case ZarrDataType_int32:
            type_code = "i4";
            break;
        case ZarrDataType_int64:
            type_code = "i8";
            break;
        case ZarrDataType_float32:
            type_code = "f4";
            break;
        case ZarrDataType_float64:
            type_code = "f8";
            break;
        default:
            LOG_ERROR("Unsupported sample type: ", t);
            return false;
    }

    const char byte_order =
      std::endian::native == std::endian::big ? '>' : '<';
    t_str = std::string(1, byte_order) + type_code;

    return true;
}
} // namespace

// Construct a Zarr V2 array writer without an S3 connection pool.
// The configuration is moved into the ArrayWriter base and the thread pool
// pointer is forwarded to it; this constructor does no work of its own.
zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool)
: ArrayWriter(std::move(config), thread_pool)
{
}

// Construct a Zarr V2 array writer with an S3 connection pool.
// The configuration is moved into the ArrayWriter base; the thread pool and
// S3 connection pool pointers are forwarded to it. This constructor does no
// work of its own.
zarr::ZarrV2ArrayWriter::ZarrV2ArrayWriter(
ArrayWriterConfig&& config,
std::shared_ptr<ThreadPool> thread_pool,
std::shared_ptr<S3ConnectionPool> s3_connection_pool)
: ArrayWriter(std::move(config), thread_pool, s3_connection_pool)
{
}

/**
 * @brief Write every buffered chunk to its own newly created data sink.
 *
 * Creates the data sinks, pushes one write job per chunk buffer to the
 * thread pool, and blocks until every queued job has finished.
 *
 * @return true only if the sinks were created and every chunk write
 *         succeeded; false if make_data_sinks_() failed or any write job
 *         reported failure.
 * @note The CHECK/EXPECT conditions are reported however those macros report
 *       failure (presumably by throwing — confirm in macros.hh). A failed
 *       push_job is only reported after the already-queued jobs have
 *       finished, because they hold references to locals of this function.
 */
bool
zarr::ZarrV2ArrayWriter::flush_impl_()
{
    // create chunk files
    CHECK(data_sinks_.empty());
    if (!make_data_sinks_()) {
        return false;
    }

    CHECK(data_sinks_.size() == chunk_buffers_.size());

    const size_t n_chunks = chunk_buffers_.size();

    std::latch latch(static_cast<std::ptrdiff_t>(n_chunks));
    // Cleared by any job whose write fails, so the failure reaches the caller
    // instead of being visible only to the thread pool's error handler.
    std::atomic<bool> all_written{ true };
    bool all_pushed = true;
    {
        std::scoped_lock lock(buffers_mutex_);
        for (size_t i = 0; i < n_chunks; ++i) {
            auto& chunk = chunk_buffers_.at(i);
            const bool pushed = thread_pool_->push_job(
              [&sink = data_sinks_.at(i),
               data_ = chunk.data(),
               size = chunk.size(),
               &latch,
               &all_written](std::string& err) -> bool {
                  bool success = false;
                  try {
                      std::span data{
                          reinterpret_cast<std::byte*>(data_), size
                      };
                      CHECK(sink->write(0, data));
                      success = true;
                  } catch (const std::exception& exc) {
                      err = "Failed to write chunk: " +
                            std::string(exc.what());
                  } catch (...) {
                      err = "Failed to write chunk: (unknown)";
                  }

                  if (!success) {
                      all_written = false;
                  }

                  latch.count_down();
                  return success;
              });

            if (!pushed) {
                // Assumes a rejected job is never run (TODO confirm against
                // ThreadPool::push_job). Release the latch on behalf of this
                // job and all jobs not yet submitted so the wait below cannot
                // hang.
                latch.count_down(static_cast<std::ptrdiff_t>(n_chunks - i));
                all_pushed = false;
                break;
            }
        }
    }

    // wait for all queued jobs to finish; they reference `latch` and
    // `all_written`, so this function must not exit before they are done
    latch.wait();

    EXPECT(all_pushed, "Failed to push job to thread pool");

    return all_written.load();
}

bool
zarr::ZarrV2ArrayWriter::write_array_metadata_()
{
if (!make_metadata_sink_()) {
return false;
}

using json = nlohmann::json;

std::string dtype;
if (!sample_type_to_dtype(config_.dtype, dtype)) {
return false;
}

std::vector<size_t> array_shape, chunk_shape;

size_t append_size = frames_written_;
for (auto i = config_.dimensions->ndims() - 3; i > 0; --i) {
const auto& dim = config_.dimensions->at(i);
const auto& array_size_px = dim.array_size_px;
CHECK(array_size_px);
append_size = (append_size + array_size_px - 1) / array_size_px;
}
array_shape.push_back(append_size);

chunk_shape.push_back(config_.dimensions->final_dim().chunk_size_px);
for (auto i = 1; i < config_.dimensions->ndims(); ++i) {
const auto& dim = config_.dimensions->at(i);
array_shape.push_back(dim.array_size_px);
chunk_shape.push_back(dim.chunk_size_px);
}

json metadata;
metadata["zarr_format"] = 2;
metadata["shape"] = array_shape;
metadata["chunks"] = chunk_shape;
metadata["dtype"] = dtype;
metadata["fill_value"] = 0;
metadata["order"] = "C";
metadata["filters"] = nullptr;
metadata["dimension_separator"] = "/";

if (config_.compression_params) {
const BloscCompressionParams bcp = *config_.compression_params;
metadata["compressor"] = json{ { "id", "blosc" },
{ "cname", bcp.codec_id },
{ "clevel", bcp.clevel },
{ "shuffle", bcp.shuffle } };
} else {
metadata["compressor"] = nullptr;
}

std::string metadata_str = metadata.dump(4);
std::span data{ reinterpret_cast<std::byte*>(metadata_str.data()),
metadata_str.size() };
return metadata_sink_->write(0, data);
}

// Report whether the writer should roll over. The Zarr V2 writer
// unconditionally answers yes.
// NOTE(review): presumably because flush_impl_() creates a fresh set of data
// sinks on every flush — confirm against the caller in ArrayWriter.
bool
zarr::ZarrV2ArrayWriter::should_rollover_() const
{
return true;
}
22 changes: 22 additions & 0 deletions src/streaming/zarrv2.array.writer.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include "array.writer.hh"

namespace zarr {
/**
 * @brief ArrayWriter implementation for Zarr format version 2.
 *
 * Supplies the version-specific pieces of the ArrayWriter interface: the
 * reported version, how buffered chunks are flushed, how array metadata is
 * written, and the rollover policy.
 */
class ZarrV2ArrayWriter final : public ArrayWriter
{
  public:
    /**
     * @brief Construct a writer without an S3 connection pool.
     * @param config Writer configuration; moved from.
     * @param thread_pool Thread pool forwarded to the ArrayWriter base.
     */
    ZarrV2ArrayWriter(ArrayWriterConfig&& config,
                      std::shared_ptr<ThreadPool> thread_pool);

    /**
     * @brief Construct a writer with an S3 connection pool.
     * @param config Writer configuration; moved from.
     * @param thread_pool Thread pool forwarded to the ArrayWriter base.
     * @param s3_connection_pool S3 connection pool forwarded to the
     *        ArrayWriter base.
     */
    ZarrV2ArrayWriter(ArrayWriterConfig&& config,
                      std::shared_ptr<ThreadPool> thread_pool,
                      std::shared_ptr<S3ConnectionPool> s3_connection_pool);

  private:
    /// @return Always ZarrVersion_2.
    ZarrVersion version_() const override { return ZarrVersion_2; }

    /// Write the buffered chunks to their data sinks.
    bool flush_impl_() override;

    /// Write the Zarr V2 array metadata document to the metadata sink.
    [[nodiscard]] bool write_array_metadata_() override;

    /// Whether the writer should roll over.
    bool should_rollover_() const override;
};
} // namespace zarr
3 changes: 3 additions & 0 deletions tests/unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ set(tests
sink-creator-make-data-sinks
array-writer-downsample-writer-config
array-writer-write-frame-to-chunks
zarrv2-writer-write-even
zarrv2-writer-write-ragged-append-dim
zarrv2-writer-write-ragged-internal-dim
)

foreach (name ${tests})
Expand Down
2 changes: 1 addition & 1 deletion tests/unit-tests/array-writer-downsample-writer-config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ main()

retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: %s\n", exc.what());
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}
Expand Down
4 changes: 2 additions & 2 deletions tests/unit-tests/array-writer-write-frame-to-chunks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ main()
try {
auto thread_pool = std::make_shared<zarr::ThreadPool>(
std::thread::hardware_concurrency(),
[](const std::string& err) { LOG_ERROR("Error: %s", err.c_str()); });
[](const std::string& err) { LOG_ERROR("Error: ", err); });

std::vector<ZarrDimension> dims;
dims.emplace_back(
Expand Down Expand Up @@ -75,7 +75,7 @@ main()

retval = 0;
} catch (const std::exception& exc) {
LOG_ERROR("Exception: %s\n", exc.what());
LOG_ERROR("Exception: ", exc.what());
} catch (...) {
LOG_ERROR("Exception: (unknown)");
}
Expand Down
Loading

0 comments on commit 41f71a7

Please sign in to comment.