Skip to content

Commit a9bc71c

Browse files
DEX-725 Correct logic after storing into queue failure (#250)
* Removing from active orders if case of failure of the storing of places * Do nothing in case of failure of the storing of cancels
1 parent 0b5f6f6 commit a9bc71c

2 files changed

Lines changed: 30 additions & 9 deletions

File tree

dex-it/src/test/scala/com/wavesplatform/it/sync/KafkaIssuesTestSuite.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,16 +69,31 @@ class KafkaIssuesTestSuite extends MatcherSuiteBase {
6969

7070
"Matcher should free reserved balances if order wasn't placed into the queue" in {
7171

72-
placeAndAwaitAtDex(mkOrderDP(alice, wavesUsdPair, SELL, 10.waves, 3.0))
72+
val order = mkOrderDP(alice, wavesUsdPair, SELL, 10.waves, 3.0)
73+
placeAndAwaitAtDex(order)
7374

7475
dex1.api.currentOffset shouldBe 0
7576
dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 10.003.waves))
7677

7778
disconnectKafkaFromNetwork()
7879

79-
dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0))
80+
dex1.api.tryCancel(alice, order) shouldBe 'left
81+
dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0)) shouldBe 'left
82+
8083
dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 10.003.waves))
8184

85+
val oh = dex1.api.orderHistory(alice, Some(true))
86+
oh should have size 1
87+
oh.head.id shouldBe order.id()
88+
8289
connectKafkaToNetwork()
90+
91+
dex1.api.tryCancel(alice, order) shouldBe 'right
92+
dex1.api.orderHistory(alice, Some(true)) should have size 0
93+
dex1.api.reservedBalance(alice) shouldBe empty
94+
95+
dex1.api.tryPlace(mkOrderDP(alice, wavesUsdPair, SELL, 30.waves, 3.0)) shouldBe 'right
96+
dex1.api.orderHistory(alice, Some(true)) should have size 1
97+
dex1.api.reservedBalance(alice) should matchTo(Map[Asset, Long](Waves -> 30.003.waves))
8398
}
8499
}

dex/src/main/scala/com/wavesplatform/dex/AddressActor.scala

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,18 @@ class AddressActor(owner: Address,
156156
val orders = if (onlyActive) matchingActiveOrders else matchingActiveOrders ++ orderDB.getFinalizedOrders(owner, maybePair)
157157
sender ! Reply.OrdersStatuses(orders)
158158

159-
case event: Event.StoreFailed =>
160-
log.trace(s"Got $event")
161-
pendingCommands.remove(event.orderId).foreach { item =>
162-
item.client ! CanNotPersist(event.reason)
159+
case storeFailed @ Event.StoreFailed(orderId, reason, queueEvent) =>
160+
log.trace(s"Got $storeFailed")
161+
pendingCommands.remove(orderId).foreach { item =>
162+
item.client ! CanNotPersist(reason)
163+
}
164+
queueEvent match {
165+
case QueueEvent.Placed(_) | QueueEvent.PlacedMarket(_) =>
166+
activeOrders.remove(orderId).foreach { ao =>
167+
openVolume = openVolume |-| ao.reservableBalance
168+
}
169+
case _ =>
163170
}
164-
openVolume = openVolume |-| activeOrders(event.orderId).reservableBalance
165171

166172
case event: ValidationEvent =>
167173
log.trace(s"Got $event")
@@ -389,7 +395,7 @@ class AddressActor(owner: Address,
389395
Success(Some(error.CanNotPersistEvent))
390396
}
391397
.onComplete {
392-
case Success(Some(error)) => self ! Event.StoreFailed(orderId, error)
398+
case Success(Some(error)) => self ! Event.StoreFailed(orderId, error, event)
393399
case Success(None) => log.trace(s"$event saved")
394400
case _ => throw new IllegalStateException("Impossibru")
395401
}
@@ -477,7 +483,7 @@ object AddressActor {
477483
case class ValidationPassed(acceptedOrder: AcceptedOrder) extends ValidationEvent {
478484
override def orderId: Order.Id = acceptedOrder.order.id()
479485
}
480-
case class StoreFailed(orderId: Order.Id, reason: MatcherError) extends Event
486+
case class StoreFailed(orderId: Order.Id, reason: MatcherError, queueEvent: QueueEvent) extends Event
481487
}
482488

483489
private case class CancelExpiredOrder(orderId: ByteStr)

0 commit comments

Comments
 (0)