From f11a4ab0947f30c28fb9957946db907afc8500be Mon Sep 17 00:00:00 2001 From: Ryan Murray Date: Thu, 15 Jun 2023 13:44:31 +0200 Subject: [PATCH 1/3] Add simple Flush method --- analytics.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/analytics.go b/analytics.go index fa13434..fea0bff 100644 --- a/analytics.go +++ b/analytics.go @@ -21,6 +21,7 @@ const Version = "3.0.0" // provided by the package and provide a way to send messages via the HTTP API. type Client interface { io.Closer + http.Flusher // Queues a message to be sent by the client when the conditions for a batch // upload are met. @@ -59,6 +60,12 @@ type client struct { // This HTTP client is used to send requests to the backend, it uses the // HTTP transport provided in the configuration. http http.Client + + // Used by the Flush method to send requests to the backend synchronously. + mq *messageQueue + + // We can only run flush once at a time, so we use this mutex to synchronize. + mu *sync.Mutex } // Instantiate a new client that uses the write key passed as first argument to @@ -207,6 +214,16 @@ func (c *client) Enqueue(msg Message) (err error) { return } +// Flush flush metrics synchronously without closing the client. +func (c *client) Flush() { + c.mu.Lock() + defer c.mu.Unlock() + if msgs := c.mq.flush(); msgs != nil { + c.debugf("flushing %d messages synchronously", len(msgs)) + c.send(msgs) + } +} + // Close and flush metrics. func (c *client) Close() (err error) { defer func() { @@ -340,6 +357,8 @@ func (c *client) loop() { maxBatchSize: c.BatchSize, maxBatchBytes: c.maxBatchBytes(), } + c.mq = &mq + c.mu = &sync.Mutex{} for { select { @@ -385,6 +404,8 @@ func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *execut } func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) { + c.mu.Lock() + defer c.mu.Unlock() if msgs := q.flush(); msgs != nil { c.debugf("flushing %d messages", len(msgs)) c.sendAsync(msgs, wg, ex) From b7f33e7f1490c96fb2204bb84404c71217fb2c7f Mon Sep 17 00:00:00 2001 From: Ryan Murray Date: Mon, 28 Aug 2023 12:57:45 +0200 Subject: [PATCH 2/3] sync push w/o the channel --- analytics.go | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/analytics.go b/analytics.go index fea0bff..82b5376 100644 --- a/analytics.go +++ b/analytics.go @@ -21,7 +21,7 @@ const Version = "3.0.0" // provided by the package and provide a way to send messages via the HTTP API. type Client interface { io.Closer - http.Flusher + http.Flusher // Queues a message to be sent by the client when the conditions for a batch // upload are met. @@ -38,6 +38,7 @@ type Client interface { // happens if the client was already closed at the time the method was // called or if the message was malformed. Enqueue(Message) error + EnqueueSync(Message) error } type client struct { @@ -61,11 +62,11 @@ type client struct { // HTTP transport provided in the configuration. http http.Client - // Used by the Flush method to send requests to the backend synchronously. - mq *messageQueue + // Used by the Flush method to send requests to the backend synchronously. + mq *messageQueue - // We can only run flush once at a time, so we use this mutex to synchronize. - mu *sync.Mutex + // We can only run flush once at a time, so we use this mutex to synchronize. + mu *sync.Mutex } // Instantiate a new client that uses the write key passed as first argument to @@ -150,6 +151,12 @@ func dereferenceMessage(msg Message) Message { } func (c *client) Enqueue(msg Message) (err error) { + return c.enqueue(msg, true) +} +func (c *client) EnqueueSync(msg Message) (err error) { + return c.enqueue(msg, false) +} +func (c *client) enqueue(msg Message, async bool) (err error) { msg = dereferenceMessage(msg) if err = msg.Validate(); err != nil { return @@ -210,14 +217,18 @@ func (c *client) Enqueue(msg Message) (err error) { } }() - c.msgs <- msg + if async { + c.msgs <- msg + } else { + c.push(c.mq, msg, nil, nil) + } return } // Flush flush metrics synchronously without closing the client. func (c *client) Flush() { - c.mu.Lock() - defer c.mu.Unlock() + c.mu.Lock() + defer c.mu.Unlock() if msgs := c.mq.flush(); msgs != nil { c.debugf("flushing %d messages synchronously", len(msgs)) c.send(msgs) @@ -357,8 +368,8 @@ func (c *client) loop() { maxBatchSize: c.BatchSize, maxBatchBytes: c.maxBatchBytes(), } - c.mq = &mq - c.mu = &sync.Mutex{} + c.mq = &mq + c.mu = &sync.Mutex{} for { select { @@ -399,13 +410,15 @@ func (c *client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *execut if msgs := q.push(msg); msgs != nil { c.debugf("exceeded messages batch limit with batch of %d messages – flushing", len(msgs)) - c.sendAsync(msgs, wg, ex) + if wg != nil && ex != nil { + c.sendAsync(msgs, wg, ex) + } } } func (c *client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) { - c.mu.Lock() - defer c.mu.Unlock() + c.mu.Lock() + defer c.mu.Unlock() if msgs := q.flush(); msgs != nil { c.debugf("flushing %d messages", len(msgs)) c.sendAsync(msgs, wg, ex) From aceef05d16a581363ec01c92f11be36178c98d8e Mon Sep 17 00:00:00 2001 From: Ryan Murray Date: Fri, 22 Sep 2023 14:50:12 +0200 Subject: [PATCH 3/3] reduce possibilities of race --- analytics.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/analytics.go b/analytics.go index 82b5376..5b3f266 100644 --- a/analytics.go +++ b/analytics.go @@ -97,6 +97,13 @@ func NewWithConfig(writeKey string, config Config) (cli Client, err error) { http: makeHttpClient(config.Transport), } + mq := messageQueue{ + maxBatchSize: c.BatchSize, + maxBatchBytes: c.maxBatchBytes(), + } + c.mq = &mq + c.mu = &sync.Mutex{} + go c.loop() cli = c @@ -364,20 +371,13 @@ func (c *client) loop() { ex := newExecutor(c.maxConcurrentRequests) defer ex.close() - mq := messageQueue{ - maxBatchSize: c.BatchSize, - maxBatchBytes: c.maxBatchBytes(), - } - c.mq = &mq - c.mu = &sync.Mutex{} - for { select { case msg := <-c.msgs: - c.push(&mq, msg, wg, ex) + c.push(c.mq, msg, wg, ex) case <-tick.C: - c.flush(&mq, wg, ex) + c.flush(c.mq, wg, ex) case <-c.quit: c.debugf("exit requested – draining messages") @@ -386,10 +386,10 @@ func (c *client) loop() { // messages can be pushed and otherwise the loop would never end. close(c.msgs) for msg := range c.msgs { - c.push(&mq, msg, wg, ex) + c.push(c.mq, msg, wg, ex) } - c.flush(&mq, wg, ex) + c.flush(c.mq, wg, ex) c.debugf("exit") return }