40 changes: 33 additions & 7 deletions README.md
@@ -29,22 +29,48 @@ gem install ruby-druid

## Usage

A query can be constructed and sent like so:
1. Connect:

```ruby
data_source = Druid::Client.new('zk1:2181,zk2:2181/druid').data_source('service/source')
query = Druid::Query::Builder.new.long_sum(:aggregate1).last(1.day).granularity(:all)
result = data_source.post(query)
client = Druid::Client.new('zk1:2181,zk2:2181/druid', opts)
datasource = client.data_source('druid:broker/datasource_name')
```

The `post` method on the `DataSource` returns the parsed response from the Druid server as an array.
If the broker is behind a load balancer, you can connect to a static host without service discovery:

If you don't want to use ZooKeeper for broker discovery, you can explicitly construct a `DataSource`:

```ruby
data_source = Druid::DataSource.new('service/source', 'http://localhost:8080/druid/v2')
datasource = Druid::DataSource.new('datasource_name', 'http://broker-host:8080/druid/v2/', opts)
```

`opts` is an optional hash of connection options:

| key | description | type | default |
| ------------------- | -------------------------------------------------- | ------ | ------------ |
| :open_timeout | open timeout for druid services (in seconds) | int | 10 |
| :read_timeout | read timeout for druid services (in seconds) | int | nil |
| :discovery_path | druid service discovery path in zookeeper | string | '/discovery' |
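
As a rough illustration of how the timeout options are resolved, the sketch below mirrors the fallback that `DataSource#initialize` uses in this change (`opt[:open_timeout] || 10`, with `:read_timeout` passed through unchanged). The `resolve_timeouts` helper is hypothetical and exists only for this example; it is not part of the gem.

```ruby
# Hypothetical helper mirroring the fallback in DataSource#initialize from this change.
def resolve_timeouts(opt = {})
  {
    open_timeout: opt[:open_timeout] || 10, # fail fast if druid is down
    read_timeout: opt[:read_timeout]        # nil means wait until druid is finished
  }
end

# No options given: the fallback values apply.
defaults = resolve_timeouts

# Explicit values override the fallbacks.
custom = resolve_timeouts({ open_timeout: 5, read_timeout: 120 })
```

The same hash can be passed as `opts` to either `Druid::Client.new` or `Druid::DataSource.new`, since the client forwards it to the data sources it creates.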

3. Create query:

```ruby
query = Druid::Query::Builder.new
```
4. Build query, e.g.:
```ruby
query.granularity(:all)
query.long_sum(:aggregate1)
# ....
```

5. Send request:

```ruby
result = datasource.post(query)
```

The `post` method returns the parsed response from the Druid server as an array. If the response is not empty, it contains one `ResponseRow` object for each row. The timestamp can be retrieved with a method of the same name (i.e. `row.timestamp`), and all row values with hash-like syntax (i.e. `row['dimension']`).
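
A minimal sketch of consuming such a result might look like the following. Because a real `ResponseRow` requires a live broker, the example uses a stand-in class (`FakeResponseRow`, hypothetical) that only imitates the two access styles described above; the sample timestamp and values are made up for illustration.

```ruby
# Stand-in for the gem's ResponseRow, defined here only so the example runs
# without a broker. It imitates row.timestamp and row['key'] access.
class FakeResponseRow
  attr_reader :timestamp

  def initialize(timestamp, values)
    @timestamp = timestamp
    @values = values
  end

  def [](key)
    @values[key]
  end
end

# Pretend this array came back from datasource.post(query).
result = [
  FakeResponseRow.new('2014-01-01T00:00:00.000Z', { 'dimension' => 'a', 'aggregate1' => 42 })
]

# Read the timestamp via the method and the values via hash-like access.
summary = result.map do |row|
  "#{row.timestamp} #{row['dimension']}=#{row['aggregate1']}"
end
puts summary
```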

### GroupBy

A [GroupByQuery](http://druid.io/docs/latest/querying/groupbyquery.html) sets the
3 changes: 2 additions & 1 deletion lib/druid/client.rb
@@ -7,12 +7,13 @@ class Client
attr_reader :zk

def initialize(zookeeper, opts = {})
@opts = opts
@zk = ZK.new(zookeeper, opts)
end

def data_source(source)
uri = @zk.data_sources[source]
Druid::DataSource.new(source, uri)
Druid::DataSource.new(source, uri, @opts)
end

def data_sources
23 changes: 12 additions & 11 deletions lib/druid/data_source.rb
@@ -6,8 +6,10 @@ class DataSource

attr_reader :name, :uri, :metrics, :dimensions

def initialize(name, uri)
def initialize(name, uri, opt = {})
@name = name.split('/').last
@open_timeout = opt[:open_timeout] || 10 # if druid is down fail fast
@read_timeout = opt[:read_timeout] # we wait until druid is finished
uri = uri.sample if uri.is_a?(Array)
if uri.is_a?(String)
@uri = URI(uri)
@@ -32,11 +34,11 @@ def metadata!(opts = {})
end

req = Net::HTTP::Get.new(meta_path)
response = Net::HTTP.new(uri.host, uri.port).start do |http|
http.open_timeout = 10 # if druid is down fail fast
http.read_timeout = nil # we wait until druid is finished
http.request(req)
end

http = Net::HTTP.new(uri.host, uri.port)
http.open_timeout = @open_timeout
http.read_timeout = @read_timeout
response = http.start { |h| h.request(req) }

if response.code != '200'
raise "Request failed: #{response.code}: #{response.body}"
@@ -61,11 +63,10 @@ def post(query)
req = Net::HTTP::Post.new(uri.path, { 'Content-Type' => 'application/json' })
req.body = query.to_json

response = Net::HTTP.new(uri.host, uri.port).start do |http|
http.open_timeout = 10 # if druid is down fail fast
http.read_timeout = nil # we wait until druid is finished
http.request(req)
end
http = Net::HTTP.new(uri.host, uri.port)
http.open_timeout = @open_timeout
http.read_timeout = @read_timeout
response = http.start { |h| h.request(req) }

if response.code != '200'
# ignore GroupBy cache issues and try again without cached results
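
The refactor in `metadata!` and `post` also changes when the timeouts are assigned. `Net::HTTP#start` opens the connection before it yields to the block, so an `open_timeout` set inside the block comes too late to affect that connection attempt. A minimal sketch of the new pattern, using only the standard library, is shown below; the host, port, and timeout values are placeholders, and no connection is opened.

```ruby
require 'net/http'
require 'uri'

# Placeholder broker URL, used only for illustration.
uri = URI('http://broker-host:8080/druid/v2/')

# Net::HTTP.new only builds the object; it does not connect yet.
http = Net::HTTP.new(uri.host, uri.port)

# Assign timeouts before start so they apply to the connection attempt.
http.open_timeout = 10  # fail fast if druid is down
http.read_timeout = nil # wait until druid is finished

configured = [http.open_timeout, http.read_timeout, http.started?]

# A real request would follow, as in the change above:
#   response = http.start { |h| h.request(req) }
```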