Prepare parquet support #20

Draft. Wants to merge 1 commit into master.
1 change: 1 addition & 0 deletions .gitignore
@@ -56,3 +56,4 @@ build-iPhoneSimulator/

# Used by RuboCop. Remote config files pulled in from inherit_from directive.
# .rubocop-https?--*
test-folder
42 changes: 42 additions & 0 deletions Dockerfile
@@ -0,0 +1,42 @@
# Use the official Fluentd image from Docker Hub as the base image
FROM fluent/fluentd:v1.14-1

# Use root account to use apk
USER root

# Install build dependencies needed to compile native gem extensions
RUN apk add --no-cache --update --virtual .build-deps \
sudo build-base ruby-dev

# Copy custom configuration file (uncomment this if you have a custom configuration)
# COPY fluent.conf /fluentd/etc/

# Copy plugins configuration file (uncomment this if you have plugins configuration)
# COPY plugins /fluentd/plugins/

# Install tools needed to build the gem and download the columnify binary
RUN apk add --no-cache git
RUN apk add --no-cache build-base libffi-dev linux-headers
RUN apk add --no-cache curl tar

# Set the configuration file path environment variable
ENV FLUENTD_CONF="fluent.conf"

RUN mkdir -p /var/log/fluent
RUN chown fluent:fluent /var/log/fluent

ENV COLUMNIFY_VERSION=0.1.1
RUN curl -L -o columnify.tar.gz https://github.com/reproio/columnify/releases/download/v${COLUMNIFY_VERSION}/columnify_${COLUMNIFY_VERSION}_Linux_x86_64.tar.gz \
&& tar -xzf columnify.tar.gz \
&& rm columnify.tar.gz \
&& mv columnify /usr/local/bin/columnify
ADD . /work
#RUN cd /work && sudo gem build /work/fluent-plugin-azurestorage-gen2.gemspec
#RUN cd /work && sudo gem install fluent-plugin-azurestorage-gen2*.gem
RUN sudo gem install /work/fluent-plugin-azurestorage-gen2-*.gem
# Switch back to the unprivileged fluent user
USER fluent

# Expose ports here if an input plugin needs to listen (none are required for this setup)

# Start Fluentd
CMD ["fluentd", "-c", "/fluentd/etc/fluent.conf"]
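
The image expects the plugin gem to already be present in the build context: ADD . /work copies it in and gem install /work/fluent-plugin-azurestorage-gen2-*.gem installs it, while the curl/tar step fetches the columnify binary that the new parquet compressor shells out to. A minimal local build flow could look like the sketch below (the image tag is taken from the commented line in test.sh and is only an example):

# Sketch: build the gem first, then the image (tag is an example)
gem build fluent-plugin-azurestorage-gen2.gemspec
docker build -t oleewere/fluent-plugin-azurestorage-gen2:latest .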

36 changes: 36 additions & 0 deletions example/fluent.conf
@@ -0,0 +1,36 @@
<source>
@type dummy
dummy {"message": "{\"sample\": \"This is a dummy log entry.\"}"}
tag mytag
</source>

<match mytag>
@type azurestorage_gen2
azure_storage_account mystorageabfs
azure_container mycontainer
azure_instance_msi /subscriptions/mysubscriptionid
azure_client_id <msi client id>
azure_object_key_format %{path}-%{index}.%{file_extension}
azure_oauth_refresh_interval 3600
time_slice_format %Y%m%d-%H
file_extension log # only used with store_as none
path "/cluster-logs/myfolder/${tag[1]}-#{Socket.gethostname}-%M"
auto_create_container true
store_as parquet
format single_value
local_testing true
local_testing_folder /fluentd/test
<compress>
parquet_compression_codec uncompressed
schema_file /fluentd/etc/schema.avsc
record_type json
</compress>
<buffer tag,time>
@type file
path /var/log/fluent/azurestorage-buffer
timekey 10s
timekey_wait 0s
timekey_use_utc true
chunk_limit_size 64m
</buffer>
</match>
7 changes: 7 additions & 0 deletions example/schema.avsc
@@ -0,0 +1,7 @@
{
"type": "record",
"name": "LogRecord",
"fields": [
{"name": "sample", "type": "string"}
]
}
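
With format single_value, each buffered record is the inner JSON string from the dummy source's message field, so the JSONL lines handed to columnify have to match schema.avsc (a single string field named sample). As a rough sketch, the conversion the ParquetCompressor performs can be reproduced by hand with the same flags the plugin passes (values below mirror the example config and the plugin defaults; file paths are placeholders):

# Reproduce the plugin's columnify call manually (paths are examples)
echo '{"sample": "This is a dummy log entry."}' > /tmp/records.jsonl
columnify -parquetCompressionCodec UNCOMPRESSED \
  -parquetPageSize 8192 -parquetRowGroupSize 134217728 \
  -recordType jsonl -schemaType avro -schemaFile example/schema.avsc \
  -output /tmp/records.parquet /tmp/records.jsonl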
121 changes: 115 additions & 6 deletions lib/fluent/plugin/out_azurestorage_gen2.rb
@@ -1,5 +1,6 @@
require 'net/http'
require 'base64'
require 'open3'
require 'openssl'
require 'json'
require 'tempfile'
@@ -53,6 +54,8 @@ def initialize
config_param :write_only, :bool, :default => false
config_param :upload_timestamp_format, :string, :default => '%H%M%S%L'
config_param :http_timeout_seconds, :integer, :default => 120
config_param :local_testing, :bool, :default => false
config_param :local_testing_folder, :string, :default => ""

DEFAULT_FORMAT_TYPE = "out_file"
ACCESS_TOKEN_API_VERSION = "2018-02-01"
@@ -107,8 +110,10 @@ def multi_workers_ready?
end

def start
setup_access_token
if !@skip_container_check
if !@local_testing
setup_access_token
end
if !@local_testing && !@skip_container_check
if @failsafe_container_check
begin
if @write_only && @auto_create_container
@@ -145,7 +150,11 @@ def write(chunk)
raw_data = chunk.read
unless raw_data.empty?
log.debug "azurestorage_gen2: processing raw data", chunk_id: dump_unique_id_hex(chunk.unique_id)
upload_blob(raw_data, chunk)
if @local_testing
handle_local_copy(raw_data, chunk)
else
upload_blob(raw_data, chunk)
end
end
chunk.close rescue nil
@last_azure_storage_path = @azure_storage_path
@@ -162,7 +171,11 @@ def write(chunk)
end
log.debug "azurestorage_gen2: Start uploading temp file: #{tmp.path}"
content = File.open(tmp.path, 'rb') { |file| file.read }
upload_blob(content, chunk)
if @local_testing
handle_local_compressed_copy(content, chunk)
else
upload_blob(content, chunk)
end
@last_azure_storage_path = @azure_storage_path
ensure
tmp.close(true) rescue nil
@@ -646,6 +659,24 @@ def raise_error(error_message)
end
end

def handle_local_copy(content, chunk)
folder = @local_testing_folder.empty? ? Dir.pwd : @local_testing_folder
local_path = File.join(folder, "fluentd_output_#{Time.now.to_i}.log")
File.open(local_path, 'wb') do |file|
chunk.write_to(file)
end
log.info "Data written to local file: #{local_path}"
end

def handle_local_compressed_copy(content, chunk)
folder = @local_testing_folder.empty? ? Dir.pwd : @local_testing_folder
local_path = File.join(folder, "fluentd_output_#{Time.now.to_i}.#{@final_file_extension}")
File.open(local_path, 'wb') do |file|
file.write(content)
end
log.info "Compressed data written to local file: #{local_path}"
end

def uuid_random
require 'uuidtools'
::UUIDTools::UUID.random_create.to_s
@@ -745,12 +776,90 @@ def content_type
'application/json'.freeze
end
end

class ParquetCompressor < Compressor

config_section :compress, multi: false do
desc "parquet compression codec"
config_param :parquet_compression_codec, :enum, list: [:uncompressed, :snappy, :gzip, :lzo, :brotli, :lz4, :zstd], default: :snappy
desc "parquet file page size"
config_param :parquet_page_size, :size, default: 8192
desc "parquet file row group size"
config_param :parquet_row_group_size, :size, default: 128 * 1024 * 1024
desc "record data format type"
config_param :record_type, :enum, list: [:avro, :csv, :jsonl, :msgpack, :tsv, :json], default: :msgpack
desc "schema type"
config_param :schema_type, :enum, list: [:avro, :bigquery], default: :avro
desc "path to schema file"
config_param :schema_file, :string
end

def configure(conf)
super
check_command("columnify", "-h")

if [:lzo, :brotli, :lz4].include?(@compress.parquet_compression_codec)
raise Fluent::ConfigError, "unsupported compression codec: #{@compress.parquet_compression_codec}"
end

@parquet_compression_codec = @compress.parquet_compression_codec.to_s.upcase
if @compress.record_type == :json
@record_type = :jsonl
else
@record_type = @compress.record_type
end
end

def ext
"parquet".freeze
end

def content_type
"application/octet-stream".freeze
end

def compress(chunk, tmp)
chunk_is_file = @buffer_type == "file"
path = if chunk_is_file
chunk.path
else
w = Tempfile.new("chunk-parquet-tmp")
w.binmode
chunk.write_to(w)
w.close
w.path
end
stdout, stderr, status = columnify(path, tmp.path)
unless status.success?
raise Fluent::UnrecoverableError, "failed to execute columnify command. stdout=#{stdout} stderr=#{stderr} status=#{status.inspect}"
end
ensure
unless chunk_is_file
w.close(true) rescue nil
end
end

private

def columnify(src_path, dst_path)
Open3.capture3("columnify",
"-parquetCompressionCodec", @parquet_compression_codec,
"-parquetPageSize", @compress.parquet_page_size.to_s,
"-parquetRowGroupSize", @compress.parquet_row_group_size.to_s,
"-recordType", @record_type.to_s,
"-schemaType", @compress.schema_type.to_s,
"-schemaFile", @compress.schema_file,
"-output", dst_path,
src_path)
end
end

COMPRESSOR_REGISTRY = Fluent::Registry.new(:azurestorage_compressor_type, 'fluent/plugin/azurestorage_gen2_compressor_')
{
'gzip' => GzipCompressor,
'json' => JsonCompressor,
'text' => TextCompressor
'text' => TextCompressor,
'parquet' => ParquetCompressor
}.each { |name, compressor|
COMPRESSOR_REGISTRY.register(name, compressor)
}
@@ -774,4 +883,4 @@ def initialize(message="Default message")
super(message)
end
end
end
end
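
Since ParquetCompressor#configure calls check_command("columnify", "-h"), the columnify binary installed in the Dockerfile has to be on the PATH or the plugin fails at startup. A quick sanity check against the built image might look like this (the image tag is an example, not part of this PR):

# Confirm columnify is available inside the image (tag is an example)
docker run --rm --entrypoint columnify oleewere/fluent-plugin-azurestorage-gen2:latest -h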
9 changes: 9 additions & 0 deletions test.sh
@@ -0,0 +1,9 @@
#!/bin/sh

#docker build -t oleewere/fluent-plugin-azurestorage-gen2:latest .

docker run -it --rm \
-v $(pwd)/example:/fluentd/etc \
-v $(pwd)/test-folder:/fluentd/test \
-v $(pwd)/lib/fluent/plugin/out_azurestorage_gen2.rb:/fluentd/plugins/out_azurestorage_gen2.rb \
oleewere/fluent:latest
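
One way to exercise the local_testing path end to end, assuming the gem and image were built as sketched earlier: create the host folder that backs local_testing_folder, run the script, and look for a parquet file in test-folder. Note that the run line above references oleewere/fluent:latest while the commented build line uses a different tag, so the two may need to be aligned. File names below are illustrative, not exact.

# Sketch of a local test run (file name is illustrative)
mkdir -p test-folder
sh test.sh
ls test-folder/   # expect something like fluentd_output_<epoch>.parquet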