Skip to content

Commit ebd442a

Browse files
authored
Merge pull request #41 from CrocSwap/better-refresher-and-knockouts
Improve liquidity refresher and knockout tracking
2 parents 06c9208 + 455f4c2 commit ebd442a

23 files changed

Lines changed: 622 additions & 375 deletions

cache/memCache.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type MemoryCache struct {
2020
poolKnockouts RWLockMapMap[types.PoolLocation, types.PositionLocation, *model.KnockoutSubplot]
2121
userAndPoolKnockouts RWLockMapMap[chainUserAndPool, types.PositionLocation, *model.KnockoutSubplot]
2222

23-
knockoutSagas RWLockMap[types.BookLocation, *model.KnockoutSaga]
23+
knockoutSagas RWLockMap[types.BookLocation, *model.KnockoutSaga]
24+
knockoutPivotTimes RWLockMap[types.BookLocation, int]
2425

2526
userTxs RWLockMapArray[chainAndAddr, types.PoolTxEvent]
2627
poolTxs RWLockMapArray[types.PoolLocation, types.PoolTxEvent]
@@ -47,7 +48,8 @@ func New() *MemoryCache {
4748
poolKnockouts: newRwLockMapMap[types.PoolLocation, types.PositionLocation, *model.KnockoutSubplot](),
4849
userAndPoolKnockouts: newRwLockMapMap[chainUserAndPool, types.PositionLocation, *model.KnockoutSubplot](),
4950

50-
knockoutSagas: newRwLockMap[types.BookLocation, *model.KnockoutSaga](),
51+
knockoutSagas: newRwLockMap[types.BookLocation, *model.KnockoutSaga](),
52+
knockoutPivotTimes: newRwLockMap[types.BookLocation, int](),
5153

5254
userTxs: newRwLockMapArray[chainAndAddr, types.PoolTxEvent](),
5355
poolTxs: newRwLockMapArray[types.PoolLocation, types.PoolTxEvent](),

cache/transactors.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,7 @@ func (m *MemoryCache) MaterializePosition(loc types.PositionLocation) *model.Pos
293293
return val
294294
}
295295

296-
func (m *MemoryCache) MaterializeKnockoutBook(loc types.BookLocation) *model.KnockoutSaga {
296+
func (m *MemoryCache) MaterializeKnockoutSaga(loc types.BookLocation) *model.KnockoutSaga {
297297
val, ok := m.knockoutSagas.lookup(loc)
298298
if !ok {
299299
val = model.NewKnockoutSaga()
@@ -305,7 +305,7 @@ func (m *MemoryCache) MaterializeKnockoutBook(loc types.BookLocation) *model.Kno
305305
func (m *MemoryCache) MaterializeKnockoutPos(loc types.PositionLocation) *model.KnockoutSubplot {
306306
val, ok := m.liqKnockouts.lookup(loc)
307307
if !ok {
308-
saga := m.MaterializeKnockoutBook(loc.ToBookLoc())
308+
saga := m.MaterializeKnockoutSaga(loc.ToBookLoc())
309309
val = saga.ForUser(loc.User)
310310
m.liqKnockouts.insert(loc, val)
311311
m.userKnockouts.insert(chainAndAddr{loc.ChainId, loc.User}, loc, val)
@@ -317,3 +317,16 @@ func (m *MemoryCache) MaterializeKnockoutPos(loc types.PositionLocation) *model.
317317
m.poolKoUpdates.insert(loc.PoolLocation, koAndLocPair{loc, val})
318318
return val
319319
}
320+
321+
func (m *MemoryCache) RetrievePivotTime(loc types.BookLocation) int {
322+
pos, okay := m.knockoutPivotTimes.lookup(loc)
323+
if okay {
324+
return pos
325+
} else {
326+
return 0
327+
}
328+
}
329+
330+
func (m *MemoryCache) SetPivotTime(loc types.BookLocation, pivotTime int) {
331+
m.knockoutPivotTimes.insert(loc, pivotTime)
332+
}

controller/controller.go

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package controller
22

33
import (
44
"log"
5+
"math"
6+
"math/big"
57
"time"
68

79
"github.com/CrocSwap/graphcache-go/cache"
@@ -47,7 +49,7 @@ func (c *Controller) SpinUntilLiqSync() {
4749
nowTime := time.Now().Unix()
4850
syncSec := c.refresher.lastRefreshSec
4951
if nowTime < syncSec+REFRESH_PAUSE_SECS {
50-
log.Println("Waiting for liquidity sync pause. Last refresh:", syncSec, "now:", nowTime)
52+
// log.Println("Waiting for liquidity sync pause. Last refresh:", syncSec, "now:", nowTime)
5153
} else {
5254
return
5355
}
@@ -82,6 +84,9 @@ func (c *ControllerOverNetwork) IngestBalance(b tables.Balance) {
8284
}
8385

8486
func (c *ControllerOverNetwork) IngestLiqChange(l tables.LiqChange) {
87+
if l.ChangeType == tables.ChangeTypeCross {
88+
c.IngestKnockout(l)
89+
}
8590
c.applyToPosition(l)
8691
c.applyToLiqCurve(l)
8792
c.ctrl.history.CommitLiqChange(l)
@@ -101,7 +106,7 @@ func (c *ControllerOverNetwork) applyToPosition(l tables.LiqChange) {
101106
User: types.RequireEthAddr(l.User),
102107
}
103108

104-
if l.PositionType == "knockout" {
109+
if l.PositionType == tables.PosTypeKnockout {
105110
c.applyToKnockout(l, loc)
106111
} else {
107112
c.applyToPassiveLiq(l, loc)
@@ -121,11 +126,63 @@ func (c *ControllerOverNetwork) applyToLiqCurve(l tables.LiqChange) {
121126
}
122127

123128
func (c *ControllerOverNetwork) applyToKnockout(l tables.LiqChange, loc types.PositionLocation) {
124-
if l.ChangeType == "cross" {
125-
return // Cross events are handled KnockoutCross table
129+
pivotLoc := loc.ToBookLoc()
130+
if l.IsBid == 1 {
131+
pivotLoc.LiquidityLocation.AskTick = l.BidTick
132+
} else {
133+
pivotLoc.LiquidityLocation.BidTick = l.AskTick
134+
}
135+
if l.ChangeType == tables.ChangeTypeCross {
136+
c.ctrl.cache.SetPivotTime(pivotLoc, 0)
137+
return
138+
}
139+
pivotTime := c.ctrl.cache.RetrievePivotTime(pivotLoc)
140+
if l.ChangeType == tables.ChangeTypeMint && pivotTime == 0 {
141+
c.ctrl.cache.SetPivotTime(pivotLoc, l.Time)
142+
pivotTime = l.Time
143+
}
144+
145+
event := model.KnockoutSagaTx{
146+
TxTime: l.Time,
147+
TxHash: l.TX,
148+
PivotTime: pivotTime,
126149
}
127150
pos := c.ctrl.cache.MaterializeKnockoutPos(loc)
151+
if l.ChangeType == tables.ChangeTypeMint {
152+
pos.AppendMint(event)
153+
} else if l.ChangeType == tables.ChangeTypeBurn {
154+
pos.AppendBurn(event)
155+
}
156+
128157
c.ctrl.workers.omniUpdates <- &koPosUpdateMsg{liq: l, pos: pos, loc: loc}
158+
159+
// Estimate position liquidity from the flows
160+
if (l.ChangeType == tables.ChangeTypeMint || l.ChangeType == tables.ChangeTypeBurn || l.ChangeType == tables.ChangeTypeRecover) && l.BaseFlow != nil && l.QuoteFlow != nil {
161+
liq := model.DeriveLiquidityFromConcFlow(*l.BaseFlow, *l.QuoteFlow, l.BidTick, l.AskTick)
162+
if math.IsInf(liq, 0) || math.IsNaN(liq) {
163+
log.Println("Invalid liq", liq, "for", l)
164+
liq = 0
165+
}
166+
liqBigInt, _ := big.NewFloat(liq).Int(nil)
167+
168+
activeLiq := pos.Liq.GetActiveLiq()
169+
pos.Liq.UpdateActiveLiq(*big.NewInt(0).Add(activeLiq, liqBigInt), 0)
170+
afterLiq := pos.Liq.GetActiveLiq()
171+
if afterLiq.Cmp(big.NewInt(0)) < 0 {
172+
pos.Liq.UpdateActiveLiq(*big.NewInt(0), 0)
173+
afterLiq = big.NewInt(0)
174+
}
175+
afterLiqFloat, _ := afterLiq.Float64()
176+
177+
// If it's a burn and the remaining liq is less than 10% of the liq change, set it to 0
178+
if l.ChangeType == tables.ChangeTypeBurn && afterLiqFloat > 0 && math.Abs(liq)*0.10 > math.Abs(afterLiqFloat) {
179+
pos.Liq.UpdateActiveLiq(*big.NewInt(0), 0)
180+
}
181+
if l.ChangeType == tables.ChangeTypeRecover || l.ChangeType == tables.ChangeTypeClaim {
182+
pos.Liq.UpdateActiveLiq(*big.NewInt(0), 0)
183+
pos.Liq.UpdatePostKOLiq(*l.PivotTime, *big.NewInt(0), 0)
184+
}
185+
}
129186
}
130187

131188
func (c *ControllerOverNetwork) applyToPassiveLiq(l tables.LiqChange, loc types.PositionLocation) {
@@ -158,20 +215,21 @@ func (c *ControllerOverNetwork) IngestAggEvent(r tables.AggEvent) {
158215
hist.NextEvent(r)
159216
}
160217

161-
func (c *ControllerOverNetwork) IngestKnockout(r tables.KnockoutCross) {
162-
liq := types.KnockoutTickLocation(r.Tick, r.IsBid > 0, c.knockoutTickWidth())
218+
func (c *ControllerOverNetwork) IngestKnockout(l tables.LiqChange) {
219+
liq := types.KnockoutTickLocation(l.BidTick, l.IsBid > 0, c.knockoutTickWidth())
163220
pool := types.PoolLocation{
164221
ChainId: c.chainId,
165-
PoolIdx: r.PoolIdx,
166-
Base: types.RequireEthAddr(r.Base),
167-
Quote: types.RequireEthAddr(r.Quote),
222+
PoolIdx: l.PoolIdx,
223+
Base: types.RequireEthAddr(l.Base),
224+
Quote: types.RequireEthAddr(l.Quote),
168225
}
169226
loc := types.BookLocation{
170227
PoolLocation: pool,
171228
LiquidityLocation: liq,
172229
}
173-
pos := c.ctrl.cache.MaterializeKnockoutBook(loc)
174-
c.ctrl.workers.omniUpdates <- &koCrossUpdateMsg{loc: loc, pos: pos, cross: r}
230+
pos := c.ctrl.cache.MaterializeKnockoutSaga(loc)
231+
pos.UpdateCross(l)
232+
c.ctrl.workers.omniUpdates <- &koCrossUpdateMsg{loc: loc, pos: pos, cross: l}
175233
}
176234

177235
/* Called to indicate that all tables have completed the most recent sync cycle up
@@ -188,9 +246,9 @@ func (c *ControllerOverNetwork) knockoutTickWidth() int {
188246
}
189247

190248
func formLiqLoc(l tables.LiqChange) types.LiquidityLocation {
191-
if l.PositionType == "ambient" {
249+
if l.PositionType == tables.PosTypeAmbient {
192250
return types.AmbientLiquidityLocation()
193-
} else if l.PositionType == "knockout" {
251+
} else if l.PositionType == tables.PosTypeKnockout {
194252
return types.KnockoutRangeLocation(l.BidTick, l.AskTick, l.IsBid > 0)
195253
} else {
196254
return types.RangeLiquidityLocation(l.BidTick, l.AskTick)
@@ -199,7 +257,7 @@ func formLiqLoc(l tables.LiqChange) types.LiquidityLocation {
199257

200258
func (c *Controller) resyncFullCycle(time int) {
201259
for poolLoc, poolPos := range c.cache.RetrieveAllPositions() {
202-
if !poolPos.IsEmpty() {
260+
if !poolPos.IsEmpty() || poolPos.RefreshTime == 0 {
203261
c.workers.omniUpdates <- &posImpactMsg{poolLoc, poolPos, time}
204262
}
205263
}
@@ -208,9 +266,12 @@ func (c *Controller) resyncFullCycle(time int) {
208266
const REFRESH_CYCLE_TIME = 30 * 60
209267

210268
func (c *Controller) runPeriodicRefresh() {
269+
// To prevent running periodic refreshes while the startup sync is still running
270+
c.SpinUntilLiqSync()
211271
for {
212272
time.Sleep(time.Second * REFRESH_CYCLE_TIME)
213273
refreshTime := time.Now().Unix()
274+
log.Println("Running full refresh at", refreshTime)
214275
c.resyncFullCycle(int(refreshTime))
215276
}
216277
}

controller/handles.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,17 @@ package controller
33
import (
44
"log"
55
"math/big"
6+
"time"
67

78
"github.com/CrocSwap/graphcache-go/loader"
89
"github.com/CrocSwap/graphcache-go/model"
910
"github.com/CrocSwap/graphcache-go/types"
1011
)
1112

1213
type IRefreshHandle interface {
14+
Hash() [32]byte
15+
RefreshTime() int64
16+
Skippable() bool
1317
RefreshQuery(query *loader.ICrocQuery)
1418
LabelTag() string
1519
}
@@ -66,31 +70,33 @@ func (p *KnockoutAliveHandle) RefreshQuery(query *loader.ICrocQuery) {
6670
pivotTime := int(tryQueryAttempt(pivotTimeFn, "pivotTimeLatest"))
6771

6872
if pivotTime == 0 {
69-
p.pos.Liq.UpdateActiveLiq(*big.NewInt(0))
73+
p.pos.Liq.UpdateActiveLiq(*big.NewInt(0), time.Now().Unix())
7074

7175
} else {
7276
claimLoc := types.KOClaimLocation{PositionLocation: p.location, PivotTime: pivotTime}
73-
liqFn := func() (*big.Int, error) { return (*query).QueryKnockoutLiq(claimLoc) }
74-
75-
knockoutLiq := tryQueryAttempt(liqFn, "knockoutLiq")
76-
p.pos.Liq.UpdateActiveLiq(*knockoutLiq)
77+
liqFn := func() (loader.KnockoutLiqResp, error) { return (*query).QueryKnockoutLiq(claimLoc) }
78+
koLiqResp := tryQueryAttempt(liqFn, "knockoutLiq")
79+
p.pos.Liq.UpdateActiveLiq(*koLiqResp.Liq, time.Now().Unix())
7780
}
7881
}
7982

8083
func (p *KnockoutPostHandle) RefreshQuery(query *loader.ICrocQuery) {
81-
liqFn := func() (*big.Int, error) { return (*query).QueryKnockoutLiq(p.location) }
82-
knockoutLiq := tryQueryAttempt(liqFn, "knockoutLiq")
83-
p.pos.Liq.UpdatePostKOLiq(p.location.PivotTime, *knockoutLiq)
84+
liqFn := func() (loader.KnockoutLiqResp, error) { return (*query).QueryKnockoutLiq(p.location) }
85+
koLiqResp := tryQueryAttempt(liqFn, "knockoutLiq")
86+
if koLiqResp.KnockedOut {
87+
p.pos.Liq.UpdatePostKOLiq(p.location.PivotTime, *koLiqResp.Liq, time.Now().Unix())
88+
}
8489
}
8590

8691
func tryQueryAttempt[T any](queryFn func() (T, error), label string) T {
8792
result, err := queryFn()
8893
for retryCount := 0; err != nil && retryCount < N_MAX_RETRIES; retryCount += 1 {
94+
log.Printf("Query attempt %d/%d failed for \"%s\" with err: \"%s\"", retryCount, N_MAX_RETRIES, label, err)
8995
retryWaitRandom()
9096
result, err = queryFn()
9197
}
9298
if err != nil {
93-
log.Fatal("Unable to query liquidity for " + label)
99+
log.Fatalf("Unable to query \"%s\", err: %s", label, err)
94100
}
95101
return result
96102
}
@@ -110,3 +116,57 @@ func (p *KnockoutAliveHandle) LabelTag() string {
110116
func (p *KnockoutPostHandle) LabelTag() string {
111117
return "knockoutPost"
112118
}
119+
120+
func (p *PositionRefreshHandle) RefreshTime() int64 {
121+
return p.pos.RefreshTime
122+
}
123+
124+
func (p *RewardsRefreshHandle) RefreshTime() int64 {
125+
return p.pos.RefreshTime
126+
}
127+
128+
func (p *KnockoutAliveHandle) RefreshTime() int64 {
129+
return p.pos.Liq.Active.RefreshTime
130+
}
131+
132+
func (p *KnockoutPostHandle) RefreshTime() int64 {
133+
return p.pos.Liq.Active.RefreshTime
134+
}
135+
136+
func (p *PositionRefreshHandle) Hash() [32]byte {
137+
return p.location.Hash()
138+
}
139+
140+
func (p *RewardsRefreshHandle) Hash() [32]byte {
141+
return p.location.Hash()
142+
}
143+
144+
func (p *KnockoutAliveHandle) Hash() [32]byte {
145+
h := p.location.Hash()
146+
// Since PositionLocation for regular positions also has an IsBid bool, it's
147+
// possible for the hash of a knockout order to collide with a position of
148+
// the same user. To avoid this, we increment the first byte of the hash.
149+
// Sure wish Go had a non-painful way to make fields optional/nullable.
150+
h[0] = byte(h[0] + 1)
151+
return h
152+
}
153+
154+
func (p *KnockoutPostHandle) Hash() [32]byte {
155+
return p.location.Hash()
156+
}
157+
158+
func (p *PositionRefreshHandle) Skippable() bool {
159+
return false
160+
}
161+
162+
func (p *RewardsRefreshHandle) Skippable() bool {
163+
return true
164+
}
165+
166+
func (p *KnockoutAliveHandle) Skippable() bool {
167+
return false
168+
}
169+
170+
func (p *KnockoutPostHandle) Skippable() bool {
171+
return false
172+
}

0 commit comments

Comments
 (0)