A Ruby client for Druid.
ruby-druid features a Squeel-like query DSL and generates a JSON query that can be sent to Druid directly. A console for testing is also provided.
Add this line to your application's Gemfile:
gem 'ruby-druid'
And then execute:
$ bundle
Or install it yourself as:
$ gem install ruby-druid
Druid::Client.new('zk1:2181,zk2:2181/druid').query('service/source')
returns a query object on which all other methods can be called to create a full and valid druid query.
A query object can be sent like this:
client = Druid::Client.new('zk1:2181,zk2:2181/druid')
query = Druid::Query.new('service/source')
client.send(query)
The `send` method returns the parsed response from the Druid server as an array. If the response is not empty, it contains one ResponseRow object for each row. The timestamp can be accessed by a method of the same name (i.e. `row.timestamp`), and all row values by hash-like syntax (i.e. `row['dimension']`).
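As a sketch of what such a parsed response looks like, a Druid groupBy response body is a JSON array of rows; the sample below is illustrative (not real data) and uses only the standard library:

```ruby
require 'json'

# Illustrative Druid groupBy response body (made-up sample data).
body = <<~JSON
  [
    { "timestamp": "2013-12-11T00:00:00.000Z",
      "event": { "dimension1": "a", "aggregate1": 42 } }
  ]
JSON

rows = JSON.parse(body)
row  = rows.first
puts row['timestamp']            # the row's timestamp
puts row['event']['aggregate1']  # a row value, accessed hash-style
```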
An options hash can be passed when creating Druid::Client
instance:
client = Druid::Client.new('zk1:2181,zk2:2181/druid', http_timeout: 20)
Supported options are:
- `static_setup` to explicitly specify a broker url, e.g. `static_setup: { 'my/source_name' => 'http://1.2.3.4:8080/druid/v2/' }`
- `http_timeout` to define a timeout for sending HTTP queries to a broker (in minutes, default value is 2)
A GroupByQuery sets the dimensions to group the data.
`queryType` is set automatically to `groupBy`.
Druid::Query.new('service/source').group_by([:dimension1, :dimension2])
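For reference, such a group-by call roughly corresponds to JSON like the following sketch (the dataSource and granularity values are illustrative, built here with plain Ruby):

```ruby
require 'json'

# Sketch of the JSON a group_by([:dimension1, :dimension2]) call serializes to;
# dataSource and granularity values are illustrative.
query = {
  queryType:   'groupBy',   # set automatically by the DSL
  dataSource:  'source',
  granularity: 'all',
  dimensions:  ['dimension1', 'dimension2']
}
puts JSON.pretty_generate(query)
```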
A TimeSeriesQuery returns an array of JSON objects where each object represents a value asked for by the timeseries query.
Druid::Query.new('service/source').time_series([:aggregate1, :aggregate2])
Druid::Query.new('service/source').long_sum([:aggregate1, :aggregate2])
The following aggregation methods can be used in the same way: `double_sum`, `count`, `min`, `max`, `hyper_unique`.
Druid::Query.new('service/source').cardinality(:aggregate, [:dimension1, dimension2], <by_row: true | false>)
For example, the calculation for `sum(log(x) * y) + 10`:
Druid::Query.new('service/source').js_aggregation(:aggregate, [:x, :y],
aggregate: "function(current, a, b) { return current + (Math.log(a) * b); }",
combine: "function(partialA, partialB) { return partialA + partialB; }",
reset: "function() { return 10; }"
)
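The semantics of the three JavaScript functions above can be mimicked in plain Ruby (a sketch with made-up sample rows): `reset` supplies the initial value, `aggregate` folds each row into the running total, and `combine` would merge partial totals from separate segments.

```ruby
# Mimic the JavaScript aggregator above in plain Ruby, with made-up rows.
rows = [{ x: Math::E, y: 2.0 }, { x: Math::E**2, y: 3.0 }]

reset     = -> { 10.0 }                                      # initial value
aggregate = ->(current, a, b) { current + Math.log(a) * b }  # per-row fold
combine   = ->(pa, pb) { pa + pb }                           # merge partials

total = rows.reduce(reset.call) { |cur, r| aggregate.call(cur, r[:x], r[:y]) }
# log(e)*2 + log(e^2)*3 = 2 + 6, plus the reset value 10 => 18.0
puts total
```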
A simple syntax for post aggregations with +,-,/,* can be used like:
query = Druid::Query.new('service/source').long_sum([:aggregate1, :aggregate2])
query.postagg { (aggregate1 + aggregate2).as output_field_name }
Required fields for the postaggregation are fetched automatically by the library.
Javascript post aggregations are also supported:
query.postagg { js('function(aggregate1, aggregate2) { return aggregate1 + aggregate2; }').as result }
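As a sketch, the arithmetic post-aggregation above serializes to JSON roughly like the following (built with plain Ruby; the output name is illustrative):

```ruby
require 'json'

# Sketch of the JSON an arithmetic post-aggregation serializes to;
# the output name is illustrative.
postagg = {
  type: 'arithmetic',
  name: 'output_field_name',
  fn:   '+',
  fields: [
    { type: 'fieldAccess', name: 'aggregate1', fieldName: 'aggregate1' },
    { type: 'fieldAccess', name: 'aggregate2', fieldName: 'aggregate2' }
  ]
}
puts JSON.pretty_generate(postagg)
```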
The interval for the query takes a string with date and time or objects that provide an `iso8601` method.
query = Druid::Query.new('service/source').long_sum(:aggregate1)
query.interval("2013-01-01T00", Time.now)
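Anything responding to `iso8601` works because the interval is serialized as an ISO 8601 start/end pair; a minimal sketch of that serialization, using only the standard library:

```ruby
require 'time'

# Sketch: serialize two endpoints into a Druid interval string ("start/end").
from = Time.utc(2013, 1, 1)
to   = Time.utc(2013, 1, 2)
interval = "#{from.iso8601}/#{to.iso8601}"
puts interval  # => "2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"
```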
The granularity can be `:all`, `:none`, `:minute`, `:fifteen_minute`, `:thirty_minute`, `:hour` or `:day`.
It can also be a period granularity as described in the druid wiki.
The period `'day'` or `:day` will be interpreted as `'P1D'`.
If a period granularity is specified, the (optional) second parameter is a time zone. It defaults to the machine's local time zone, i.e.
query = Druid::Query.new('service/source').long_sum(:aggregate1)
query.granularity(:day)
is (on my box) the same as
query = Druid::Query.new('service/source').long_sum(:aggregate1)
query.granularity('P1D', 'Europe/Berlin')
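Both calls above produce the same period-granularity fragment in the generated query, sketched here as a plain Ruby hash (assuming the box's local zone is Europe/Berlin, as in the example):

```ruby
require 'json'

# Sketch of the period-granularity fragment both calls above produce
# (assuming the machine's local time zone is Europe/Berlin).
granularity = {
  type:     'period',
  period:   'P1D',
  timeZone: 'Europe/Berlin'
}
puts JSON.generate(granularity)
```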
# equality
Druid::Query.new('service/source').having { metric == 10 }
# inequality
Druid::Query.new('service/source').having { metric != 10 }
# greater, less
Druid::Query.new('service/source').having { metric > 10 }
Druid::Query.new('service/source').having { metric < 10 }
Having filters can be combined with boolean logic.
# and
Druid::Query.new('service/source').having { (metric != 1) & (metric2 != 2) }
# or
Druid::Query.new('service/source').having { (metric == 1) | (metric2 == 2) }
# not
Druid::Query.new('service/source').having{ !metric.eq(1) }
Filters are set by the filter
method. It takes a block or a hash as
parameter.
Filters can be chained filter{...}.filter{...}
# equality
Druid::Query.new('service/source').filter{dimension.eq 1}
Druid::Query.new('service/source').filter{dimension == 1}
# inequality
Druid::Query.new('service/source').filter{dimension.neq 1}
Druid::Query.new('service/source').filter{dimension != 1}
# greater, less
Druid::Query.new('service/source').filter{dimension > 1}
Druid::Query.new('service/source').filter{dimension >= 1}
Druid::Query.new('service/source').filter{dimension < 1}
Druid::Query.new('service/source').filter{dimension <= 1}
# JavaScript
Druid::Query.new('service/source').filter{a.javascript('dimension >= 1 && dimension < 5')}
Filters can be combined with boolean logic.
# and
Druid::Query.new('service/source').filter{(dimension.neq 1) & (dimension2.neq 2)}
# or
Druid::Query.new('service/source').filter{(dimension.neq 1) | (dimension2.neq 2)}
# not
Druid::Query.new('service/source').filter{!dimension.eq(1)}
This filter creates a set of equals filters in an or filter.
Druid::Query.new('service/source').filter{dimension.in(1,2,3)}
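That expansion into an `or` of `selector` filters can be sketched in plain Ruby (dimension name illustrative):

```ruby
# Sketch: expand dimension.in(1, 2, 3) into an "or" of selector filters.
values = [1, 2, 3]
filter = {
  type:   'or',
  fields: values.map { |v| { type: 'selector', dimension: 'dimension', value: v } }
}
p filter
```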
These filters have to be combined with `time_series` and only work when `coordinates` is a spatial dimension.
Druid::Query.new('service/source').time_series().long_sum([:aggregate1]).filter{coordinates.in_rec [[50.0,13.0],[54.0,15.0]]}
Druid::Query.new('service/source').time_series().long_sum([:aggregate1]).filter{coordinates.in_circ [[53.0,13.0], 5.0]}
This filter creates a set of not-equals filters in an and filter.
Druid::Query.new('service/source').filter{dimension.nin(1,2,3)}
Sometimes it can be useful to use a hash syntax for filtering for example if you already get them from a list or parameter hash.
Druid::Query.new('service/source').filter{dimension => 1, dimension1 => 2, dimension2 => 3}
# this is the same as
Druid::Query.new('service/source').filter{dimension.eq(1) & dimension1.eq(2) & dimension2.eq(3)}
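The hash form boils down to an `and` of `selector` filters, one per key/value pair; a sketch of that transformation in plain Ruby (dimension names illustrative):

```ruby
# Sketch: turn a conditions hash into an "and" of selector filters,
# mirroring what the hash filter syntax expands to.
conditions = { 'dimension' => 1, 'dimension1' => 2, 'dimension2' => 3 }
filter = {
  type:   'and',
  fields: conditions.map { |dim, v| { type: 'selector', dimension: dim, value: v } }
}
p filter
```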
ruby-druid now includes a REPL:
$ bin/dripl
>> metrics
[
[0] "actions"
[1] "words"
]
>> dimensions
[
[0] "type"
]
>> long_sum(:actions)
+---------+
| actions |
+---------+
| 98575 |
+---------+
>> long_sum(:actions, :words)[-3.days].granularity(:day)
+-------------------------------+---------+-------+
| timestamp                     | actions | words |
+-------------------------------+---------+-------+
| 2013-12-11T00:00:00.000+01:00 | 537345  | 68974 |
| 2013-12-12T00:00:00.000+01:00 | 675431  | 49253 |
| 2013-12-13T00:00:00.000+01:00 | 749034  | 87542 |
+-------------------------------+---------+-------+
>> long_sum(:actions, :words)[-3.days].granularity(:day).as_json
{
:dataSource => "events",
:granularity => {
:type => "period",
:period => "P1D",
:timeZone => "Europe/Berlin"
},
:intervals => [
[0] "2013-12-11T00:00:00+01:00/2013-12-13T09:41:10+01:00"
],
:queryType => :groupBy,
:aggregations => [
[0] {
:type => "longSum",
:name => :actions,
:fieldName => :actions
},
[1] {
:type => "longSum",
:name => :words,
:fieldName => :words
}
]
}
- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create new Pull Request