Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Performance] allow for multithreaded channel extraction #101

Merged
merged 11 commits into from
Sep 9, 2024
12 changes: 6 additions & 6 deletions PhotoshopAPI/src/Core/Compression/Compress_RLE.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ namespace RLE_Impl
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template<typename T>
std::vector<uint8_t> CompressRLE(std::vector<T>& uncompressedData, std::span<uint8_t> buffer, const FileHeader& header, const uint32_t width, const uint32_t height)
std::vector<uint8_t> CompressRLE(std::span<T> uncompressedData, std::span<uint8_t> buffer, const FileHeader& header, const uint32_t width, const uint32_t height)
{
PROFILE_FUNCTION();
endianEncodeBEArray(uncompressedData);
Expand Down Expand Up @@ -347,7 +347,7 @@ std::vector<uint8_t> CompressRLE(std::vector<T>& uncompressedData, std::span<uin
std::memcpy(dstAddress, srcAddress, compressedDataViews[index].size());
});
// We deliberately only copy over the scanline sizes at the end since they need to be endian swapped first
endianEncodeBEArray(scanlineSizes);
endianEncodeBEArray(std::span<uint16_t>(scanlineSizes));
std::memcpy(compressedData.data(), reinterpret_cast<uint8_t*>(scanlineSizes.data()), height * sizeof(uint16_t));
}
else
Expand All @@ -370,7 +370,7 @@ std::vector<uint8_t> CompressRLE(std::vector<T>& uncompressedData, std::span<uin
std::memcpy(dstAddress, srcAddress, compressedDataViews[index].size());
});
// We deliberately only copy over the scanline sizes at the end since they need to be endian swapped first
endianEncodeBEArray(scanlineSizes);
endianEncodeBEArray(std::span<uint32_t>(scanlineSizes));
std::memcpy(compressedData.data(), reinterpret_cast<uint8_t*>(scanlineSizes.data()), height * sizeof(uint32_t));
}

