@@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {
235235
236236 s .toSend = append (s .toSend , msgBytes )
237237
238+ s .notifyMessageOut ()
239+
240+ return nil
241+ }
242+
243+ func (s * session ) notifyMessageOut () {
238244 select {
239245 case s .messageEvent <- true :
240246 default :
241247 }
242-
243- return nil
244248}
245249
246250// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
@@ -261,7 +265,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
261265 }
262266
263267 s .toSend = append (s .toSend , msgBytes )
264- s .sendQueued ()
268+ s .sendQueued (true )
265269
266270 return nil
267271}
@@ -290,7 +294,7 @@ func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
290294
291295 s .dropQueued ()
292296 s .toSend = append (s .toSend , msgBytes )
293- s .sendQueued ()
297+ s .sendQueued (true )
294298
295299 return nil
296300}
@@ -346,9 +350,13 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
346350 return s .store .IncrNextSenderMsgSeqNum ()
347351}
348352
349- func (s * session ) sendQueued () {
350- for _ , msgBytes := range s .toSend {
351- s .sendBytes (msgBytes )
353+ func (s * session ) sendQueued (blockUntilSent bool ) {
354+ for i , msgBytes := range s .toSend {
355+ if ! s .sendBytes (msgBytes , blockUntilSent ) {
356+ s .toSend = s .toSend [i :]
357+ s .notifyMessageOut ()
358+ return
359+ }
352360 }
353361
354362 s .dropQueued ()
@@ -363,18 +371,30 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
363371 defer s .sendMutex .Unlock ()
364372
365373 s .toSend = append (s .toSend , msg )
366- s .sendQueued ()
374+ s .sendQueued (true )
367375}
368376
369- func (s * session ) sendBytes (msg []byte ) {
377+ func (s * session ) sendBytes (msg []byte , blockUntilSent bool ) bool {
370378 if s .messageOut == nil {
371379 s .log .OnEventf ("Failed to send: disconnected" )
372- return
380+ return false
381+ }
382+
383+ if blockUntilSent {
384+ s .messageOut <- msg
385+ s .log .OnOutgoing (msg )
386+ s .stateTimer .Reset (s .HeartBtInt )
387+ return true
373388 }
374389
375- s .log .OnOutgoing (msg )
376- s .messageOut <- msg
377- s .stateTimer .Reset (s .HeartBtInt )
390+ select {
391+ case s .messageOut <- msg :
392+ s .log .OnOutgoing (msg )
393+ s .stateTimer .Reset (s .HeartBtInt )
394+ return true
395+ default :
396+ return false
397+ }
378398}
379399
380400func (s * session ) doTargetTooHigh (reject targetTooHigh ) (nextState resendState , err error ) {
0 commit comments