Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions dev/resources/dsl/stream.clj
Original file line number Diff line number Diff line change
@@ -1,31 +1,4 @@
;; (streams
;; ;; (stream {:name :traces :default true}
;; ;; (where [:= :kind "client"]
;; ;; (rate {:duration 10}
;; ;; (with {:name "RATE"}
;; ;; (info)))))
;; ;; (stream {:name :test :default false}
;; ;; (where [:= :kind "client"]
;; ;; (rate {:duration 10}
;; ;; (with {:name "RATE"}
;; ;; (tap :result)
;; ;; (info)))))

;; (stream {:name :percentiles })
;; )

(streams
(stream {:name :percentiles :default true}
(where [:= :name "http_request_duration_seconds"]

(by {:fields [[:attributes :application]
[:attributes :environment]]}
(percentiles {:percentiles [0.5 0.75 0.99]
:duration 20
:nb-significant-digits 3}
(info)
(where [:and [:= :quantile "0.99"]
[:> :metric 1]]
(with {:state "error"}
(error)
(tap :alert))))))))
(stream {:name :bar :default true}
(where [:= :service "bar"]
(publish! :my-channel))))
47 changes: 7 additions & 40 deletions dev/resources/streams/stream.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{:percentiles
{:bar
{:default true,
:actions
{:action :sdo,
Expand All @@ -7,44 +7,11 @@
({:action :where,
:description
{:message "Filter events based on the provided condition",
:params "[:= :name \"http_request_duration_seconds\"]"},
:params [[:= :name "http_request_duration_seconds"]],
:params "[:= :service \"bar\"]"},
:params [[:= :service "bar"]],
:children
({:action :by,
({:action :publish!,
:description
{:message
"Split streams by field(s) [[:attributes :application] [:attributes :environment]]"},
:params
[{:fields
[[:attributes :application] [:attributes :environment]]}],
:children
({:action :percentiles,
:description
{:message "Computes the quantiles [0.5 0.75 0.99]"},
:params
[{:percentiles [0.5 0.75 0.99],
:duration 20000000000,
:nb-significant-digits 3}],
:children
({:action :info,
:description
{:message "Print the event in the logs as info"}}
{:action :where,
:description
{:message "Filter events based on the provided condition",
:params "[:and [:= :quantile \"0.99\"] [:> :metric 1]]"},
:params [[:and [:= :quantile "0.99"] [:> :metric 1]]],
:children
({:action :with,
:description
{:message "Merge the events with the provided fields",
:params "{:state \"error\"}"},
:children
({:action :error,
:description
{:message "Print the event in the logs as error"}}
{:action :tap,
:description
{:message "Save events into the tap :alert"},
:params [:alert]}),
:params [{:state "error"}]})})})})})}}}
{:message "Publish events into the channel :my-channel"},
:params [:my-channel],
:children []})})}}}
40 changes: 14 additions & 26 deletions dev/resources/tests/test1.edn
Original file line number Diff line number Diff line change
@@ -1,32 +1,20 @@
{:percentiles {:input [{:name "http_request_duration_seconds"
:metric 1.1
:time 1e9
:attributes {:application "app1"
:environment "production"}}
:metric 0.1
:time 1e9}
{:name "http_request_duration_seconds"
:metric 4.2
:time 10e9
:attributes {:application "app1"
:environment "production"}}
:metric 1.2
:time 30e9}
{:name "http_request_duration_seconds"
:metric 2
:time 14e9
:attributes {:application "app1"
:environment "production"}}
:metric 10
:time 40e9}
{:name "http_request_duration_seconds"
:metric 5
:time 18e9
:attributes {:application "app1"
:environment "production"}}
:metric 8
:time 50e9}
{:name "http_request_duration_seconds"
:metric 0.2
:time 22e9
:attributes {:application "app1"
:environment "production"}}]
:metric 3
:time 70e9}]
:taps {:alert [{:name "http_request_duration_seconds"
:metric 5
:time 22e9
:quantile "0.99"
:attributes {:application "app1"
:environment "production"}
:state "error"}]}}}
:metric 10
:time 70e9
:state "critical"
:attributes {:quantile "0.99"}}]}}}
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject fr.mcorbin/mirabelle "0.13.0"
(defproject fr.mcorbin/mirabelle "0.14.0"
:description "A stream processing engine inspired by Riemann"
:url "https://github.com/mcorbin/mirabelle"
:license {:name "EPL-1.0"
Expand Down
66 changes: 35 additions & 31 deletions site/mirabelle/content/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ Mirabelle isinspired by [Riemann](https://riemann.io/). I would like to thank al

Mirabelle supports the same protocol than Riemann. It means all Riemann tooling and integrations should work seamlessly with Mirabelle (which also contains a lot of new features).

Mirabelle also provides an HTTP API and natively supports receiving metrics in [Prometheus remote write](https://prometheus.io/docs/operating/integrations/) format. See the [API documentation](/api/#prometheus-remote-write) for more information about Prometheus integration.

It also supports [Opentelemetry traces](/api/#opentelemetry-traces-input) as input.
Mirabelle also provides an HTTP API and natively supports receiving metrics in [Prometheus remote write](https://prometheus.io/docs/operating/integrations/). It also supports [Opentelemetry traces](/api/#opentelemetry-traces-input) as input.

## Streams clocks, real time, continuous queries

Expand All @@ -40,7 +38,7 @@ Let's say a web application is pushing the duration (in seconds) of the HTTP req

```clojure
{:name "http_request_duration_seconds"
:time 1619731016,145
:time 1619731016145000000
:tags ["web"]
:metric 0.5
:attributes {:application "my-api"
Expand All @@ -52,38 +50,45 @@ You could write a Mirabelle stream which will compute on the fly the quantiles f
```clojure
(streams
(stream {:name :percentiles :default true}
(where [:= :service "http_request_duration_seconds"]
(fixed-time-window {:duration 60}
(coll-percentiles [0.5 0.75 0.99]
(where [:and [:= :quantile "0.99"]
[:> :metric 1]]
(with :state "critical"
(tap :alert)
(output! :pagerduty))))))))
(where [:= :name "http_request_duration_seconds"]
(percentiles {:duration 60
:percentiles [0.5 0.75 0.99 1]}
(where [:and [:= [:attributes :quantile] "0.99"]
[:> :metric 1]]
(with :state "critical"
(tap :alert)
(output! :pagerduty)))))))
```

The `tap` action is an action which will be only enabled in test mode, and which will save in a tap named `:alert` events passing by it. Indeed, everything can be unit tested easily in Mirabelle.

A test for this stream would be:
A test for this stream would be (remember that the time is in nanoseconds):

```clojure
{:percentiles {:input [{:service "http_request_duration_seconds"
{:percentiles {:input [{:name "http_request_duration_seconds"
:metric 0.1
:time 1}
{:service "http_request_duration_seconds"
:time 1e9}
{:name "http_request_duration_seconds"
:metric 1.2
:time 30}
{:service "http_request_duration_seconds"
:metric 0.2
:time 70}]
:tap-results {:alert [{:service "http_request_duration_seconds"
:metric 1.2
:time 30
:quantile "0.99"
:state "critical"}]}}}
:time 30e9}
{:name "http_request_duration_seconds"
:metric 10
:time 40e9}
{:name "http_request_duration_seconds"
:metric 8
:time 50e9}
{:name "http_request_duration_seconds"
:metric 3
:time 70e9}]
:taps {:alert [{:name "http_request_duration_seconds"
:metric 10
:time 70e9
:state "critical"
:attributes {:quantile "0.99"}}]}}}

```

In this test, we inject into the `:percentiles` stream three events, and we verify that the tap named `:alert` contains the expected alert (the `0.99` quantile is greater than 1) generated for these events.
In this test, we inject into the `:percentiles` stream some events, and we verify that the tap named `:alert` contains the expected alert (the `0.99` quantile is greater than 1) generated for these events.

Thanks to Clojure datastructures, there is *no side effects between streams and actions*. It's OK to modify events in parallel (in multiple threads) and to have multiple branches per stream. You can even pass events between streams (like described [here](todo)). You are free to organize streams and how they communicate between each other exactly how you want to; the tool and its DSL will not limit you.

Expand All @@ -92,12 +97,10 @@ Here is a more complete and commented example, with multiple actions performed i
```clojure
(streams
(stream {:name :multiple-branches}
(where [:= :service "http_request_duration_seconds"]
(where [:= :name "http_request_duration_seconds"]
(with :ttl 60
;; push everything into influxdb
(output! :influxdb)
;; index events in memory by host and service
(index [:host :service])
;; push everything into Prometheus
(output! :prometheus)
;; by will generate a branch for each :host value. Like that, downstream
;; computations will be per host and will not conflict between each other
(by {:fields [:host]}
Expand All @@ -111,3 +114,4 @@ Here is a more complete and commented example, with multiple actions performed i
(output! :pagerduty)))))))))
```

Raw events or computation results could also be forwarded to external systems (TSDB for example) for long-term storage.
4 changes: 2 additions & 2 deletions site/mirabelle/content/api/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ curl -X DELETE localhost:5558/api/v1/stream/trololo

- **PUT** `/api/v1/stream/<stream-name>`

Push an event to the specified stream.
Push an event (or a batch of events) to the specified stream.

---

```
curl -H "Content-Type: application/json" -X PUT --data '{"event": {"service": "foo", "metric": 10}}' 127.0.0.1:5558/api/v1/stream/foo
curl -H "Content-Type: application/json" -X PUT --data '{"events": [{"service": "foo", "metric": 10}]}' 127.0.0.1:5558/api/v1/stream/foo

{"message":"ok"}
```
Expand Down
Loading