Expand All @@ -387,7 +387,7 @@ template<typename T>
std::vector<uint8_t> CompressRLE(std::vector<T>& uncompressedData, const FileHeader& header, const uint32_t width, const uint32_t height)
{
PROFILE_FUNCTION();
endianEncodeBEArray(uncompressedData);
endianEncodeBEArray(std::span<T>(uncompressedData));

std::vector<std::span<uint8_t>> uncompressedDataViews;
for (int i = 0; i < height; ++i)
Expand Down Expand Up @@ -443,7 +443,7 @@ template<typename T>
std::vector<uint8_t> CompressRLEImageDataPsd(std::vector<T>& uncompressedData, const FileHeader& header, const uint32_t width, const uint32_t height, std::vector<uint16_t>& scanlineSizes)
{
PROFILE_FUNCTION();
endianEncodeBEArray(uncompressedData);
endianEncodeBEArray(std::span<T>(uncompressedData));

std::vector<std::span<uint8_t>> uncompressedDataViews;
for (int i = 0; i < height; ++i)
Expand Down Expand Up @@ -483,7 +483,7 @@ template<typename T>
std::vector<uint8_t> CompressRLEImageDataPsb(std::vector<T>& uncompressedData, const FileHeader& header, const uint32_t width, const uint32_t height, std::vector<uint32_t>& scanlineSizes)
{
PROFILE_FUNCTION();
endianEncodeBEArray(uncompressedData);
endianEncodeBEArray(std::span<T>(uncompressedData));

std::vector<std::span<uint8_t>> uncompressedDataViews;
for (int i = 0; i < height; ++i)
Expand Down
10 changes: 5 additions & 5 deletions PhotoshopAPI/src/Core/Compression/Compress_ZIP.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace ZIP_Impl
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
void PredictionEncode(std::vector<T>& data, std::span<uint8_t> buffer, const uint32_t width, const uint32_t height)
void PredictionEncode(std::span<T> data, std::span<uint8_t> buffer, const uint32_t width, const uint32_t height)
{
if (data.size() > buffer.size() * sizeof(T))
PSAPI_LOG_ERROR("PredictionEncode", "Buffer size does not match data size, expected at least %zu bytes but got %zu instead", data.size() * sizeof(T), buffer.size());
Expand Down Expand Up @@ -66,7 +66,7 @@ namespace ZIP_Impl
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <>
inline void PredictionEncode(std::vector<float32_t>& data, std::span<uint8_t> buffer, const uint32_t width, const uint32_t height)
inline void PredictionEncode(std::span<float32_t> data, std::span<uint8_t> buffer, const uint32_t width, const uint32_t height)
{
if (data.size() > buffer.size() * sizeof(float32_t))
PSAPI_LOG_ERROR("PredictionEncode", "Buffer size does not match data size, expected at least %zu bytes but got %zu instead", data.size() * sizeof(float32_t), buffer.size());
Expand Down Expand Up @@ -118,7 +118,7 @@ namespace ZIP_Impl
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<uint8_t> Compress(const std::vector<T>& uncompressedData, std::span<uint8_t> buffer, libdeflate_compressor* compressor)
std::vector<uint8_t> Compress(const std::span<T> uncompressedData, std::span<uint8_t> buffer, libdeflate_compressor* compressor)
{
PROFILE_FUNCTION();
std::vector<uint8_t> compressedData;
Expand Down Expand Up @@ -183,7 +183,7 @@ namespace ZIP_Impl
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<uint8_t> CompressZIP(std::vector<T>& uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor)
std::vector<uint8_t> CompressZIP(std::span<T> uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor)
{
PROFILE_FUNCTION();
// Convert uncompressed data to native endianness in-place
Expand Down Expand Up @@ -225,7 +225,7 @@ std::vector<uint8_t> CompressZIP(std::vector<T>& uncompressedIn)
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<uint8_t> CompressZIPPrediction(std::vector<T>& uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor, const uint32_t width, const uint32_t height)
std::vector<uint8_t> CompressZIPPrediction(std::span<T> uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor, const uint32_t width, const uint32_t height)
{
PROFILE_FUNCTION();

Expand Down
4 changes: 2 additions & 2 deletions PhotoshopAPI/src/Core/Compression/Compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ inline void DecompressData(ByteStream& stream, std::span<T> buffer, uint64_t off
// RLE compression will encode the scanline sizes at the start of the data as well. This equals
// 2/4 * height bytes of additional data (2 bytes for PSD and 4 for PSB)
template <typename T>
inline std::vector<uint8_t> CompressData(std::vector<T>& uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor, const Enum::Compression& compression, const FileHeader& header, const uint32_t width, const uint32_t height)
inline std::vector<uint8_t> CompressData(std::span<T> uncompressedIn, std::span<uint8_t> buffer, libdeflate_compressor* compressor, const Enum::Compression& compression, const FileHeader& header, const uint32_t width, const uint32_t height)
{
if (compression == Enum::Compression::Raw)
{
Expand Down Expand Up @@ -96,7 +96,7 @@ inline std::vector<uint8_t> CompressData(std::vector<T>& uncompressedIn, const E
{
if (compression == Enum::Compression::Raw)
{
endianEncodeBEArray(uncompressedIn);
endianEncodeBEArray(std::span<T>(uncompressedIn));
std::vector<uint8_t> data(uncompressedIn.size() * sizeof(T));
std::memcpy(reinterpret_cast<void*>(data.data()), reinterpret_cast<void*>(uncompressedIn.data()), data.size());
return data;
Expand Down
6 changes: 3 additions & 3 deletions PhotoshopAPI/src/Core/Endian/EndianByteSwapArr.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ inline void endianDecodeBEArray<uint8_t>(std::span<uint8_t> data)
// ---------------------------------------------------------------------------------------------------------------------
#ifdef __AVX2__
template<typename T>
void endianEncodeBEArray(std::vector<T>& data)
void endianEncodeBEArray(std::span<T> data)
{
PROFILE_FUNCTION();
// We want to split up the vector into blocks that can easily fit into a L1 cache
Expand Down Expand Up @@ -361,7 +361,7 @@ inline void endianDecodeBEArray<uint8_t>(std::span<uint8_t> data)
}
#else
template<typename T>
void endianEncodeBEArray(std::vector<T>& data)
void endianEncodeBEArray(std::span<T> data)
{
PROFILE_FUNCTION();
for (uint64_t i = 0; i < data.size(); ++i)
Expand All @@ -375,7 +375,7 @@ inline void endianDecodeBEArray<uint8_t>(std::span<uint8_t> data)
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <>
inline void endianEncodeBEArray(std::vector<uint8_t>& data)
inline void endianEncodeBEArray(std::span<uint8_t> data)
{
}

Expand Down
133 changes: 111 additions & 22 deletions PhotoshopAPI/src/Core/Struct/ImageChannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Profiling/Memory/CompressionTracker.h"
#include "PhotoshopFile/FileHeader.h"
#include "CoordinateUtil.h"
#include "ThreadPool.h"

#include "blosc2.h"

Expand All @@ -15,6 +16,14 @@
#include <random>
#include <execution>

// If we compile with C++<20 we replace the stdlib implementation with the compatibility
// library
#if (__cplusplus < 202002L)
#include "tcb_span.hpp"
#else
#include <span>
#endif


#define __STDC_FORMAT_MACROS 1
#include <inttypes.h>
Expand All @@ -30,8 +39,8 @@ PSAPI_NAMESPACE_BEGIN
struct ImageChannel
{
/// The size of each sub-chunk in the super-chunk. For more information about what a chunk and super-chunk is
/// please refer to the c-blosc2 documentation
static const uint64_t m_ChunkSize = 16384 * 16384;
/// please refer to the c-blosc2 documentation. Defaults to 8MB
static const uint64_t m_ChunkSize = 1024 * 1024 * 8;
/// This does not indicate the compression method of the channel in memory
/// but rather the compression method it writes the PhotoshopFile with
Enum::Compression m_Compression = Enum::Compression::Raw;
Expand All @@ -41,15 +50,30 @@ struct ImageChannel
uint64_t m_OrigByteSize = 0u;


/// Width of the uncompressed ImageChannel (returns m_Width)
int32_t getWidth() const { return m_Width; }
/// Height of the uncompressed ImageChannel (returns m_Height)
int32_t getHeight() const { return m_Height; }
/// X-coordinate of the uncompressed ImageChannel (returns m_XCoord)
float getCenterX() const { return m_XCoord; }
/// Y-coordinate of the uncompressed ImageChannel (returns m_YCoord)
float getCenterY() const { return m_YCoord; }
/// Total number of chunks held in the ImageChannel (returns m_NumChunks)
uint64_t getNumChunks() const { return m_NumChunks; }


/// Extract the data from the image channel and invalidate it (can only be called once).
/// If the image data does not exist yet we simply return an empty vector<T>. If the data
/// was already freed we throw
///
/// \param numThreads The number of threads to use for decompression. By default this is 0 which will set it to hardware_concurrency.
/// If you are calling this in a non-threaded environment this is likely the option you should choose
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<T> extractData() {
std::vector<T> extractData(size_t numThreads = 0) {
PROFILE_FUNCTION();
auto buffer = getData<T>();
auto buffer = getData<T>(numThreads);
if (buffer.size() > 0)
{
blosc2_schunk_free(m_Data);
Expand All @@ -59,42 +83,113 @@ struct ImageChannel
return std::vector<T>();
}

/// Extract the data from the image channel into a preallocated buffer and invalidate the channel (can only be called once).
/// If the image data does not exist yet we log a warning and return without filling the buffer. If the data
/// was already freed we throw
///
/// \param buffer A preallocated buffer whose size must match m_OrigByteSize / sizeof(T). If the size does not match we throw an error
/// \param numThreads The number of threads to use for decompression. By default this is 0 which will set it to hardware_concurrency.
/// If you are calling this in a non-threaded environment this is likely the option you should choose
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
void extractData(std::span<T> buffer, size_t numThreads = 0) {
	PROFILE_FUNCTION();

	if (m_Data)
	{
		// Decompress into the caller's buffer first, then release the super-chunk and
		// flag the channel as freed so that later accesses are rejected.
		getData<T>(buffer, numThreads);
		blosc2_schunk_free(m_Data);
		m_wasFreed = true;
	}
	else
	{
		// Nothing was ever stored in this channel, the caller's buffer stays untouched
		PSAPI_LOG_WARNING("ImageChannel", "extractData() called without the channel having been initialized yet, returning without having filled the given buffer");
	}
}


/// Copy the image data out of the ImageChannel, does not free the data afterwards. Returns an empty vector if the
/// data does not exist yet. If the data was already freed we throw
/// Copy the image data out of the ImageChannel using a preallocated buffer, does not free the data afterwards. If the data was already freed we throw
///
/// \param buffer A preallocated buffer whose size must match m_OrigByteSize / sizeof(T). If the size does not match we throw an error
/// \param numThreads The number of threads to use for decompression. By default this is 0 which will set it to hardware_concurrency.
/// If you are calling this in a non-threaded environment this is likely the option you should choose
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<T> getData()
void getData(std::span<T> buffer, size_t numThreads = 0)
{
PROFILE_FUNCTION();

if (!m_Data)
{
PSAPI_LOG_WARNING("ImageChannel", "Channel data does not exist yet, was it initialized?");
return std::vector<T>();
return;
}
if (m_wasFreed)
{
PSAPI_LOG_ERROR("ImageChannel", "Data was already freed, cannot extract it anymore");
}
if (buffer.size() != m_OrigByteSize / sizeof(T))
{
PSAPI_LOG_ERROR("ImageChannel", "getData() buffer must be exactly the size of m_OrigByteSize / sizeof(T)");
}

std::vector<T> buffer(m_OrigByteSize / sizeof(T), 0);
// Set thread number to hardware concurrency if we dont detect threading
if (numThreads == 0)
{
numThreads = std::thread::hardware_concurrency();
}

Internal::ThreadPool pool(numThreads);

uint64_t remainingSize = m_OrigByteSize;
std::vector<blosc2_context*> contexts;
std::vector<std::future<void>> futures; // To store future objects for each task

blosc2_dparams params = BLOSC2_DPARAMS_DEFAULTS;


for (uint64_t nchunk = 0; nchunk < m_NumChunks; ++nchunk)
{
// Create a unique context
contexts.push_back(blosc2_create_dctx(*m_Data->storage->dparams));

void* ptr = reinterpret_cast<uint8_t*>(buffer.data()) + nchunk * m_ChunkSize;
if (remainingSize > m_ChunkSize)
{
blosc2_schunk_decompress_chunk(m_Data, nchunk, ptr, m_ChunkSize);
futures.emplace_back(pool.enqueue([=, this]() {
blosc2_decompress_ctx(contexts.back(), m_Data->data[nchunk], std::numeric_limits<int32_t>::max(), ptr, m_ChunkSize);
}));
remainingSize -= m_ChunkSize;
}
else
{
blosc2_schunk_decompress_chunk(m_Data, nchunk, ptr, remainingSize);
futures.emplace_back(pool.enqueue([=, this]() {
blosc2_decompress_ctx(contexts.back(), m_Data->data[nchunk], std::numeric_limits<int32_t>::max(), ptr, remainingSize);
}));
remainingSize = 0;
}
}

// Wait for all tasks to complete
for (auto& future : futures) {
future.wait();
}
}


/// Copy the image data out of the ImageChannel, does not free the data afterwards. Returns an empty vector if the
/// data does not exist yet. If the data was already freed we throw
///
/// \param numThreads The number of threads to use for decompression. By default this is 0 which will set it to hardware_concurrency.
/// If you are calling this in a non-threaded environment this is likely the option you should choose
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
template <typename T>
std::vector<T> getData(size_t numThreads = 0)
{
	// Allocate storage for the full uncompressed channel and let the span-based
	// overload perform the actual decompression into it
	const uint64_t elementCount = m_OrigByteSize / sizeof(T);
	std::vector<T> decompressed(elementCount);
	getData(std::span<T>(decompressed), numThreads);
	return decompressed;
}

Expand Down Expand Up @@ -178,17 +273,6 @@ struct ImageChannel
}


/// Get the width of the uncompressed ImageChannel
int32_t getWidth() const { return m_Width; };
/// Get the height of the uncompressed ImageChannel
int32_t getHeight() const { return m_Height; };
/// Get the x-coordinate of the uncompressed ImageChannel
float getCenterX() const { return m_XCoord; };
/// Get the y-coordinate of the uncompressed ImageChannel
float getCenterY() const { return m_YCoord; };
/// Get the total number of chunks held in the ImageChannel
uint64_t getNumChunks() const { return m_NumChunks; };

// On destruction free the blosc2 schunk if it wasn't freed yet
~ImageChannel()
{
Expand All @@ -197,7 +281,10 @@ struct ImageChannel
m_wasFreed = true;
}
ImageChannel() = default;


private:

blosc2_schunk* m_Data = nullptr;
/// Total number of chunks in the super-chunk
uint64_t m_NumChunks = 0u;
Expand All @@ -209,6 +296,8 @@ struct ImageChannel
float m_XCoord = 0.0f;
float m_YCoord = 0.0f;

private:

// Initialize a blosc2 superchunk from a given data span, maybe we could augment this to give control over compression params?
// ---------------------------------------------------------------------------------------------------------------------
// ---------------------------------------------------------------------------------------------------------------------
Expand Down
Loading
Loading