Messages queued with esp_mqtt_client_enqueue() are delayed (IDFGH-14719) #15458

Open · dwmw2 opened this issue Feb 24, 2025 · 12 comments
Labels: Status: Opened (Issue is new), Type: Bug (bugs in IDF)

Comments

dwmw2 commented Feb 24, 2025

Answers checklist.

  • I have read the documentation ESP-IDF Programming Guide and the issue is not addressed there.
  • I have updated my IDF branch (master or release) to the latest version and checked that the issue is present there.
  • I have searched the issue tracker for a similar issue and not found a similar issue.

IDF version.

v5.1.5

Espressif SoC revision.

ESP32-S3

Operating System used.

Linux

How did you build your project?

Other (please specify in More Information)

If you are using Windows, please specify command line type.

None

Development Kit.

Custom Board

Power Supply used.

USB

What is the expected behavior?

Using ESPHome with the idf_send_async option to make it use esp_mqtt_client_enqueue(), I expected it to send my queued messages promptly.

(I set this option because I didn't like it when esp_mqtt_client_publish() blocks for 20 seconds when the network is down).

What is the actual behavior?

Each time around the loop in esp_mqtt_task(), it attempts to send precisely one message, then goes to sleep for another second before sending the next, leading to a large backlog and lost messages.

Steps to reproduce.

Set idf_send_async in an ESPHome configuration and watch how long it takes for MQTT messages to be sent.

Debug Logs.


More Information.

This patch helps a bit, by flushing the whole queue every time. But there's still up to a second (MQTT_POLL_READ_TIMEOUT_MS) before each message is sent. Can we wake the task when a message is queued?

--- components/mqtt/esp-mqtt/mqtt_client.c~	2024-10-31 17:03:14.000000000 +0000
+++ components/mqtt/esp-mqtt/mqtt_client.c	2025-02-24 15:40:44.868028678 +0000
@@ -1662,13 +1662,16 @@ static void esp_mqtt_task(void *pv)
 
 
             // resend all non-transmitted messages first
-            outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
+            outbox_item_handle_t item;
+            do {
+                item = outbox_dequeue(client->outbox, QUEUED, NULL);
             if (item) {
                 if (mqtt_resend_queued(client, item) == ESP_OK) {
                     if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
                         // delete all qos0 publish messages once we process them
                         if (outbox_delete_item(client->outbox, item) != ESP_OK) {
                             ESP_LOGE(TAG, "Failed to remove queued qos0 message from the outbox");
+                            break;
                         }
                     }
                     if (client->mqtt_state.pending_publish_qos > 0) {
@@ -1679,7 +1682,7 @@ static void esp_mqtt_task(void *pv)
                         }
 #endif
                     }
-                }
+                } else break;
                 // resend other "transmitted" messages after 1s
             } else if (has_timed_out(last_retransmit, client->config->message_retransmit_timeout)) {
                 last_retransmit = platform_tick_get_ms();
@@ -1691,9 +1694,10 @@ static void esp_mqtt_task(void *pv)
                             esp_mqtt5_increment_packet_counter(client);
                         }
 #endif
-                    }
+                    } else break;
                 }
-            }
+            } else break;
+            } while (item /* && this thread doesn't need to voluntarily yield */);
 
             if (process_keepalive(client) != ESP_OK) {
                 break;

dwmw2 commented Feb 25, 2025

Perhaps max_poll_timeout() should return 10ms instead of max_timeout if there are any QUEUED messages in the outbox?
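A minimal sketch of that idea, assuming outbox_dequeue() can serve as a non-destructive peek (the patch above relies on that, since items are removed separately via outbox_delete_item()); the real max_poll_timeout() also accounts for keepalive and reconnect deadlines, which this omits:

// Sketch only, not the actual esp-mqtt implementation: poll briefly while
// there is a backlog, so the task wakes soon and sends the next QUEUED
// message instead of sleeping for the full MQTT_POLL_READ_TIMEOUT_MS.
static uint32_t poll_timeout_for_backlog(esp_mqtt_client_handle_t client,
                                         uint32_t max_timeout)
{
    // outbox_dequeue() returns the next pending item without removing it.
    if (outbox_dequeue(client->outbox, QUEUED, NULL) != NULL) {
        return 10; // ms
    }
    return max_timeout;
}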


dwmw2 commented Feb 25, 2025

This helps to flush the already-queued messages without waiting a whole second between each one, but it doesn't solve the problem that it's a whole second before the first message is sent, after calling esp_mqtt_client_publish(). I can't see a way in FreeRTOS to wake a thread that's waiting in esp_transport_poll_read() / select().

Using esp_mqtt_client_publish() to attempt to publish it directly and fall back to queuing doesn't seem to be an option because that can block for a long time. If there was a way to use esp_mqtt_client_publish() with a much shorter network timeout (and without aborting the connection if it does time out) then that might work?
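On the shorter-network-timeout question: in IDF 5.x, esp_mqtt_client_config_t does expose a per-client network timeout that overrides the MQTT_NETWORK_TIMEOUT_MS default, although a send that times out still aborts the connection. A minimal sketch (the broker URI is illustrative):

#include "mqtt_client.h"

const esp_mqtt_client_config_t cfg = {
    .broker.address.uri = "mqtt://broker.local",
    .network.timeout_ms = 2000, // network operation timeout, default 10 s
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&cfg);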

--- mqtt_client.c.orig	2025-02-25 10:41:20.280544113 +0000
+++ mqtt_client.c	2025-02-25 10:45:39.571513453 +0000
@@ -1577,6 +1577,7 @@ static void esp_mqtt_task(void *pv)
     client->state = MQTT_STATE_INIT;
     xEventGroupClearBits(client->status_bits, STOPPED_BIT);
     while (client->run) {
+        uint32_t next_timeout = MQTT_POLL_READ_TIMEOUT_MS;
         MQTT_API_LOCK(client);
         run_event_loop(client);
         // delete long pending messages
@@ -1665,6 +1666,7 @@ static void esp_mqtt_task(void *pv)
             outbox_item_handle_t item = outbox_dequeue(client->outbox, QUEUED, NULL);
             if (item) {
                 if (mqtt_resend_queued(client, item) == ESP_OK) {
+                    next_timeout = 10;
                     if (client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_publish_qos == 0) {
                         // delete all qos0 publish messages once we process them
                         if (outbox_delete_item(client->outbox, item) != ESP_OK) {
@@ -1693,7 +1695,7 @@ static void esp_mqtt_task(void *pv)
 #endif
                     }
                 }
-            }
+            } // else reduce next_timeout if it *will* time out before the max.
 
             if (process_keepalive(client) != ESP_OK) {
                 break;
@@ -1704,7 +1706,7 @@ static void esp_mqtt_task(void *pv)
                 ESP_LOGD(TAG, "Refreshing the connection...");
                 esp_mqtt_abort_connection(client);
                 client->state = MQTT_STATE_INIT;
-            }
+            } // else reduce next_timeout if it *will* time out before the max.
 
             break;
         case MQTT_STATE_WAIT_RECONNECT:
@@ -1733,7 +1735,7 @@ static void esp_mqtt_task(void *pv)
         }
         MQTT_API_UNLOCK(client);
         if (MQTT_STATE_CONNECTED == client->state) {
-            if (esp_transport_poll_read(client->transport, max_poll_timeout(client, MQTT_POLL_READ_TIMEOUT_MS)) < 0) {
+            if (esp_transport_poll_read(client->transport, max_poll_timeout(client, next_timeout)) < 0) {
                 ESP_LOGE(TAG, "Poll read error: %d, aborting connection", errno);
                 esp_mqtt_abort_connection(client);
             }
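
ESP-IDF's esp_vfs_eventfd component offers one way to wake a select()-based wait: it registers an eventfd that select() can monitor alongside sockets. Wiring it into esp-mqtt's poll loop would still require patching the component; a minimal sketch of the mechanism (names here are illustrative):

#include <stdint.h>
#include <unistd.h>
#include "esp_err.h"
#include "esp_vfs_eventfd.h"

static int s_wake_fd = -1;

// One-time setup: register the eventfd VFS driver and create a wake fd.
static void wake_fd_init(void)
{
    esp_vfs_eventfd_config_t cfg = { .max_fds = 1 };
    ESP_ERROR_CHECK(esp_vfs_eventfd_register(&cfg));
    s_wake_fd = eventfd(0, 0);
}

// Producer side (e.g. from esp_mqtt_client_enqueue()): bumping the counter
// makes s_wake_fd readable, which wakes any select() that includes it.
static void wake_mqtt_task(void)
{
    uint64_t one = 1;
    write(s_wake_fd, &one, sizeof(one));
}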

dwmw2 added a commit to dwmw2/esphome that referenced this issue Feb 25, 2025
The ESP-IDF MQTT component is fairly unusable for low-latency setups such
as ESPHome. By default, ESPHome calls esp_mqtt_client_publish() directly
from the MQTT component's loop() function, or when publishing status
updates for other components. This may block for up to 20 seconds(!!) in
adverse network conditions.

With the `idf_send_async` option, subscribe and unsubscribe requests can
still block the loop thread for multiple seconds, but publishing sensor
updates is queued for esp-mqtt's own thread to actually send them, which
it does very slowly, no more than one per second, as discussed in
espressif/esp-idf#15458

And to top it all off, even with `idf_send_async` set, the so-called
'asynchronous' send can still block for ten seconds because it takes
the same MQTT_API_LOCK that the esp-mqtt thread holds while it runs
its loop and a network send is timing out. This is reported in
espressif/esp-idf#13078

The only way I can see to use esp-mqtt sanely is to use a thread of our
own, queueing all sub/unsub/publish requests and invoking the esp-mqtt
APIs from that thread.

The existing RingBuffer abstraction works nicely for this as it already
handles all the atomicity and waking when data are available. I've chosen
to avoid allocations by passing the actual data through the ringbuffer,
which means we impose a hard limit on the total topic+payload size for
each request. An alternative would be to allocate copies in the enqueue()
function and to pass *pointers* through the ringbuffer (which could be
a different type of queue then, if we wanted to reinvent things).

Fixes: esphome#6810
euripedesrocha (Collaborator) commented:

Hi @dwmw2, thanks for reporting the issue.

Does adjusting CONFIG_MQTT_POLL_READ_TIMEOUT_MS in Kconfig improve the system behavior for your scenario?

For async scenarios we have esp_mqtt_client_enqueue, which unfortunately can also block in poor network conditions. But that will only happen while the client is running its loop; if the client is blocked waiting in the read path, the enqueue function can still be used.

We don't process more than one message per loop iteration, to avoid holding the client mutex for too long. I'm assuming that you are publishing very often; could you describe your system, so I can understand whether we have a workaround for your context or whether we need to add something to the component?

Could you describe the scenario in which you get the 20 s block with the network disconnected?


dwmw2 commented Mar 17, 2025

I'm using ESPHome, which will publish a set of messages (for each component's state) when it reconnects to MQTT. I've just deployed a device in a part of my house with poor network coverage.

When the WiFi isn't behaving, esp_mqtt_write() can take up to 10 seconds (MQTT_NETWORK_TIMEOUT_MS) to fail, and it does so with the lock held when invoked via mqtt_resend_queued() from the esp-mqtt thread. So it blocks even esp_mqtt_client_enqueue() for 10 seconds.


dwmw2 commented Mar 17, 2025

Without actually testing: Yes, reducing CONFIG_MQTT_POLL_READ_TIMEOUT_MS will slightly improve the behaviour. When using esp_mqtt_client_enqueue(), that macro actually defines the maximum rate at which queued messages will be sent. So if I reduce it to 100ms, then the system will send a maximum of 10 messages per second. I can reduce it to 1ms, waste a lot of CPU time, and be able to send 1000 messages a second.

That's why my proof-of-concept hack above reduces the poll timeout when there are messages queued, but stays at 1000ms when the queue is empty, to try to balance between wasted time and sending rate.

But even with that hack, esp_mqtt_client_enqueue() can still be blocked for 10 seconds. And even if it isn't, there can still be a delay of up to a second after a message is queued, until esp_mqtt_task() first wakes up and tries to send it.

euripedesrocha (Collaborator) commented:

@dwmw2, we are aware of the esp_mqtt_client_enqueue blocking problem, and I believe you are too, given your mention of it on that issue. While that will be fixed later, it will probably not be a complete solution for your case, given that dispatching from the outbox queue will always have some delay, and that is by design. I'm trying to find a workaround for your issue and to identify improvements for such scenarios without compromising the common requirements and library operation.

One possible change is to allow flushing the queue when enabled by the user: add a new entry to the config struct that enables flushing the queue when needed. That is similar to the solution you suggested, but controlled, so the current behavior is kept by default.

As you pointed out, we need to balance the load on the client with the retries, and accommodate scenarios with poor network connections.
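For illustration, the opt-in might look like this in esp_mqtt_client_config_t; the flush field is hypothetical and does not exist today, while outbox.limit is an existing field:

#include "mqtt_client.h"

const esp_mqtt_client_config_t cfg = {
    .broker.address.uri = "mqtt://broker.local",  // illustrative
    .outbox = {
        .limit = 4096,                   // existing field: outbox size limit (bytes)
        // .flush_on_loop = true,        // proposed opt-in, hypothetical
    },
};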


dwmw2 commented Mar 18, 2025

Thanks, @euripedesrocha.

Yes, I'm aware of the esp_mqtt_client_enqueue blocking problem, but it seems that #13078 still has the wrong title. Please could you fix it to say "esp_mqtt_client_enqueue sometimes blocks for 10+ seconds" (not milliseconds).

Dispatching from the queue doesn't have to incur a significant delay. What if the thread waited not only in esp_transport_poll_read(), as it does at the moment, but could also be woken directly by esp_mqtt_client_enqueue() when a message has been added to the queue?

I'm thinking of a design where there's a separate queue (or perhaps even a Queue) between esp_mqtt_client_enqueue() and esp_mqtt_task(), so the former doesn't have to take the main MQTT API lock and thus doesn't have to block for multiple seconds. All it would do is put the item on the new queue and wake the esp_mqtt_task() to consume it.

Then the loop in esp_mqtt_task() would take the item off the queue and transfer it into its own outbox. Couple that with some variant of my hack above so that esp_mqtt_task() doesn't wait a whole second between sending each item from the outbox, but actually sends them a bit faster than that.

That should actually be fairly simple. I'd hack up a proof of concept but I don't know FreeRTOS well enough to implement the part which waits for either the transport to be readable, or the Queue (or other form of queue/semaphore) to wake up.
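A sketch of the "waits for either" part, under two assumptions: the transport's underlying socket can be obtained with esp_transport_get_socket() (which the tcp_transport component exposes), and s_wake_fd is an eventfd as sketched earlier in this thread:

#include <stdint.h>
#include <sys/select.h>
#include <unistd.h>
#include "esp_transport.h"

extern int s_wake_fd; // eventfd from the earlier sketch

// Blocks until the transport is readable, the wake fd is poked, or
// timeout_ms elapses; clears the wake counter if that is what fired.
static void wait_readable_or_woken(esp_transport_handle_t transport, int timeout_ms)
{
    int sock = esp_transport_get_socket(transport);
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(sock, &rfds);
    FD_SET(s_wake_fd, &rfds);
    struct timeval tv = {
        .tv_sec = timeout_ms / 1000,
        .tv_usec = (timeout_ms % 1000) * 1000,
    };
    int maxfd = (sock > s_wake_fd ? sock : s_wake_fd) + 1;
    if (select(maxfd, &rfds, NULL, NULL, &tv) > 0 && FD_ISSET(s_wake_fd, &rfds)) {
        uint64_t count;
        read(s_wake_fd, &count, sizeof(count)); // consume the wake-up
    }
}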

euripedesrocha (Collaborator) commented:

@dwmw2 The solution you propose is something you can achieve by providing your own outbox implementation and sharing the queue with your system. Making it a general solution might raise issues, such as the msg id that needs to be returned when enqueueing the message.

What makes this a not-so-trivial task is that the whole library was designed on the assumption that it is locked for the whole of its processing. While that makes it easier to handle potential concurrency problems, by avoiding them entirely, it makes it hard to refactor towards the async nature that esp_mqtt_client_enqueue must have.

Since this is a recurring issue in the library, I'll prioritize the work on this. As I mentioned, the original design that we assumed would fix the problem raised other issues in testing and questions in review, hence it was abandoned.


dwmw2 commented Mar 18, 2025

The solution you propose is something you can achieve by providing your own outbox implementation and sharing the queue with your system.

Hm, I thought I'd looked at that (basically the idea being just to copy mqtt_outbox.c, use a separate internal lock for the outbox alone, and have a way for the system to quietly insert its own messages into the outbox directly).

But things like outbox_enqueue() require visibility into data structures like outbox_message_t, so even that doesn't work very nicely. And then there's the fact that mqtt_client_enqueue_publish() does a whole bunch of stuff to generate the data that even gets passed into outbox_enqueue().

And it still doesn't solve the problem mentioned in this ticket, that esp_mqtt_task() can't be woken up directly to process the messages from the outbox, and would still take its sweet time sending one message a second.

But yes, this is fairly close to the solution I came up with for ESPHome in esphome/esphome#8325 — just put the messages ({topic, message, qos, retain flag}) into a queue from the 'real time' loop, and have a separate thread pull them out and call esp_mqtt_client_publish() synchronously.
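A minimal sketch of that two-thread pattern with a FreeRTOS queue (sizes and names are illustrative, not ESPHome's actual implementation, which uses its RingBuffer abstraction as described in the commit message above):

#include <stdbool.h>
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "mqtt_client.h"

// Fixed-size request record: imposes a hard topic+payload limit, the same
// trade-off the commit message above describes for the RingBuffer version.
typedef struct {
    char topic[64];
    char payload[192];
    int len;
    int qos;
    bool retain;
} mqtt_req_t;

static QueueHandle_t s_mqtt_queue; // xQueueCreate(16, sizeof(mqtt_req_t)) at startup

// Called from the 'real time' loop: copies the request and returns immediately.
static bool mqtt_request(const char *topic, const char *payload, int len,
                         int qos, bool retain)
{
    mqtt_req_t req = { .len = len, .qos = qos, .retain = retain };
    if (strlen(topic) >= sizeof(req.topic) || len > (int)sizeof(req.payload)) {
        return false; // over the hard size limit
    }
    strcpy(req.topic, topic);
    memcpy(req.payload, payload, len);
    return xQueueSend(s_mqtt_queue, &req, 0) == pdTRUE;
}

// Dedicated task: may block inside esp_mqtt_client_publish() for seconds
// without stalling the main loop.
static void mqtt_sender_task(void *arg)
{
    esp_mqtt_client_handle_t client = arg;
    mqtt_req_t req;
    for (;;) {
        if (xQueueReceive(s_mqtt_queue, &req, portMAX_DELAY) == pdTRUE) {
            esp_mqtt_client_publish(client, req.topic, req.payload,
                                    req.len, req.qos, req.retain);
        }
    }
}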

euripedesrocha (Collaborator) commented:

And it still doesn't solve the problem mentioned in this ticket, that esp_mqtt_task() can't be woken up directly to process the messages from the outbox, and would still take its sweet time sending one message a second.

This isn't exactly a problem, but a design decision. If you are doing an async operation, we assume that the message can be published later. You control the load on your system through the network-related timeouts and the task priority. If you want the message published right away, there is esp_mqtt_client_publish.

I understand that in poor-network scenarios this imposes some blocking on the system, but we always prioritize that the process completes, that messages are sent, and that the client is always in a consistent state.

I can add a config option to process all pending publish messages in the loop, and a PR from your side is welcome as well. But this needs to be opt-in and keep the current behavior as the default.


dwmw2 commented Mar 19, 2025

Ah, the perennial 'bug' vs. 'feature' debate, which philosophically often boils down to how well documented the behaviour in question is.

To be clear, the behaviour in question is that messages enqueued with esp_mqtt_client_enqueue() are delayed for a period of up to MQTT_POLL_READ_TIMEOUT_MS, and the maximum rate of transmission is 1000 / MQTT_POLL_READ_TIMEOUT_MS messages per second.

But MQTT_POLL_READ_TIMEOUT_MS is documented as "Timeout when polling underlying transport for read". And the documentation for esp_mqtt_client_enqueue() does say 'later' but makes no mention of a limit of one message per second.

If you want to call it a design decision, I think we have to fix the documentation accordingly, don't we?

I can add a config option to process all pending publish messages in the loop, and a PR from your side is welcome as well. But this needs to be opt-in and keep the current behavior as the default.

Rather than hogging CPU by sending all queued messages in the loop, my hack above was just setting the timeout to something like 10ms (when there were queued messages) so that the thread would wake sooner and send the next message in a more reasonable amount of time. Is that something that could be refined and made the default?

The other thing that would still be missing is a way to wake the thread when a message is first enqueued, so that first message isn't delayed for a second.

The main thing I'm trying to avoid is having to have two threads, as in my current workaround. Here's another idea which might work... extract the core of the esp_mqtt_task() loop (from the MQTT_API_LOCK() to the MQTT_API_UNLOCK()) into a separate function. The application can (optionally) disable the normal thread that runs esp_mqtt_task(), and has to invoke that newly exported function regularly instead.

Then the application's own thread can suck messages off its own nonblocking queue (like that one I implemented for ESPHome), queue them up with esp_mqtt_client_enqueue(), then immediately call into the old esp_mqtt_task() core function to process them.
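A sketch of what that could look like from the application side; esp_mqtt_client_run_once(), app_msg_t, and s_app_queue are all hypothetical here (no such exported function exists in esp-mqtt today):

#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
#include "mqtt_client.h"

// Hypothetical export: one iteration of esp_mqtt_task()'s loop, i.e. the
// body between MQTT_API_LOCK() and MQTT_API_UNLOCK().
esp_err_t esp_mqtt_client_run_once(esp_mqtt_client_handle_t client);

typedef struct { char *topic, *payload; int len, qos, retain; } app_msg_t;
static QueueHandle_t s_app_queue; // created elsewhere with xQueueCreate()

// Application-owned thread replacing the built-in esp_mqtt_task()
// (ownership/freeing of the message buffers is elided for brevity).
static void app_mqtt_task(void *arg)
{
    esp_mqtt_client_handle_t client = arg;
    app_msg_t msg;
    for (;;) {
        // Drain our own non-blocking queue into the client's outbox...
        while (xQueueReceive(s_app_queue, &msg, 0) == pdTRUE) {
            esp_mqtt_client_enqueue(client, msg.topic, msg.payload, msg.len,
                                    msg.qos, msg.retain, true);
        }
        // ...then immediately run one client iteration to transmit them.
        esp_mqtt_client_run_once(client);
        // Sleep on the queue itself rather than a fixed 1 s poll timeout;
        // xQueuePeek() wakes as soon as the next message is enqueued.
        xQueuePeek(s_app_queue, &msg, pdMS_TO_TICKS(10));
    }
}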

euripedesrocha (Collaborator) commented:

@dwmw2, thanks for the suggestions for improvements.

There is no debate about the async nature of esp_mqtt_client_enqueue being a bug: it's async, and the message is dispatched later. For publishing directly, we have esp_mqtt_client_publish.
If you need a shorter interval when processing the queue, adjust the timeouts accordingly. The defaults are what we have for typical use cases.

The proposed solution of processing all unpublished messages on each loop will only hog the CPU if you publish messages at a high frequency.

I believe we are discussing multiple problems at once:

  • esp_mqtt_client_enqueue is async, and there will always be some interval before dispatch: up to 1 s for the first message with default settings.
  • esp_mqtt_client_enqueue has a bug that makes it block when it shouldn't; I will work on fixing it.
  • Waking up from a different source would require a more complex redesign, which I don't yet see the case for. Sorry, we need to address the more common cases; if this starts to be requested more often, I will consider how to solve it. For now I believe esp_mqtt_client_publish is the available solution, though I will still spend some time thinking about whether there is an easy solution that could be adopted.
  • Extracting the core loop of the MQTT client is something we would like to do, but the first refactoring attempt in that direction wasn't successful, due to the many concerns about potential concurrency bugs and the testing needed. It is still in our future plans.

In summary:

  • I will apply the documentation improvements.
  • I will provide an opt-in "flush unpublished messages" option in the config.
  • I will work on the enqueue fix.

Fixing all of this will take some time, plus the regular IDF workflow for it to become available on the IDF branches.

I will gladly review any PR from your side, either here or on the esp-mqtt repository.
