40 changes: 28 additions & 12 deletions README.md
@@ -29,31 +29,47 @@ gem install ruby-druid

## Usage

1. Connect:

```ruby
Druid::Client.new('zk1:2181,zk2:2181/druid').query('service/source')
client = Druid::Client.new('zk1:2181,zk2:2181/druid', opts)
datasource = client.data_source('druid:broker/datasource_name')
```

returns a query object on which all other methods can be called to create a full and valid Druid query.
If the broker is behind a load balancer, you can connect to a static host without service discovery:

A query object can be sent like this:

```ruby
client = Druid::Client.new('zk1:2181,zk2:2181/druid')
query = Druid::Query.new('service/source')
client.send(query)
datasource = Druid::DataSource.new('datasource_name', 'http://broker-host:8080/druid/v2/', opts)
```

The `send` method returns the parsed response from the Druid server as an array. If the response is not empty, it contains one `ResponseRow` object for each row. The timestamp can be read via a method of the same name (i.e. `row.timestamp`), and all row values via hash-like syntax (i.e. `row['dimension']`).
`opts` is an optional hash of connection options:

| key | description | type | default |
| ------------------- | -------------------------------------------------- | ------ | ------------ |
| :open_timeout       | open timeout for druid services (in seconds)       | int    | 10           |
| :read_timeout | read timeout for druid services (in seconds) | int | nil |
| :discovery_path | druid service discovery path in zookeeper | string | '/discovery' |
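
As a rough illustration of how the two timeout options resolve, the sketch below mirrors the fallback used in `lib/druid/data_source.rb` in this change (`opt[:open_timeout] || 10`, and `opt[:read_timeout]` left as-is). The helper name `resolve_timeouts` is hypothetical and not part of the gem.

```ruby
# Hypothetical helper, not part of ruby-druid. It mirrors how
# DataSource#initialize in this change falls back when an option is missing.
def resolve_timeouts(opts = {})
  {
    open_timeout: opts[:open_timeout] || 10, # seconds; fail fast if Druid is down
    read_timeout: opts[:read_timeout]        # nil means wait until Druid is finished
  }
end

defaults = resolve_timeouts
custom   = resolve_timeouts(open_timeout: 5, read_timeout: 120)

puts defaults[:open_timeout]
puts custom[:read_timeout]
```

Note that a missing `:read_timeout` stays `nil`, which `Net::HTTP` treats as "no read timeout".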

2. Create query:

```ruby
query = Druid::Query::Builder.new
```
3. Build query, e.g.:
```ruby
query.granularity(:all)
query.long_sum(:aggregate1)
# ....
```

An options hash can be passed when creating `Druid::Client` instance:
4. Send request:

```ruby
client = Druid::Client.new('zk1:2181,zk2:2181/druid', http_timeout: 20)
result = datasource.post(query)
```

Supported options are:
* `static_setup` to explicitly specify a broker url, e.g. `static_setup: { 'my/source_name' => 'http://1.2.3.4:8080/druid/v2/' }`
* `http_timeout` to define a timeout for sending http queries to a broker (in minutes, default value is 2)
The `post` method returns the parsed response from the Druid server as an array. If the response is not empty, it contains one `ResponseRow` object for each row. The timestamp can be read via a method of the same name (i.e. `row.timestamp`), and all row values via hash-like syntax (i.e. `row['dimension']`).

### GroupBy

3 changes: 2 additions & 1 deletion lib/druid/client.rb
@@ -7,12 +7,13 @@ class Client
attr_reader :zk

def initialize(zookeeper, opts = {})
@opts = opts
@zk = ZK.new(zookeeper, opts)
end

def data_source(source)
uri = @zk.data_sources[source]
Druid::DataSource.new(source, uri)
Druid::DataSource.new(source, uri, @opts)
end

def data_sources
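The change to `Client` stores the options hash and forwards it to every `DataSource` it creates. A minimal sketch of that propagation is shown here with stand-in classes: `SketchClient`, `SketchDataSource`, the `events` data source name, and the in-memory lookup hash are all assumptions made for illustration, with the hash standing in for the ZooKeeper-backed `@zk.data_sources`.

```ruby
# Stand-in classes for illustration only; these are not the gem's classes.
class SketchDataSource
  attr_reader :name, :uri, :open_timeout, :read_timeout

  def initialize(name, uri, opt = {})
    @name = name.split('/').last
    @uri = uri
    @open_timeout = opt[:open_timeout] || 10
    @read_timeout = opt[:read_timeout]
  end
end

class SketchClient
  def initialize(sources, opts = {})
    @sources = sources # plain hash standing in for the ZooKeeper lookup
    @opts = opts       # kept so it can be handed to each data source
  end

  def data_source(source)
    SketchDataSource.new(source, @sources[source], @opts)
  end
end

sources = { 'druid:broker/events' => 'http://broker-host:8080/druid/v2/' }
client = SketchClient.new(sources, read_timeout: 30)
ds = client.data_source('druid:broker/events')

puts ds.name
puts ds.read_timeout
```
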
23 changes: 12 additions & 11 deletions lib/druid/data_source.rb
@@ -6,8 +6,10 @@ class DataSource

attr_reader :name, :uri, :metrics, :dimensions

def initialize(name, uri)
def initialize(name, uri, opt = {})
@name = name.split('/').last
@open_timeout = opt[:open_timeout] || 10 # if druid is down fail fast
@read_timeout = opt[:read_timeout] # we wait until druid is finished
uri = uri.sample if uri.is_a?(Array)
if uri.is_a?(String)
@uri = URI(uri)
@@ -32,11 +34,11 @@ def metadata!(opts = {})
end

req = Net::HTTP::Get.new(meta_path)
response = Net::HTTP.new(uri.host, uri.port).start do |http|
http.open_timeout = 10 # if druid is down fail fast
http.read_timeout = nil # we wait until druid is finished
http.request(req)
end

http = Net::HTTP.new(uri.host, uri.port)
http.open_timeout = @open_timeout
http.read_timeout = @read_timeout
response = http.start { |h| h.request(req) }

if response.code != '200'
raise "Request failed: #{response.code}: #{response.body}"
@@ -61,11 +63,10 @@ def post(query)
req = Net::HTTP::Post.new(uri.path, { 'Content-Type' => 'application/json' })
req.body = query.to_json

response = Net::HTTP.new(uri.host, uri.port).start do |http|
http.open_timeout = 10 # if druid is down fail fast
http.read_timeout = nil # we wait until druid is finished
http.request(req)
end
http = Net::HTTP.new(uri.host, uri.port)
http.open_timeout = @open_timeout
http.read_timeout = @read_timeout
response = http.start { |h| h.request(req) }

if response.code != '200'
# ignore GroupBy cache issues and try again without cached results
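
One detail worth noting about the `Net::HTTP` changes in this file: `Net::HTTP#start` opens the TCP connection before it yields to its block, so an `open_timeout` assigned inside the block comes too late to affect that connection attempt. Setting the timeouts on the object before calling `start`, as the new code does, avoids this. A minimal sketch follows; the helper name `build_http` and the broker URL are placeholders, not part of the gem.

```ruby
require 'net/http'
require 'uri'

# Hypothetical helper for illustration: configure timeouts on the
# Net::HTTP object before any connection is opened.
def build_http(uri, open_timeout: 10, read_timeout: nil)
  http = Net::HTTP.new(uri.host, uri.port) # does not connect yet
  http.open_timeout = open_timeout         # applies to the upcoming connect
  http.read_timeout = read_timeout         # nil disables the read timeout
  http
end

uri = URI('http://broker-host:8080/druid/v2/') # placeholder broker URL
http = build_http(uri, open_timeout: 5, read_timeout: 120)

# The connection is only opened once http.start or http.request is called.
puts http.started?
```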