diff --git a/analytics.go b/analytics.go
index fa13434..5b3f266 100644
--- a/analytics.go
+++ b/analytics.go
@@ -21,6 +21,7 @@ const Version = "3.0.0"
 // provided by the package and provide a way to send messages via the HTTP API.
 type Client interface {
 	io.Closer
+	http.Flusher
 
 	// Queues a message to be sent by the client when the conditions for a batch
 	// upload are met.
@@ -37,6 +38,7 @@ type Client interface {
 	// happens if the client was already closed at the time the method was
 	// called or if the message was malformed.
 	Enqueue(Message) error
+	EnqueueSync(Message) error
 }
 
 type client struct {
@@ -59,6 +61,12 @@ type client struct {
 	// This HTTP client is used to send requests to the backend, it uses the
 	// HTTP transport provided in the configuration.
 	http http.Client
+
+	// Used by the Flush method to send requests to the backend synchronously.
+	mq *messageQueue
+
+	// We can only run flush once at a time, so we use this mutex to synchronize.
+	mu *sync.Mutex
 }
 
 // Instantiate a new client that uses the write key passed as first argument to
@@ -89,6 +97,13 @@ func NewWithConfig(writeKey string, config Config) (cli Client, err error) {
 		http: makeHttpClient(config.Transport),
 	}
 
+	mq := messageQueue{
+		maxBatchSize:  c.BatchSize,
+		maxBatchBytes: c.maxBatchBytes(),
+	}
+	c.mq = &mq
+	c.mu = &sync.Mutex{}
+
 	go c.loop()
 
 	cli = c
@@ -143,6 +158,12 @@ func dereferenceMessage(msg Message) Message {
 }
 
 func (c *client) Enqueue(msg Message) (err error) {
+	return c.enqueue(msg, true)
+}
+func (c *client) EnqueueSync(msg Message) (err error) {
+	return c.enqueue(msg, false)
+}
+func (c *client) enqueue(msg Message, async bool) (err error) {
 	msg = dereferenceMessage(msg)
 	if err = msg.Validate(); err != nil {
 		return
@@ -203,10 +224,24 @@ func (c *client) Enqueue(msg Message) (err error) {
 		}
 	}()
 
-	c.msgs <- msg
+	if async {
+		c.msgs <- msg
+	} else {
+		c.push(c.mq, msg, nil, nil)
+	}
 	return
 }
 
+// Flush flushes queued messages synchronously without closing the client.
+func (c *client) Flush() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if msgs := c.mq.flush(); msgs != nil {
+		c.debugf("flushing %d messages synchronously", len(msgs))
+		c.send(msgs)
+	}
+}
+
 // Close and flush metrics.
 func (c *client) Close() (err error) {
 	defer func() {
@@ -336,18 +371,13 @@ func (c *client) loop() {
 	ex := newExecutor(c.maxConcurrentRequests)
 	defer ex.close()
 
-	mq := messageQueue{
-		maxBatchSize:  c.BatchSize,
-		maxBatchBytes: c.maxBatchBytes(),
-	}
-
 	for {
 		select {
 		case msg := <-c.msgs:
-			c.push(&mq, msg, wg, ex)
+			c.push(c.mq, msg, wg, ex)
 
 		case <-tick.C:
-			c.flush(&mq, wg, ex)
+			c.flush(c.mq, wg, ex)
 
 		case <-c.quit:
 			c.debugf("exit requested – draining messages")
@@ -356,10 +386,10 @@ func (c *client) loop() {
 			// messages can be pushed and otherwise the loop would never end.
 			close(c.msgs)
 			for msg := range c.msgs {
-				c.push(&mq, msg, wg, ex)
+				c.push(c.mq, msg, wg, ex)
 			}
 
-			c.flush(&mq, wg, ex)
+			c.flush(c.mq, wg, ex)
 			c.debugf("exit")
 			return
 		}
@@ -380,11 +410,15 @@ func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *executor) {
 	if msgs := q.push(msg); msgs != nil {
 		c.debugf("exceeded messages batch limit with batch of %d messages – flushing", len(msgs))
-		c.sendAsync(msgs, wg, ex)
+		if wg != nil && ex != nil {
+			c.sendAsync(msgs, wg, ex)
+		}
 	}
 }
 
 func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	if msgs := q.flush(); msgs != nil {
 		c.debugf("flushing %d messages", len(msgs))
 		c.sendAsync(msgs, wg, ex)