Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions client/core/bookie.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,22 @@ func (f *bookFeed) Candles(durStr string) error {
}

// candleCache adds synchronization and an on/off switch to *candles.Cache.
// NOTE: Occasionally we are missing candle updates (epoch_report notification)
// when downloading/applying server snapshot. For candle cache such atomicity property
// isn't supported at the moment, that's both neither on client nor on server side
// (so it's 2 different places to adjust, if atomicity is desirable). It isn't as
// noticeable for candles as it is for orders (where lack of similar atomicity property
// would result in ghost orders), still without this candle chart doesn't reflect what
// actually happened on dex server with 100% accuracy in real time (when cache resets,
// e.g. on dexc restart, this discrepancy is resolved for older candles, it affects
// only recent ones).
type candleCache struct {
*candles.Cache
// candleMtx protects the integrity of candles.Cache (e.g. we can't update
// it while making copy at the same time), so it represents a consistent
// data snapshot.
candleMtx sync.RWMutex
on uint32
on uint32 // whether cache has been initialized
}

// init resets the candles with the supplied set.
Expand All @@ -81,6 +90,14 @@ func (c *candleCache) init(in []*msgjson.Candle) {
}
}

// snapshot takes snapshot of candle cache (preventing concurrent cache updates).
func (c *candleCache) snapshot() []msgjson.Candle {
c.candleMtx.RLock()
defer c.candleMtx.RUnlock()

return c.CandlesCopy()
}

// addCandle adds the candle using candles.Cache.Add. It returns most recent candle
// in cache.
func (c *candleCache) addCandle(msgCandle *msgjson.Candle) (recent msgjson.Candle, ok bool) {
Expand All @@ -100,7 +117,8 @@ func (c *candleCache) addCandle(msgCandle *msgjson.Candle) (recent msgjson.Candl
// supplied close() callback.
type bookie struct {
*orderbook.OrderBook
dc *dexConnection
dc *dexConnection
// candleCaches is indexing candle caches by durations [5m,1h,24h].
candleCaches map[string]*candleCache
log dex.Logger

Expand Down Expand Up @@ -178,7 +196,7 @@ func (b *bookie) logEpochReport(note *msgjson.EpochReportNote) error {
if err != nil {
return err
}
if note.Candle.EndStamp == 0 {
if note.Candle.EndStamp == 0 { // should never happen
return fmt.Errorf("epoch report has zero-valued candle end stamp")
}

Expand Down Expand Up @@ -272,20 +290,21 @@ func (b *bookie) candles(durStr string, feedID uint32) error {
return
}
dur, _ := time.ParseDuration(durStr)
cache.candleMtx.RLock()
cdls := cache.CandlesCopy()
cache.candleMtx.RUnlock()
f.c <- &BookUpdate{
Action: FreshCandlesAction,
Host: b.dc.acct.host,
MarketID: marketName(b.base, b.quote),
Payload: &CandlesPayload{
Dur: durStr,
DurMilliSecs: uint64(dur.Milliseconds()),
Candles: cdls,
Candles: cache.snapshot(),
Comment on lines -285 to +300
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To just improve readability slightly.

},
}
}()
// Two or more concurrent calls might fall through here, resulting in cache
// being initialized with cache.init multiple times. It's OK though because
// candleCache is not 100% consistent with server's anyway, see comments on
// candleCache struct for details, and probably doesn't need to be.
if atomic.LoadUint32(&cache.on) == 1 {
return nil
}
Expand Down