From a533c5a126df317bb1fe8952ae3e589336795e1e Mon Sep 17 00:00:00 2001
From: Oliver Szabo
Date: Sat, 22 Jun 2024 21:08:57 +0200
Subject: [PATCH] Prepare parquet support

---
 .gitignore                                 |   1 +
 Dockerfile                                 |  42 +++++++
 example/fluent.conf                        |  36 ++++++
 example/schema.avsc                        |   7 ++
 lib/fluent/plugin/out_azurestorage_gen2.rb | 121 ++++++++++++++++++++-
 test.sh                                    |   9 ++
 6 files changed, 210 insertions(+), 6 deletions(-)
 create mode 100644 Dockerfile
 create mode 100644 example/fluent.conf
 create mode 100644 example/schema.avsc
 create mode 100755 test.sh

diff --git a/.gitignore b/.gitignore
index 0a560fa..6b15b46 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,4 @@ build-iPhoneSimulator/
 
 # Used by RuboCop. Remote config files pulled in from inherit_from directive.
 # .rubocop-https?--*
+test-folder
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..ea2d1c4
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,42 @@
+# Use the official Fluentd image from Docker Hub as the base image
+FROM fluent/fluentd:v1.14-1
+
+# Use root account to use apk
+USER root
+
+# Install plugins (Example: elasticsearch and kafka plugins)
+RUN apk add --no-cache --update --virtual .build-deps \
+    sudo build-base ruby-dev
+
+# Copy custom configuration file (uncomment this if you have a custom configuration)
+# COPY fluent.conf /fluentd/etc/
+
+# Copy plugins configuration file (uncomment this if you have plugins configuration)
+# COPY plugins /fluentd/plugins/
+
+# Set the configuration file path environment variable
+RUN apk add --no-cache git
+RUN apk add --no-cache build-base libffi-dev linux-headers
+RUN apk add --no-cache curl tar
+ENV FLUENTD_CONF="fluent.conf"
+
+RUN mkdir -p /var/log/fluent
+RUN chown fluent:fluent /var/log/fluent
+
+ENV COLUMNIFY_VERSION=0.1.1
+RUN curl -L -o columnify.tar.gz https://github.com/reproio/columnify/releases/download/v${COLUMNIFY_VERSION}/columnify_${COLUMNIFY_VERSION}_Linux_x86_64.tar.gz \
+    && tar -xzf columnify.tar.gz \
+    && rm columnify.tar.gz \
+    && mv columnify /usr/local/bin/columnify
+ADD . /work
+#RUN cd /work && sudo gem build /work/fluent-plugin-azurestorage-gen2.gemspec
+#RUN cd /work && sudo gem install fluent-plugin-azurestorage-gen2*.gem
+RUN sudo gem install /work/fluent-plugin-azurestorage-gen2-*.gem
+# Change the user to fluentd
+USER fluent
+
+# Expose the port on which Fluentd will listen
+
+# Start Fluentd
+CMD ["fluentd", "-c", "/fluentd/etc/fluent.conf"]
+
diff --git a/example/fluent.conf b/example/fluent.conf
new file mode 100644
index 0000000..1e353e2
--- /dev/null
+++ b/example/fluent.conf
@@ -0,0 +1,36 @@
+<source>
+  @type dummy
+  dummy {"message": "{\"sample\": \"This is a dummy log entry.\"}"}
+  tag mytag
+</source>
+
+<match **>
+  @type azurestorage_gen2
+  azure_storage_account mystorageabfs
+  azure_container mycontainer
+  azure_instance_msi /subscriptions/mysubscriptionid
+  azure_client_id
+  azure_object_key_format %{path}-%{index}.%{file_extension}
+  azure_oauth_refresh_interval 3600
+  time_slice_format %Y%m%d-%H
+  file_extension log # only used with store_as none
+  path "/cluster-logs/myfolder/${tag[1]}-#{Socket.gethostname}-%M"
+  auto_create_container true
+  store_as parquet
+  format single_value
+  local_testing true
+  local_testing_folder /fluentd/test
+  <compress>
+    parquet_compression_codec uncompressed
+    schema_file /fluentd/etc/schema.avsc
+    record_type json
+  </compress>
+  <buffer tag,time>
+    @type file
+    path /var/log/fluent/azurestorage-buffer
+    timekey 10s
+    timekey_wait 0s
+    timekey_use_utc true
+    chunk_limit_size 64m
+  </buffer>
+</match>
diff --git a/example/schema.avsc b/example/schema.avsc
new file mode 100644
index 0000000..f550613
--- /dev/null
+++ b/example/schema.avsc
@@ -0,0 +1,7 @@
+{
+  "type": "record",
+  "name": "LogRecord",
+  "fields": [
+    {"name": "sample", "type": "string"}
+  ]
+}
diff --git a/lib/fluent/plugin/out_azurestorage_gen2.rb b/lib/fluent/plugin/out_azurestorage_gen2.rb
index 633b460..96f0b6e 100644
--- a/lib/fluent/plugin/out_azurestorage_gen2.rb
+++ b/lib/fluent/plugin/out_azurestorage_gen2.rb
@@ -1,5 +1,6 @@
 require 'net/http'
 require 'base64'
+require 'open3'
 require 'openssl'
 require 'json'
 require 'tempfile'
@@ -53,6 +54,8 @@ def initialize
     config_param :write_only, :bool, :default => false
     config_param :upload_timestamp_format, :string, :default => '%H%M%S%L'
     config_param :http_timeout_seconds, :integer, :default => 120
+    config_param :local_testing, :bool, :default => false
+    config_param :local_testing_folder, :string, :default => ""
 
     DEFAULT_FORMAT_TYPE = "out_file"
     ACCESS_TOKEN_API_VERSION = "2018-02-01"
