@@ -318,11 +318,41 @@ IoT_Error_t aws_iot_mqtt_internal_send_packet(AWS_IoT_Client *pClient, size_t le
318
318
FUNC_EXIT_RC (rc )
319
319
}
320
320
321
- static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len (AWS_IoT_Client * pClient ,
321
+ static IoT_Error_t _aws_iot_mqtt_internal_readWrapper ( AWS_IoT_Client * pClient , size_t offset , size_t size , Timer * pTimer , size_t * read_len ) {
322
+ IoT_Error_t rc ;
323
+ int byteToRead ;
324
+ size_t byteRead = 0 ;
325
+
326
+ byteToRead = ( offset + size ) - pClient -> clientData .readBufIndex ;
327
+
328
+ if ( byteToRead > 0 )
329
+ {
330
+ rc = pClient -> networkStack .read ( & ( pClient -> networkStack ),
331
+ pClient -> clientData .readBuf + pClient -> clientData .readBufIndex ,
332
+ (size_t )byteToRead ,
333
+ pTimer ,
334
+ & byteRead );
335
+ pClient -> clientData .readBufIndex += byteRead ;
336
+
337
+ /* refresh byte to read */
338
+ byteToRead = ( offset + size ) - ((int )pClient -> clientData .readBufIndex );
339
+ * read_len = size - (size_t )byteToRead ;
340
+ }
341
+ else
342
+ {
343
+ * read_len = size ;
344
+ rc = SUCCESS ;
345
+ }
346
+
347
+
348
+
349
+ return rc ;
350
+ }
351
+ static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len (AWS_IoT_Client * pClient , size_t * offset ,
322
352
size_t * rem_len , Timer * pTimer ) {
323
- unsigned char encodedByte ;
324
353
size_t multiplier , len ;
325
354
IoT_Error_t rc ;
355
+ size_t read_len ;
326
356
327
357
FUNC_ENTRY ;
328
358
@@ -335,22 +365,23 @@ static IoT_Error_t _aws_iot_mqtt_internal_decode_packet_remaining_len(AWS_IoT_Cl
335
365
/* bad data */
336
366
FUNC_EXIT_RC (MQTT_DECODE_REMAINING_LENGTH_ERROR );
337
367
}
368
+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , len , 1 , pTimer , & read_len );
338
369
339
- rc = pClient -> networkStack .read (& (pClient -> networkStack ), & encodedByte , 1 , pTimer , & len );
340
370
if (SUCCESS != rc ) {
341
371
FUNC_EXIT_RC (rc );
342
372
}
343
373
344
- * rem_len += ((encodedByte & 127 ) * multiplier );
374
+ * rem_len += (( pClient -> clientData . readBuf [ len ] & 127 ) * multiplier );
345
375
multiplier *= 128 ;
346
- } while ((encodedByte & 128 ) != 0 );
347
-
376
+ } while (( pClient -> clientData . readBuf [ len ] & 128 ) != 0 );
377
+ * offset = len + 1 ;
348
378
FUNC_EXIT_RC (rc );
349
379
}
350
380
351
381
static IoT_Error_t _aws_iot_mqtt_internal_read_packet (AWS_IoT_Client * pClient , Timer * pTimer , uint8_t * pPacketType ) {
352
- size_t len , rem_len , total_bytes_read , bytes_to_be_read , read_len ;
382
+ size_t rem_len , total_bytes_read , bytes_to_be_read , read_len ;
353
383
IoT_Error_t rc ;
384
+ size_t offset = 0 ;
354
385
MQTTHeader header = {0 };
355
386
Timer packetTimer ;
356
387
init_timer (& packetTimer );
@@ -361,30 +392,22 @@ static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, T
361
392
bytes_to_be_read = 0 ;
362
393
read_len = 0 ;
363
394
364
- rc = pClient -> networkStack . read ( & ( pClient -> networkStack ), pClient -> clientData . readBuf , 1 , pTimer , & read_len );
395
+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , offset , 1 , pTimer , & read_len );
365
396
/* 1. read the header byte. This has the packet type in it */
366
397
if (NETWORK_SSL_NOTHING_TO_READ == rc ) {
367
398
return MQTT_NOTHING_TO_READ ;
368
399
} else if (SUCCESS != rc ) {
369
400
return rc ;
370
401
}
371
402
372
- len = 1 ;
373
-
374
- /* Use the constant packet receive timeout, instead of the variable (remaining) pTimer time, to
375
- * determine packet receiving timeout. This is done so we don't prematurely time out packet receiving
376
- * if the remaining time in pTimer is too short.
377
- */
378
- pTimer = & packetTimer ;
379
-
380
403
/* 2. read the remaining length. This is variable in itself */
381
- rc = _aws_iot_mqtt_internal_decode_packet_remaining_len (pClient , & rem_len , pTimer );
404
+ rc = _aws_iot_mqtt_internal_decode_packet_remaining_len (pClient , & offset , & rem_len , pTimer );
382
405
if (SUCCESS != rc ) {
383
406
return rc ;
384
- }
385
-
407
+ }
408
+
386
409
/* if the buffer is too short then the message will be dropped silently */
387
- if (rem_len >= pClient -> clientData .readBufSize ) {
410
+ if (( rem_len + offset ) >= pClient -> clientData .readBufSize ) {
388
411
bytes_to_be_read = pClient -> clientData .readBufSize ;
389
412
do {
390
413
rc = pClient -> networkStack .read (& (pClient -> networkStack ), pClient -> clientData .readBuf , bytes_to_be_read ,
@@ -398,21 +421,29 @@ static IoT_Error_t _aws_iot_mqtt_internal_read_packet(AWS_IoT_Client *pClient, T
398
421
}
399
422
}
400
423
} while (total_bytes_read < rem_len && SUCCESS == rc );
401
- return MQTT_RX_BUFFER_TOO_SHORT_ERROR ;
402
- }
403
424
404
- /* put the original remaining length into the read buffer */
405
- len += aws_iot_mqtt_internal_write_len_to_buffer (pClient -> clientData .readBuf + 1 , (uint32_t ) rem_len );
425
+ /* Check buffer was correctly emptied, otherwise, return error message. */
426
+ if ( total_bytes_read == rem_len )
427
+ {
428
+ aws_iot_mqtt_internal_flushBuffers ( pClient );
429
+ return MQTT_RX_BUFFER_TOO_SHORT_ERROR ;
430
+ }
431
+ else
432
+ {
433
+ return rc ;
434
+ }
435
+ }
406
436
407
437
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
408
438
if (rem_len > 0 ) {
409
- rc = pClient -> networkStack .read (& (pClient -> networkStack ), pClient -> clientData .readBuf + len , rem_len , pTimer ,
410
- & read_len );
439
+ rc = _aws_iot_mqtt_internal_readWrapper ( pClient , offset , rem_len , pTimer , & read_len );
411
440
if (SUCCESS != rc || read_len != rem_len ) {
412
441
return FAILURE ;
413
442
}
414
443
}
415
444
445
+ /* Pack has been received, we can flush the buffers for next call. */
446
+ aws_iot_mqtt_internal_flushBuffers ( pClient );
416
447
header .byte = pClient -> clientData .readBuf [0 ];
417
448
* pPacketType = MQTT_HEADER_FIELD_TYPE (header .byte );
418
449
@@ -613,6 +644,10 @@ IoT_Error_t aws_iot_mqtt_internal_cycle_read(AWS_IoT_Client *pClient, Timer *pTi
613
644
return rc ;
614
645
}
615
646
647
+ IoT_Error_t aws_iot_mqtt_internal_flushBuffers ( AWS_IoT_Client * pClient ) {
648
+ pClient -> clientData .readBufIndex = 0 ;
649
+ }
650
+
616
651
/* only used in single-threaded mode where one command at a time is in process */
617
652
IoT_Error_t aws_iot_mqtt_internal_wait_for_read (AWS_IoT_Client * pClient , uint8_t packetType , Timer * pTimer ) {
618
653
IoT_Error_t rc ;
0 commit comments