diff --git a/README.md b/README.md index 1faf754..81a6f39 100644 --- a/README.md +++ b/README.md @@ -29,31 +29,47 @@ gem install ruby-druid ## Usage +1. Connect: + ```ruby -Druid::Client.new('zk1:2181,zk2:2181/druid').query('service/source') +client = Druid::Client.new('zk1:2181,zk2:2181/druid', opts) +datasource = client.data_source('druid:broker/datasource_name') ``` -returns a query object on which all other methods can be called to create a full and valid Druid query. +if broker is behind of load balancer you can connect to static host without service discovery: -A query object can be sent like this: ```ruby -client = Druid::Client.new('zk1:2181,zk2:2181/druid') -query = Druid::Query.new('service/source') -client.send(query) +datasource = Druid::DataSource.new('datasource_name', 'http://broker-host:8080/druid/v2/', opts) ``` -The `send` method returns the parsed response from the druid server as an array. If the response is not empty it contains one `ResponseRow` object for each row. The timestamp by can be received by a method with the same name (i.e. `row.timestamp`), all row values by hashlike syntax (i.e. `row['dimension']) +`opts` is an optional hash of connection options: + +| key | description | type | default | +| ------------------- | -------------------------------------------------- | ------ | ------------ | +| :open_timeout | open timeout for druid services (in seconds) | int | 60 | +| :read_timeout | read timeout for druid services (in seconds) | int | nil | +| :discovery_path | druid service discovery path in zookeeper | string | '/discovery' | + +3. Create query: + +```ruby +query = Druid::Query::Builder.new +``` +4. Build query, e.g.: +```ruby +query.granularity(:all) +query.long_sum(:aggregate1) +# .... +``` -An options hash can be passed when creating `Druid::Client` instance: +5. Send request: ```ruby -client = Druid::Client.new('zk1:2181,zk2:2181/druid', http_timeout: 20) +result = datasource.post(query) ``` -Supported options are: -* `static_setup` to explicitly specify a broker url, e.g. `static_setup: { 'my/source_name' => 'http://1.2.3.4:8080/druid/v2/' }` -* `http_timeout` to define a timeout for sending http queries to a broker (in minutes, default value is 2) +The `post` method returns the parsed response from the druid server as an array. If the response is not empty it contains one `ResponseRow` object for each row. The timestamp by can be received by a method with the same name (i.e. `row.timestamp`), all row values by hashlike syntax (i.e. `row['dimension']) ### GroupBy diff --git a/lib/druid/client.rb b/lib/druid/client.rb index 1462cff..c050c6d 100644 --- a/lib/druid/client.rb +++ b/lib/druid/client.rb @@ -7,12 +7,13 @@ class Client attr_reader :zk def initialize(zookeeper, opts = {}) + @opts = opts @zk = ZK.new(zookeeper, opts) end def data_source(source) uri = @zk.data_sources[source] - Druid::DataSource.new(source, uri) + Druid::DataSource.new(source, uri, @opts) end def data_sources diff --git a/lib/druid/data_source.rb b/lib/druid/data_source.rb index d821207..702dda5 100644 --- a/lib/druid/data_source.rb +++ b/lib/druid/data_source.rb @@ -6,8 +6,10 @@ class DataSource attr_reader :name, :uri, :metrics, :dimensions - def initialize(name, uri) + def initialize(name, uri, opt = {}) @name = name.split('/').last + @open_timeout = opt[:open_timeout] || 10 # if druid is down fail fast + @read_timeout = opt[:read_timeout] # we wait until druid is finished uri = uri.sample if uri.is_a?(Array) if uri.is_a?(String) @uri = URI(uri) @@ -32,11 +34,11 @@ def metadata!(opts = {}) end req = Net::HTTP::Get.new(meta_path) - response = Net::HTTP.new(uri.host, uri.port).start do |http| - http.open_timeout = 10 # if druid is down fail fast - http.read_timeout = nil # we wait until druid is finished - http.request(req) - end + + http = Net::HTTP.new(uri.host, uri.port) + http.open_timeout = @open_timeout + http.read_timeout = @read_timeout + response = http.start { |h| h.request(req) } if response.code != '200' raise "Request failed: #{response.code}: #{response.body}" @@ -61,11 +63,10 @@ def post(query) req = Net::HTTP::Post.new(uri.path, { 'Content-Type' => 'application/json' }) req.body = query.to_json - response = Net::HTTP.new(uri.host, uri.port).start do |http| - http.open_timeout = 10 # if druid is down fail fast - http.read_timeout = nil # we wait until druid is finished - http.request(req) - end + http = Net::HTTP.new(uri.host, uri.port) + http.open_timeout = @open_timeout + http.read_timeout = @read_timeout + response = http.start { |h| h.request(req) } if response.code != '200' # ignore GroupBy cache issues and try again without cached results