Skip to content

Commit 3594015

Browse files
TRD-1749: Escape send queued when blocked on connection side (#18)
1 parent 3bf2b87 commit 3594015

File tree

1 file changed

+22
-9
lines changed

1 file changed

+22
-9
lines changed

session.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {
235235

236236
s.toSend = append(s.toSend, msgBytes)
237237

238+
s.notifyMessageOut()
239+
240+
return nil
241+
}
242+
243+
func (s *session) notifyMessageOut() {
238244
select {
239245
case s.messageEvent <- true:
240246
default:
241247
}
242-
243-
return nil
244248
}
245249

246250
// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
@@ -347,8 +351,12 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
347351
}
348352

349353
func (s *session) sendQueued() {
350-
for _, msgBytes := range s.toSend {
351-
s.sendBytes(msgBytes)
354+
for i, msgBytes := range s.toSend {
355+
if !s.sendBytes(msgBytes) {
356+
s.toSend = s.toSend[i:]
357+
s.notifyMessageOut()
358+
return
359+
}
352360
}
353361

354362
s.dropQueued()
@@ -366,15 +374,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
366374
s.sendQueued()
367375
}
368376

369-
func (s *session) sendBytes(msg []byte) {
377+
func (s *session) sendBytes(msg []byte) bool {
370378
if s.messageOut == nil {
371379
s.log.OnEventf("Failed to send: disconnected")
372-
return
380+
return false
373381
}
374382

375-
s.log.OnOutgoing(msg)
376-
s.messageOut <- msg
377-
s.stateTimer.Reset(s.HeartBtInt)
383+
select {
384+
case s.messageOut <- msg:
385+
s.log.OnOutgoing(msg)
386+
s.stateTimer.Reset(s.HeartBtInt)
387+
return true
388+
default:
389+
return false
390+
}
378391
}
379392

380393
func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {

0 commit comments

Comments
 (0)