From 23cf2e4fc128221e5ef55c47d4a194fab11cd002 Mon Sep 17 00:00:00 2001 From: Pavlina Rolincova Date: Fri, 7 Feb 2020 12:17:40 +0100 Subject: [PATCH] Implement batch write (#19) --- lib/influxdb2/client/client.rb | 4 +-- lib/influxdb2/client/worker.rb | 35 ++++++++++++++++++++++++++ lib/influxdb2/client/write_api.rb | 42 +++++++++++++++++++++++-------- 3 files changed, 68 insertions(+), 13 deletions(-) create mode 100644 lib/influxdb2/client/worker.rb diff --git a/lib/influxdb2/client/client.rb b/lib/influxdb2/client/client.rb index eddc3dcc..f45d48bb 100644 --- a/lib/influxdb2/client/client.rb +++ b/lib/influxdb2/client/client.rb @@ -56,8 +56,8 @@ def initialize(url, token, options = nil) # Write time series data into InfluxDB thought WriteApi. # # @return [WriteApi] New instance of WriteApi. - def create_write_api - WriteApi.new(options: @options) + def create_write_api(write_options: InfluxDB2::WriteOptions.new) + WriteApi.new(options: @options, write_options: write_options) end # Get the Query client. diff --git a/lib/influxdb2/client/worker.rb b/lib/influxdb2/client/worker.rb new file mode 100644 index 00000000..cc4ab795 --- /dev/null +++ b/lib/influxdb2/client/worker.rb @@ -0,0 +1,35 @@ +# The MIT License +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +module InfluxDB2 + # Worker for handling write batching queue + # + class Worker + def initialize(write_options) + @write_options = write_options + + @queue = Queue.new + end + + def push(payload) + @queue.push(payload) + end + end +end diff --git a/lib/influxdb2/client/write_api.rb b/lib/influxdb2/client/write_api.rb index 246ae48a..a0777574 100644 --- a/lib/influxdb2/client/write_api.rb +++ b/lib/influxdb2/client/write_api.rb @@ -17,6 +17,7 @@ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. +require_relative 'worker' module InfluxDB2 module WriteType @@ -24,11 +25,11 @@ module WriteType BATCHING = 2 end - #Creates write api configuration. + # Creates write api configuration. # - #@param write_type: methods of write (batching, asynchronous, synchronous) - #@param batch_size: the number of data point to collect in batch - #@param flush_interval: flush data at least in this interval + # @param write_type: methods of write (batching, asynchronous, synchronous) + # @param batch_size: the number of data point to collect in batch + # @param flush_interval: flush data at least in this interval class WriteOptions def initialize(write_type: WriteType::SYNCHRONOUS, batch_size: 1_000, flush_interval: 1_000) @write_type = write_type @@ -47,7 +48,7 @@ class WritePrecision NANOSECOND = 'ns'.freeze def get_from_value(value) - constants = WritePrecision.constants.select {|c| WritePrecision.const_get(c) == value} + constants = WritePrecision.constants.select { |c| WritePrecision.const_get(c) == value } raise "The time precision #{value} is not supported." if constants.empty? value @@ -109,26 +110,47 @@ def write(data:, precision: nil, bucket: nil, org: nil) uri = URI.parse(File.join(@options[:url], '/api/v2/write')) uri.query = URI.encode_www_form(bucket: bucket_param, org: org_param, precision: precision_param.to_s) - _post(payload, uri) + if WriteType::BATCHING == @write_options.write_type + _worker.push(payload) + else + _post(payload, uri) + end end + # Item for batching queue class BatchItem def initialize(key, data) @key = key @data = data end + attr_reader :key, :data end + # Key for batch item class BatchItemKey def initialize(bucket, org, precision = DEFAULT_WRITE_PRECISION) @bucket = bucket @org = org @precision = precision end + attr_reader :bucket, :org, :precision end private + WORKER_MUTEX = Mutex.new + def _worker + return @worker if @worker + + WORKER_MUTEX.synchronize do + # this return is necessary because the previous mutex holder + # might have already assigned the @worker + return @worker if @worker + + @worker = Worker.new(@write_options) + end + end + def _generate_payload(data, precision: nil, bucket: nil, org: nil) if data.nil? nil @@ -137,12 +159,10 @@ def _generate_payload(data, precision: nil, bucket: nil, org: nil) elsif data.is_a?(String) if data.empty? nil + elsif @write_options.write_type == WriteType::BATCHING + BatchItem.new(BatchItemKey.new(bucket, org, precision), data) else - if @write_options.write_type == WriteType::BATCHING - BatchItem.new(BatchItemKey.new(bucket, org, precision), data) - else - data - end + data end elsif data.is_a?(Hash) _generate_payload(Point.from_hash(data), bucket: bucket, org: org, precision: precision)