Merge pull request #2 from Hugoberry/messy
Using DuckDB fileSystem calls
Hugoberry authored Apr 16, 2024
2 parents 334104d + 7122510 commit 42eb388
Showing 19 changed files with 1,914 additions and 11 deletions.
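The thrust of the change is routing all file access through DuckDB's virtual file system (FileSystem/FileHandle) instead of std::ifstream, so the extension inherits every backend DuckDB can open. A minimal sketch of the pattern, using only the calls that appear in this diff (the read_prefix helper itself is illustrative, and FILE_READ is the open flag used elsewhere in this commit):

#include "duckdb.hpp"
#include <string>
#include <vector>

// Sketch: read the first n bytes of a file through DuckDB's virtual FS.
// The helper name read_prefix is hypothetical; the FS calls mirror the diff.
static std::vector<uint8_t> read_prefix(duckdb::ClientContext &context,
                                        const std::string &path, size_t n) {
    auto &fs = duckdb::FileSystem::GetFileSystem(context);
    auto handle = fs.OpenFile(path, FILE_READ);  // local disk, httpfs, ...
    std::vector<uint8_t> buffer(n);
    handle->Seek(0);                             // absolute offset
    handle->Read(buffer.data(), n);              // raw byte read
    return buffer;
}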
2 changes: 2 additions & 0 deletions .github/workflows/MainDistributionPipeline.yml
@@ -4,6 +4,8 @@
name: Main Extension Distribution Pipeline
on:
push:
branches:
- main
pull_request:
workflow_dispatch:

2 changes: 1 addition & 1 deletion .github/workflows/_extension_deploy.yml
@@ -22,7 +22,7 @@ on:
exclude_archs:
required: false
type: string
default: "windows_amd64_rtools"
default: "windows_amd64_rtools;wasm_mvp;wasm_eh;wasm_threads"
# default: "linux_amd64;linux_amd64_gcc4;osx_arm64;wasm_mvp;wasm_eh;wasm_threads;linux_arm64;osx_amd64;windows_amd64_rtools"
# Whether to upload this deployment as the latest. This may overwrite a previous deployment.
deploy_latest:
2 changes: 1 addition & 1 deletion .github/workflows/_extension_distribution.yml
@@ -24,7 +24,7 @@ on:
exclude_archs:
required: false
type: string
default: "windows_amd64_rtools"
default: "windows_amd64_rtools;wasm_mvp;wasm_eh;wasm_threads"
# default: "linux_amd64;linux_amd64_gcc4;osx_arm64;wasm_mvp;wasm_eh;wasm_threads;linux_arm64;osx_amd64;windows_amd64_rtools"
# Postfix added to artifact names. Can be used to guarantee unique names when this workflow is called multiple times
artifact_postfix:
4 changes: 3 additions & 1 deletion .vscode/settings.json
@@ -93,6 +93,8 @@
"*.tcc": "cpp",
"memory_resource": "cpp",
"string_view": "cpp",
"shared_mutex": "cpp"
"shared_mutex": "cpp",
"any": "cpp",
"valarray": "cpp"
}
}
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -8,6 +8,7 @@ set(TARGET_NAME pbix)
set(EXTENSION_NAME ${TARGET_NAME}_extension)
set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)


project(${TARGET_NAME})
include_directories(src/include)
include_directories(src/abf)
4 changes: 3 additions & 1 deletion extension_config.cmake
@@ -7,4 +7,6 @@ duckdb_extension_load(pbix
)

# Any extra extensions that should be built
# e.g.: duckdb_extension_load(json)
# e.g.:
# duckdb_extension_load(json)
# duckdb_extension_load(httpfs)
91 changes: 91 additions & 0 deletions ksy/zip_central_dir.ksy
@@ -0,0 +1,91 @@
meta:
id: pbix
title: PBIX archive file
file-extension: pbix
endian: le
bit-endian: le
seq:
- id: sections
type: pk_section
repeat: eos
types:
pk_section:
seq:
- id: magic
contents: 'PK'
- id: section_type
type: u2
- id: body
type:
switch-on: section_type
cases:
0x0201: central_dir_entry
0x0403: local_file
0x0605: end_of_central_dir
0x0807: data_descriptor
data_descriptor:
seq:
- id: data_descriptor_obs
size: 12
local_file:
seq:
- id: header
type: local_file_header
- id: body
size: header.len_body_compressed
local_file_header:
seq:
- id: header_trimmed
size: 14
- id: len_body_compressed
type: u4
- id: len_body_uncompressed
type: u4
- id: len_file_name
type: u2
- id: len_extra
type: u2
- id: file_name
size: len_file_name
- id: extra
size: len_extra
central_dir_entry:
seq:
- id: header_obs
size: 12
- id: crc32
type: u4
- id: len_body_compressed
type: u4
- id: len_body_uncompressed
type: u4
- id: len_file_name
type: u2
- id: len_extra
type: u2
- id: len_comment
type: u2
- id: disk_number_start
type: u2
- id: int_file_attr
type: u2
- id: ext_file_attr
type: u4
- id: ofs_local_header
type: s4
- id: file_name
type: str
size: len_file_name
encoding: UTF-8
- id: extra
size: len_extra
- id: comment
size: len_comment
end_of_central_dir:
seq:
- id: header_obs
size: 16
- id: len_comment
type: u2
- id: comment
size: len_comment
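The spec above documents the 'PK' section layout the parser relies on: after the 4-byte signature, a local file header carries 14 trimmed fixed bytes, the compressed/uncompressed sizes (u4 each), then the file-name and extra-field lengths (u2 each). A hedged sketch of walking that header with a DuckDB FileHandle, mirroring the arithmetic in get_sqlite_v2 below — the 26/30 offsets are the standard ZIP fixed-header sizes (named ZIP_LOCAL_FILE_HEADER_FIXED and ZIP_LOCAL_FILE_HEADER in the parser), and a little-endian host is assumed, as it is in the parser itself:

#include "duckdb.hpp"
#include <cstdint>

// Sketch: compute the offset of a local file entry's compressed body,
// per the header layout in the spec above. 4 (signature) + 14 (trimmed
// header) + 4 + 4 (sizes) = 26 bytes up to len_file_name; 30 in total.
static uint64_t body_offset(duckdb::FileHandle &handle, uint64_t entry_ofs) {
    uint16_t filename_len = 0, extra_len = 0;
    handle.Seek(entry_ofs + 26);                       // skip to len_file_name
    handle.Read(&filename_len, sizeof(filename_len));  // len_file_name (u2, LE)
    handle.Read(&extra_len, sizeof(extra_len));        // len_extra (u2, LE)
    return entry_ofs + 30 + filename_len + extra_len;  // start of compressed data
}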
183 changes: 181 additions & 2 deletions src/abf/AbfParser.cpp
@@ -1,6 +1,9 @@
#include "AbfParser.h"
// #include "duckdb/common/file_system.hpp"


using namespace tinyxml2;
using namespace duckdb;

std::vector<uint8_t> AbfParser::read_buffer_bytes(const std::vector<uint8_t> &buffer, uint64_t offset, int size)
{
@@ -109,6 +112,49 @@ std::pair<uint64_t, uint64_t> AbfParser::initialize_zip_and_locate_datamodel(con
return {file_stat.m_local_header_ofs, file_stat.m_comp_size};
}


std::pair<uint64_t, uint64_t> AbfParser::locate_datamodel(duckdb::FileHandle &file_handle_p, const std::string &path) {
constexpr auto DataModelFileName = "DataModel";
mz_zip_archive zip_archive;
memset(&zip_archive, 0, sizeof(zip_archive));


// Setup the custom IO operations
zip_archive.m_pIO_opaque = &file_handle_p;
zip_archive.m_pRead = [](void *opaque, mz_uint64 file_offset, void *buffer, size_t n) {
auto handle = static_cast<duckdb::FileHandle *>(opaque);
handle->Seek(file_offset);
return static_cast<size_t>(handle->Read(buffer, n));
};

// Initialize the zip archive for reading using the custom IO
if (!mz_zip_reader_init(&zip_archive, file_handle_p.GetFileSize(), MZ_ZIP_FLAG_COMPRESSED_DATA)) { // Note: MZ_ZIP_FLAG_DO_NOT_SORT_CENTRAL_DIRECTORY might be needed depending on use case
throw std::runtime_error("Could not initialize zip reader");
}

// Locate the DataModel file within the zip
int file_index = mz_zip_reader_locate_file(&zip_archive, DataModelFileName, nullptr, 0);
if (file_index < 0) {
mz_zip_reader_end(&zip_archive); // Clean up before throwing
throw std::runtime_error("DataModel not found in the zip file.");
}

// Retrieve information about the DataModel file
mz_zip_archive_file_stat file_stat;
if (!mz_zip_reader_file_stat(&zip_archive, file_index, &file_stat)) {
mz_zip_reader_end(&zip_archive); // Clean up before throwing
throw std::runtime_error("Could not retrieve information about DataModel.");
}

// Clean up the zip reader as it's no longer needed after getting the info
mz_zip_reader_end(&zip_archive);

// Return the offset and compressed size of the DataModel file
return {file_stat.m_local_header_ofs, file_stat.m_comp_size};
}


void AbfParser::read_compressed_datamodel_header(std::ifstream &entryStream, uint64_t &datamodel_ofs) {
// Read compressed DataModel header to adjust offset
entryStream.seekg(datamodel_ofs+ZIP_LOCAL_FILE_HEADER_FIXED);
@@ -122,7 +168,7 @@ void AbfParser::read_compressed_datamodel_header(std::ifstream &entryStream, uin

std::vector<uint8_t> AbfParser::decompress_initial_block(std::ifstream &entryStream, uint64_t datamodel_ofs, XPress9Wrapper &xpress9_wrapper) {
// Seek to the start of the DataModel compressed data
entryStream.seekg(datamodel_ofs + ABF_XPRESS9_SIGNATRUE, std::ios::beg);
entryStream.seekg(datamodel_ofs + ABF_XPRESS9_SIGNATURE, std::ios::beg);

uint32_t uncompressed_size;
uint32_t compressed_size;
@@ -144,10 +190,39 @@ std::vector<uint8_t> AbfParser::decompress_initial_block(std::ifstream &entryStr
}
return decompressed_buffer;
}
std::vector<uint8_t> AbfParser::decompress_initial_block(duckdb::FileHandle &file_handle_p, uint64_t &bytes_read, XPress9Wrapper &xpress9_wrapper) {
// Consume the XPress9 signature at the start of the DataModel compressed data
std::vector<uint8_t> signature(ABF_XPRESS9_SIGNATURE);
file_handle_p.Read(reinterpret_cast<char*>(signature.data()),ABF_XPRESS9_SIGNATURE);

bytes_read += ABF_XPRESS9_SIGNATURE;

uint32_t uncompressed_size;
uint32_t compressed_size;
// Read the uncompressed and compressed block sizes
file_handle_p.Read(reinterpret_cast<char*>(&uncompressed_size), sizeof(uint32_t));
file_handle_p.Read(reinterpret_cast<char*>(&compressed_size), sizeof(uint32_t));
bytes_read += sizeof(uint32_t) + sizeof(uint32_t);

// Allocate buffers for compressed and decompressed data
std::vector<uint8_t> decompressed_buffer(uncompressed_size);
std::vector<uint8_t> compressed_buffer(compressed_size);

file_handle_p.Read(reinterpret_cast<char*>(compressed_buffer.data()), compressed_size);
bytes_read += compressed_size;

// Decompress the entire data
uint32_t decompressed_size = xpress9_wrapper.Decompress(compressed_buffer.data(), compressed_size, decompressed_buffer.data(), decompressed_buffer.size());
// Verify that the total decompressed size matches the expected size
if (decompressed_size != uncompressed_size) {
throw std::runtime_error("Mismatch in decompressed block size in first block.");
}
return decompressed_buffer;
}

std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(std::ifstream &entryStream, uint64_t datamodel_ofs, uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset) {
// Calculate the total number of blocks
constexpr uint32_t BLOCK_SIZE = 0x200000;
auto total_blocks = (virtual_directory_size + virtual_directory_offset) / BLOCK_SIZE;

std::vector<uint8_t> all_decompressed_data;
@@ -195,6 +270,61 @@ std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(std::ifstream &ent
return all_decompressed_data;
}

std::vector<uint8_t> AbfParser::iterate_and_decompress_blocks(duckdb::FileHandle &file_handle_p, uint64_t &bytes_read, uint64_t datamodel_ofs, uint64_t datamodel_size, XPress9Wrapper &xpress9_wrapper, uint64_t virtual_directory_offset, int virtual_directory_size, const int trailing_blocks, uint64_t &skip_offset) {
// Calculate the total number of blocks

auto total_blocks = (virtual_directory_size + virtual_directory_offset) / BLOCK_SIZE;

std::vector<uint8_t> all_decompressed_data;
uint32_t block_index = 0;
uint32_t block_index_iterator = 0;

// Iterate through each block in the DataModel
while (bytes_read < datamodel_size) {
block_index++;
// Read the compressed and uncompressed sizes
uint32_t uncompressed_size = 0;
uint32_t compressed_size = 0;
file_handle_p.Read(reinterpret_cast<char*>(&uncompressed_size), sizeof(uncompressed_size));
file_handle_p.Read(reinterpret_cast<char*>(&compressed_size), sizeof(compressed_size));
bytes_read += sizeof(uncompressed_size) + sizeof(compressed_size);

// Skip blocks that fall before the last `trailing_blocks` blocks
if (total_blocks > trailing_blocks && block_index < (total_blocks - trailing_blocks)) {
skip_offset += uncompressed_size;
bytes_read += compressed_size;
file_handle_p.Seek(datamodel_ofs+bytes_read); // Skip this block
continue;
}

// Allocate buffers for the compressed and decompressed data
std::vector<uint8_t> compressed_buffer(compressed_size);
std::vector<uint8_t> decompressed_buffer(uncompressed_size);

// Read the compressed block
file_handle_p.Read(reinterpret_cast<char*>(compressed_buffer.data()), compressed_size);
bytes_read += compressed_size;

// Patch the header of this compressed block before decompression
patch_header_of_compressed_buffer(compressed_buffer, block_index_iterator);

// Decompress the block
uint32_t decompressed_size = xpress9_wrapper.Decompress(compressed_buffer.data(), compressed_size, decompressed_buffer.data(), decompressed_buffer.size());

// Verify decompression success
if (decompressed_size != uncompressed_size) {
throw std::runtime_error("Decompression failed or resulted in unexpected size.");
}

// Add decompressed data to the overall buffer
all_decompressed_data.insert(all_decompressed_data.end(), decompressed_buffer.begin(), decompressed_buffer.end());
}

return all_decompressed_data;
}

std::vector<uint8_t> AbfParser::get_sqlite(const std::string &path, const int trailing_blocks=15)
{
// Initialize zip and locate DataModel
@@ -228,6 +358,55 @@ std::vector<uint8_t> AbfParser::get_sqlite(const std::string &path, const int tr
// Prefix all_decompressed_buffer with initial_decompressed_buffer in case we have only one block
all_decompressed_buffer.insert(all_decompressed_buffer.begin(), initial_decompressed_buffer.begin(), initial_decompressed_buffer.end());

if (skip_offset + all_decompressed_buffer.size() < virtual_directory_offset + virtual_directory_size)
{
throw std::runtime_error("Could not parse the entire DataModel.");
}
// Finally, extract the SQLite buffer from the decompressed data
return extract_sqlite_buffer(all_decompressed_buffer, skip_offset, virtual_directory_offset, virtual_directory_size);
}
std::vector<uint8_t> AbfParser::get_sqlite_v2(duckdb::ClientContext &context, const std::string &path, const int trailing_blocks=15)
{
auto &fs = duckdb::FileSystem::GetFileSystem(context);
// Open the file using FileSystem
auto file_handle = fs.OpenFile(path, FILE_READ);
if (!file_handle) {
throw std::runtime_error("Could not open zip file");
}

auto [datamodel_ofs, datamodel_size] = locate_datamodel(*file_handle, path);
uint64_t bytes_read = 0;
uint16_t zip_pointer = 0;

// Read compressed DataModel header to adjust offset
file_handle->Seek(datamodel_ofs+ZIP_LOCAL_FILE_HEADER_FIXED);
uint16_t filename_len = 0;
uint16_t extra_len = 0;
file_handle->Read(reinterpret_cast<char *>(&filename_len), sizeof(filename_len));
file_handle->Read(reinterpret_cast<char *>(&extra_len), sizeof(extra_len));
datamodel_ofs += ZIP_LOCAL_FILE_HEADER + filename_len + extra_len;

file_handle->Seek(datamodel_ofs);

XPress9Wrapper xpress9_wrapper;
if (!xpress9_wrapper.Initialize())
{
throw std::runtime_error("Failed to initialize XPress9Wrapper");
}

// Decompress initial block to get the virtual directory info
auto initial_decompressed_buffer = decompress_initial_block(*file_handle, bytes_read, xpress9_wrapper);

// Process backup log header to get virtual directory offset and size
auto [virtual_directory_offset, virtual_directory_size] = process_backup_log_header(initial_decompressed_buffer);

uint64_t skip_offset = 0; // Optimization for skipping blocks
// Iterate through the remaining blocks and decompress them
auto all_decompressed_buffer = iterate_and_decompress_blocks(*file_handle, bytes_read, datamodel_ofs, datamodel_size, xpress9_wrapper, virtual_directory_offset, virtual_directory_size, trailing_blocks, skip_offset);

// Prefix all_decompressed_buffer with initial_decompressed_buffer in case we have only one block
all_decompressed_buffer.insert(all_decompressed_buffer.begin(), initial_decompressed_buffer.begin(), initial_decompressed_buffer.end());

if (skip_offset + all_decompressed_buffer.size() < virtual_directory_offset + virtual_directory_size)
{
throw std::runtime_error("Could not parse the entire DataModel.");
… (diff truncated; the remaining 11 changed files are not shown)
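For orientation, a hedged sketch of how the new entry point could be invoked — get_sqlite_v2's signature is taken from this diff, while the call site and surrounding names are illustrative:

// Hypothetical call site: extract the embedded SQLite database from a
// .pbix file via the FileSystem-backed path added in this commit.
void load_datamodel(duckdb::ClientContext &context, const std::string &path) {
    // trailing_blocks defaults to 15; passed explicitly here for clarity.
    auto sqlite_bytes = AbfParser::get_sqlite_v2(context, path, 15);
    // sqlite_bytes now holds the raw SQLite image recovered from DataModel;
    // hand it to whichever SQLite reader the extension wires up.
}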
