out_s3: Add parquet compression type #8837

Open: wants to merge 31 commits into base: master
31 commits
9a57f8d
aws: Distinguish parquet compression method
cosmo0920 May 20, 2024
1ecbe9b
out_s3: Initial support for parquet format with columnify
cosmo0920 May 20, 2024
6121d22
out_s3: Address verification for schema types
cosmo0920 May 21, 2024
7aa1c26
out_s3: Extract building columnify command procedure as a function
cosmo0920 May 21, 2024
3e03870
out_s3: Sweep temporary files if not necessary
cosmo0920 May 21, 2024
01c3c94
out_s3: Use flb_sds_create for simplicity
cosmo0920 May 21, 2024
aff672b
out_s3: Plug undefined reference of mkstemp on Windows
cosmo0920 May 21, 2024
2250c8d
out_s3: Fix wrongly inverted conditions
cosmo0920 May 21, 2024
eb66900
out_s3: Align return value
cosmo0920 May 21, 2024
9beb6ba
out_s3: Use fileno instead of using open to obtain fd
cosmo0920 May 21, 2024
8210117
out_s3: Use flb_sds_create for fixed strings
cosmo0920 May 21, 2024
ef53ef0
out_s3: Migrate Windows API based command executions
cosmo0920 May 21, 2024
ea8ab83
out_s3: Tweak non Windows side of external command executions part
cosmo0920 May 21, 2024
c1db157
out_s3: Cleanup temporary files on Windows
cosmo0920 May 21, 2024
71212d3
out_s3: Validate fread return value
cosmo0920 May 21, 2024
f04b5ff
out_s3: Create temporary files under tmpdir
cosmo0920 May 22, 2024
16fabd6
out_s3: Create temporary files under tempdir on Windows
cosmo0920 May 22, 2024
8938602
out_s3: Eliminate stdout output for existence checking of columnify o…
cosmo0920 May 22, 2024
75a0b33
out_s3: Suppress commandline outputs for existence check
cosmo0920 May 23, 2024
f64a528
compat: in_exec: out_s3: Move common functions for compatibility to c…
cosmo0920 May 23, 2024
35408d9
out_s3: Fix a wrong indent
cosmo0920 May 23, 2024
d8e4ece
out_s3: Tweak for issues when writing a doc
cosmo0920 May 24, 2024
8703913
out_s3: Add check for sds allocations
cosmo0920 Jun 3, 2024
160c688
out_s3: Use _CHECK suffix to clarify meanings
cosmo0920 Jun 3, 2024
56fa751
out_s3: Make temporary directory for parquet processing to be configu…
cosmo0920 Jun 3, 2024
ecf7996
out_s3: Use separated temporary directory against ordinary S3 store_dir
cosmo0920 Jun 4, 2024
a568785
out_s3: Handle creating nested directories for processing parquet obj…
cosmo0920 Jun 4, 2024
41a907b
out_s3: windows: Handle nested directories for processing parquet obj…
cosmo0920 Jun 4, 2024
582d5d3
out_s3: Unify nested directory creations
cosmo0920 Jun 4, 2024
b4c8c53
out_s3: windows: Link necessary libraries for SHCreateDirectoryExA
cosmo0920 Jun 4, 2024
b414e0e
out_s3: Extract parquet related functions into s3_parquet.c
cosmo0920 Jun 19, 2024
1 change: 1 addition & 0 deletions include/fluent-bit/aws/flb_aws_compress.h
@@ -24,6 +24,7 @@
 #define FLB_AWS_COMPRESS_NONE 0
 #define FLB_AWS_COMPRESS_GZIP 1
 #define FLB_AWS_COMPRESS_ARROW 2
+#define FLB_AWS_COMPRESS_PARQUET 3

 /*
  * Get compression type from compression keyword. The return value is used to identify
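The new `FLB_AWS_COMPRESS_PARQUET` constant extends the set of values that a compression keyword can resolve to. As a rough illustration only, the sketch below maps keywords to these constants with a small lookup table. The helper name `lookup_compression_type`, the `compression_option` struct, the keyword spellings, and the `-1` return for unknown keywords are assumptions made for this example, not taken from the PR.

```c
#include <stddef.h>
#include <string.h>

/* Local copies of the constants shown in the diff above. */
#define FLB_AWS_COMPRESS_NONE    0
#define FLB_AWS_COMPRESS_GZIP    1
#define FLB_AWS_COMPRESS_ARROW   2
#define FLB_AWS_COMPRESS_PARQUET 3

/* Hypothetical table entry: keyword spellings are assumptions. */
struct compression_option {
    const char *keyword;
    int type;
};

static const struct compression_option compression_options[] = {
    { "gzip",    FLB_AWS_COMPRESS_GZIP },
    { "arrow",   FLB_AWS_COMPRESS_ARROW },
    { "parquet", FLB_AWS_COMPRESS_PARQUET },
};

/*
 * Hypothetical helper: resolve a keyword to a compression type.
 * Returns FLB_AWS_COMPRESS_NONE when no keyword is given and -1 when
 * the keyword is not in the table.
 */
static int lookup_compression_type(const char *keyword)
{
    size_t i;
    size_t count = sizeof(compression_options) / sizeof(compression_options[0]);

    if (keyword == NULL) {
        return FLB_AWS_COMPRESS_NONE;
    }

    for (i = 0; i < count; i++) {
        if (strcmp(keyword, compression_options[i].keyword) == 0) {
            return compression_options[i].type;
        }
    }

    return -1;
}
```

Keeping Parquet as its own type, rather than reusing the Arrow value, lets a caller branch on the type to choose the conversion path.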
45 changes: 44 additions & 1 deletion include/fluent-bit/flb_compat.h
@@ -127,6 +127,29 @@ static inline int usleep(LONGLONG usec)
     // Convert into 100ns unit.
     return nanosleep(usec * 10);
 }
+
+static inline FILE* flb_popen(const char *command, const char *type) {
+    return _popen(command, type);
+}
+/*
+ * flb_pclose() has the same return value on Windows as win32 _pclose(), rather
+ * than posix pclose(). The process exit code is not bit-shifted to the high
+ * byte.
+ *
+ * The MSVC docs for _pclose() at
+ * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170
+ * are misleading; they say that "The format of the return value is the same as
+ * for _cwait, except the low-order and high-order bytes are swapped." But
+ * _cwait isn't documented as having any meaningful return on success, the
+ * process exit code is meant to be in its "termstat" out parameter per
+ * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170
+ * The return code of _pclose() actually appears to be the process exit code
+ * without the bit-shift that waitpid() applies.
+ */
+static inline int flb_pclose(FILE *stream) {
+    return _pclose(stream);
+}
+
 #else
 #include <netdb.h>
 #include <netinet/in.h>
@@ -137,7 +160,27 @@ static inline int usleep(LONGLONG usec)
 #include <dlfcn.h>

 #define FLB_DIRCHAR '/'
-#endif
+
+/*
+ * Because Windows has to do everything differently, call _popen() and
+ * _pclose() instead of the POSIX popen() and pclose() functions.
+ *
+ * flb_pclose() has different return value semantics on Windows vs non-windows
+ * targets because it propagates the pclose() or _pclose() return value
+ * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(),
+ * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value,
+ * rather than the underlying POSIX macros or manual bit-shifts.
+ */
+static inline FILE* flb_popen(const char *command, const char *type) {
+    return popen(command, type);
+}
+static inline int flb_pclose(FILE *stream) {
+    return pclose(stream);
+}
+
+#define FLB_PCLOSE pclose
+
+#endif /* FLB_SYSTEM_WINDOWS */

 #ifdef FLB_HAVE_UNIX_SOCKET
 #include <sys/un.h>
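The comments above require callers to decode the `flb_pclose()` return value through wrapper macros instead of raw POSIX macros or bit-shifts. A minimal sketch of that pattern follows, written without the Fluent Bit headers. The `EX_*` macros and the `run_and_get_exit_code` helper are hypothetical stand-ins invented for this example. The Windows branch assumes, as the comment in the diff suggests, that `_pclose()` returns the exit code unshifted.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200809L /* expose popen()/pclose() in strict -std modes */
#endif

#include <stdio.h>

#ifdef _WIN32
#define EX_POPEN  _popen
#define EX_PCLOSE _pclose
/* Assumption: _pclose() yields the plain exit code, so no decoding is needed. */
#define EX_WIFEXITED(status)   (1)
#define EX_WEXITSTATUS(status) (status)
#else
#include <sys/wait.h>
#define EX_POPEN  popen
#define EX_PCLOSE pclose
/* pclose() returns a wait status, so decode it with the POSIX macros. */
#define EX_WIFEXITED(status)   WIFEXITED(status)
#define EX_WEXITSTATUS(status) WEXITSTATUS(status)
#endif

/*
 * Hypothetical helper: run a command through the shell, discard its
 * output, and return its exit code. Returns -1 if the command could not
 * be started or did not exit normally.
 */
static int run_and_get_exit_code(const char *command)
{
    char buf[256];
    FILE *stream;
    int status;

    stream = EX_POPEN(command, "r");
    if (stream == NULL) {
        return -1;
    }

    /* Drain the output so the child cannot block on a full pipe. */
    while (fgets(buf, sizeof(buf), stream) != NULL) {
        /* output intentionally ignored */
    }

    status = EX_PCLOSE(stream);
    if (status == -1 || !EX_WIFEXITED(status)) {
        return -1;
    }

    return EX_WEXITSTATUS(status);
}
```

Because the platform difference is confined to the macros, the calling code is the same on both targets, which is presumably why these helpers were moved into the shared compatibility header.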
42 changes: 0 additions & 42 deletions plugins/in_exec/in_exec_win32_compat.h
@@ -49,46 +49,4 @@
 #define FLB_WTERMSIG(status) (-1)
 #endif

-/*
- * Because Windows has to do everything differently, call _popen() and
- * _pclose() instead of the POSIX popen() and pclose() functions.
- *
- * flb_pclose() has different return value semantics on Windows vs non-windows
- * targets because it propagates the pclose() or _pclose() return value
- * directly. You MUST use the FLB_WIFEXITED(), FLB_WEXITSTATUS(),
- * FLB_WIFSIGNALED() and FLB_WTERMSIG() macros to consume the return value,
- * rather than the underlying POSIX macros or manual bit-shifts.
- */
-#if !defined(FLB_SYSTEM_WINDOWS)
-static inline FILE* flb_popen(const char *command, const char *type) {
-    return popen(command, type);
-}
-static inline int flb_pclose(FILE *stream) {
-    return pclose(stream);
-}
-#define FLB_PCLOSE pclose
-#else
-static inline FILE* flb_popen(const char *command, const char *type) {
-    return _popen(command, type);
-}
-/*
- * flb_pclose() has the same return value on Windows as win32 _pclose(), rather
- * than posix pclose(). The process exit code is not bit-shifted to the high
- * byte.
- *
- * The MSVC docs for _pclose() at
- * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/pclose?view=msvc-170
- * are misleading; they say that "The format of the return value is the same as
- * for _cwait, except the low-order and high-order bytes are swapped." But
- * _cwait isn't documented as having any meaningful return on success, the
- * process exit code is meant to be in its "termstat" out parameter per
- * https://learn.microsoft.com/en-us/cpp/c-runtime-library/reference/cwait?view=msvc-170
- * The return code of _pclose() actually appears to be the process exit code
- * without the bit-shift that waitpid() applies.
- */
-static inline int flb_pclose(FILE *stream) {
-    return _pclose(stream);
-}
-#endif
-
 #endif /* FLB_IN_EXEC_WIN32_COMPAT_H */
13 changes: 12 additions & 1 deletion plugins/out_s3/CMakeLists.txt
@@ -1,6 +1,17 @@
 set(src
   s3.c
   s3_store.c
-  s3_multipart.c)
+  s3_multipart.c
+  s3_parquet.c)
+
+set(libs "")
+
+if(${CMAKE_SYSTEM_NAME} MATCHES "Windows")
+  set(libs
+    ${libs}
+    Shell32.lib
+    Shlwapi.lib)
+endif()

 FLB_PLUGIN(out_s3 "${src}" "")
+target_link_libraries(flb-plugin-out_s3 ${libs})