Skip to content

Commit

Permalink
Implement batch write (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
rolincova committed Feb 7, 2020
1 parent 838b1c4 commit 23cf2e4
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 13 deletions.
4 changes: 2 additions & 2 deletions lib/influxdb2/client/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ def initialize(url, token, options = nil)
# Write time series data into InfluxDB thought WriteApi.
#
# @return [WriteApi] New instance of WriteApi.
def create_write_api
WriteApi.new(options: @options)
def create_write_api(write_options: InfluxDB2::WriteOptions.new)
WriteApi.new(options: @options, write_options: write_options)
end

# Get the Query client.
Expand Down
35 changes: 35 additions & 0 deletions lib/influxdb2/client/worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# The MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

module InfluxDB2
# Worker for handling write batching queue
#
class Worker
def initialize(write_options)
@write_options = write_options

@queue = Queue.new
end

def push(payload)
@queue.push(payload)
end
end
end
42 changes: 31 additions & 11 deletions lib/influxdb2/client/write_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
require_relative 'worker'

module InfluxDB2
module WriteType
SYNCHRONOUS = 1
BATCHING = 2
end

#Creates write api configuration.
# Creates write api configuration.
#
#@param write_type: methods of write (batching, asynchronous, synchronous)
#@param batch_size: the number of data point to collect in batch
#@param flush_interval: flush data at least in this interval
# @param write_type: methods of write (batching, asynchronous, synchronous)
# @param batch_size: the number of data point to collect in batch
# @param flush_interval: flush data at least in this interval
class WriteOptions
def initialize(write_type: WriteType::SYNCHRONOUS, batch_size: 1_000, flush_interval: 1_000)
@write_type = write_type
Expand All @@ -47,7 +48,7 @@ class WritePrecision
NANOSECOND = 'ns'.freeze

def get_from_value(value)
constants = WritePrecision.constants.select {|c| WritePrecision.const_get(c) == value}
constants = WritePrecision.constants.select { |c| WritePrecision.const_get(c) == value }
raise "The time precision #{value} is not supported." if constants.empty?

value
Expand Down Expand Up @@ -109,26 +110,47 @@ def write(data:, precision: nil, bucket: nil, org: nil)
uri = URI.parse(File.join(@options[:url], '/api/v2/write'))
uri.query = URI.encode_www_form(bucket: bucket_param, org: org_param, precision: precision_param.to_s)

_post(payload, uri)
if WriteType::BATCHING == @write_options.write_type
_worker.push(payload)
else
_post(payload, uri)
end
end

# Item for batching queue
class BatchItem
def initialize(key, data)
@key = key
@data = data
end
attr_reader :key, :data
end

# Key for batch item
class BatchItemKey
def initialize(bucket, org, precision = DEFAULT_WRITE_PRECISION)
@bucket = bucket
@org = org
@precision = precision
end
attr_reader :bucket, :org, :precision
end

private

WORKER_MUTEX = Mutex.new
def _worker
return @worker if @worker

WORKER_MUTEX.synchronize do
# this return is necessary because the previous mutex holder
# might have already assigned the @worker
return @worker if @worker

@worker = Worker.new(@write_options)
end
end

def _generate_payload(data, precision: nil, bucket: nil, org: nil)
if data.nil?
nil
Expand All @@ -137,12 +159,10 @@ def _generate_payload(data, precision: nil, bucket: nil, org: nil)
elsif data.is_a?(String)
if data.empty?
nil
elsif @write_options.write_type == WriteType::BATCHING
BatchItem.new(BatchItemKey.new(bucket, org, precision), data)
else
if @write_options.write_type == WriteType::BATCHING
BatchItem.new(BatchItemKey.new(bucket, org, precision), data)
else
data
end
data
end
elsif data.is_a?(Hash)
_generate_payload(Point.from_hash(data), bucket: bucket, org: org, precision: precision)
Expand Down

0 comments on commit 23cf2e4

Please sign in to comment.