@@ -107,8 +110,10 @@ def multi_workers_ready?
     end
 
     def start
-        setup_access_token
-        if !@skip_container_check
+        if !@local_testing
+            setup_access_token
+        end
+        if !local_testing && !@skip_container_check
             if @failsafe_container_check
                 begin
                     if @write_only && @auto_create_container
@@ -145,7 +150,11 @@ def write(chunk)
         raw_data = chunk.read
         unless raw_data.empty?
log.debug "azurestorage_gen2: processing raw data", chunk_id: dump_unique_id_hex(chunk.unique_id) - upload_blob(raw_data, chunk) + if @local_testing + handle_local_copy(raw_data, chunk) + else + upload_blob(raw_data, chunk) + end end chunk.close rescue nil @last_azure_storage_path = @azure_storage_path @@ -162,7 +171,11 @@ def write(chunk) end log.debug "azurestorage_gen2: Start uploading temp file: #{tmp.path}" content = File.open(tmp.path, 'rb') { |file| file.read } - upload_blob(content, chunk) + if @local_testing + handle_local_compressed_copy(content, chunk) + else + upload_blob(content, chunk) + end @last_azure_storage_path = @azure_storage_path ensure tmp.close(true) rescue nil @@ -646,6 +659,24 @@ def raise_error(error_message) end end + def handle_local_copy(content, chunk) + folder = @local_testing_folder.empty? ? Dir.pwd : @local_testing_folder + local_path = File.join(folder, "fluentd_output_#{Time.now.to_i}.log") + File.open(local_path, 'wb') do |file| + chunk.write_to(file) + end + log.info "Data written to local file: #{local_path}" + end + + def handle_local_compressed_copy(content, chunk) + folder = @local_testing_folder.empty? ? Dir.pwd : @local_testing_folder + local_path = File.join(folder, "fluentd_output_#{Time.now.to_i}.#{@final_file_extension}") + File.open(local_path, 'wb') do |file| + file.write(content) + end + log.info "Compressed data written to local file: #{local_path}" + end + def uuid_random require 'uuidtools' ::UUIDTools::UUID.random_create.to_s @@ -745,12 +776,90 @@ def content_type 'application/json'.freeze end end + + class ParquetCompressor < Compressor + + config_section :compress, multi: false do + desc "parquet compression codec" + config_param :parquet_compression_codec, :enum, list: [:uncompressed, :snappy, :gzip, :lzo, :brotli, :lz4, :zstd], default: :snappy + desc "parquet file page size" + config_param :parquet_page_size, :size, default: 8192 + desc "parquet file row group size" + config_param :parquet_row_group_size, :size, default: 128 * 1024 * 1024 + desc "record data format type" + config_param :record_type, :enum, list: [:avro, :csv, :jsonl, :msgpack, :tsv, :json], default: :msgpack + desc "schema type" + config_param :schema_type, :enum, list: [:avro, :bigquery], default: :avro + desc "path to schema file" + config_param :schema_file, :string + end + + def configure(conf) + super + check_command("columnify", "-h") + + if [:lzo, :brotli, :lz4].include?(@compress.parquet_compression_codec) + raise Fluent::ConfigError, "unsupported compression codec: #{@compress.parquet_compression_codec}" + end + + @parquet_compression_codec = @compress.parquet_compression_codec.to_s.upcase + if @compress.record_type == :json + @record_type = :jsonl + else + @record_type = @compress.record_type + end + end + + def ext + "parquet".freeze + end + + def content_type + "application/octet-stream".freeze + end + + def compress(chunk, tmp) + chunk_is_file = @buffer_type == "file" + path = if chunk_is_file + chunk.path + else + w = Tempfile.new("chunk-parquet-tmp") + w.binmode + chunk.write_to(w) + w.close + w.path + end + stdout, stderr, status = columnify(path, tmp.path) + unless status.success? + raise Fluent::UnrecoverableError, "failed to execute columnify command. 
+            end
+        ensure
+            unless chunk_is_file
+                w.close(true) rescue nil
+            end
+        end
+
+        private
+
+        def columnify(src_path, dst_path)
+            Open3.capture3("columnify",
+                "-parquetCompressionCodec", @parquet_compression_codec,
+                "-parquetPageSize", @compress.parquet_page_size.to_s,
+                "-parquetRowGroupSize", @compress.parquet_row_group_size.to_s,
+                "-recordType", @record_type.to_s,
+                "-schemaType", @compress.schema_type.to_s,
+                "-schemaFile", @compress.schema_file,
+                "-output", dst_path,
+                src_path)
+        end
+    end
 
     COMPRESSOR_REGISTRY = Fluent::Registry.new(:azurestorage_compressor_type, 'fluent/plugin/azurestorage_gen2_compressor_')
     {
         'gzip' => GzipCompressor,
         'json' => JsonCompressor,
-        'text' => TextCompressor
+        'text' => TextCompressor,
+        'parquet' => ParquetCompressor
     }.each { |name, compressor|
         COMPRESSOR_REGISTRY.register(name, compressor)
     }
@@ -774,4 +883,4 @@ def initialize(message="Default message")
         super(message)
     end
 end
-end
\ No newline at end of file
+end
diff --git a/test.sh b/test.sh
new file mode 100755
index 0000000..68003f9
--- /dev/null
+++ b/test.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+
+#docker build -t oleewere/fluent-plugin-azurestorage-gen2:latest .
+
+docker run -it --rm \
+  -v $(pwd)/example:/fluentd/etc \
+  -v $(pwd)/test-folder:/fluentd/test \
+  -v $(pwd)/lib/fluent/plugin/out_azurestorage_gen2.rb:/fluentd/plugins/out_azurestorage_gen2.rb \
+  oleewere/fluent:latest