@@ -265,7 +265,7 @@ func (s *session) sendInReplyTo(msg *Message, inReplyTo *Message) error {
265265 }
266266
267267 s .toSend = append (s .toSend , msgBytes )
268- s .sendQueued ()
268+ s .sendQueued (true )
269269
270270 return nil
271271}
@@ -294,7 +294,7 @@ func (s *session) dropAndSendInReplyTo(msg *Message, inReplyTo *Message) error {
294294
295295 s .dropQueued ()
296296 s .toSend = append (s .toSend , msgBytes )
297- s .sendQueued ()
297+ s .sendQueued (true )
298298
299299 return nil
300300}
@@ -350,9 +350,9 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
350350 return s .store .IncrNextSenderMsgSeqNum ()
351351}
352352
353- func (s * session ) sendQueued () {
353+ func (s * session ) sendQueued (blockUntilSent bool ) {
354354 for i , msgBytes := range s .toSend {
355- if ! s .sendBytes (msgBytes ) {
355+ if ! s .sendBytes (msgBytes , blockUntilSent ) {
356356 s .toSend = s .toSend [i :]
357357 s .notifyMessageOut ()
358358 return
@@ -371,15 +371,22 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
371371 defer s .sendMutex .Unlock ()
372372
373373 s .toSend = append (s .toSend , msg )
374- s .sendQueued ()
374+ s .sendQueued (true )
375375}
376376
377- func (s * session ) sendBytes (msg []byte ) bool {
377+ func (s * session ) sendBytes (msg []byte , blockUntilSent bool ) bool {
378378 if s .messageOut == nil {
379379 s .log .OnEventf ("Failed to send: disconnected" )
380380 return false
381381 }
382382
383+ if blockUntilSent {
384+ s .messageOut <- msg
385+ s .log .OnOutgoing (msg )
386+ s .stateTimer .Reset (s .HeartBtInt )
387+ return true
388+ }
389+
383390 select {
384391 case s .messageOut <- msg :
385392 s .log .OnOutgoing (msg )
0 commit comments