A long queue of to-device messages can prevent outgoing federation working #17035
Comments
Did this happen again? Also, in the |
Sorry for the late reply. For some reason github emails have been going to my spam folder. It has not happened again. Let's assume it was fixed. |
Hi. I encountered the problem again in 1.113, and again with matrix.org. Unfortunately I couldn't collect logs, but it was fixed by wiping the oldest EDU, which was ~600kB. |
Maybe not helpful information, but when it happened again I just checked the size of messages_json in device_federation_outbox and deleted all rows over 300kB. There were about 10 rows between 300 and 600kB. After deleting them, all queued messages to matrix.org were delivered instantly. I have attached one of the big EDUs. EDIT: Just realised now that I may be confusing PDUs with EDUs :) |
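For anyone wanting to do the same cleanup by hand, here is a minimal sketch of that kind of inspection and deletion. The table and column names come from the comments and patches in this thread; the connection string and the 300kB threshold are illustrative assumptions, and deleted rows mean those to-device messages are lost for good.
import psycopg2

THRESHOLD = 300_000  # characters of messages_json; rows larger than this get deleted (illustrative)

# Connection string is an assumption; adjust it for your deployment.
conn = psycopg2.connect("dbname=synapse user=synapse host=localhost")
with conn, conn.cursor() as cur:
    # Show the largest queued to-device EDUs and their destinations.
    cur.execute(
        """
        SELECT destination, stream_id, length(messages_json) AS size
        FROM device_federation_outbox
        ORDER BY size DESC
        LIMIT 20
        """
    )
    for destination, stream_id, size in cur.fetchall():
        print(destination, stream_id, size)

    # Drop the oversized rows; the affected to-device messages are lost.
    cur.execute(
        "DELETE FROM device_federation_outbox WHERE length(messages_json) > %s",
        (THRESHOLD,),
    )
    print("deleted", cur.rowcount, "rows")
conn.close()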
We also still see this issue (nitro.chat), see #17678 (comment). The problem seems to be that limiting the number of events per EDU to 50 is not enough if there are huge events among them. Maybe we should just drop these big events, or are there valid reasons for them? |
Thanks for commenting. Will try your fix in #17678 next time it happens. |
It's not a fix, just a mitigation. It just further reduces the number of events per EDU from 50 to 25. I'm working on a real fix now. |
@shyrwall I'm now testing this patch, which uses the actual size of the strings in the JSON instead of a hardcoded 50 events. It stops adding events to an EDU once it's larger than 10k, warns about events larger than 1k, and drops events completely if they are larger than 100k. If it works for a couple of days, I will submit it as a PR. EDIT: don't use this, use the patch below, which works.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..f46da8185d 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -71,7 +71,7 @@ CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# Limit how many presence states we add to each presence EDU, to ensure that
# they are bounded in size.
-MAX_PRESENCE_STATES_PER_EDU = 50
+LIMIT_PRESENCE_SIZE_PER_EDU = 10000
class PerDestinationQueue:
@@ -694,6 +694,16 @@ class PerDestinationQueue:
self._pending_pdus = []
+def _json_strings_sum(obj):
+ if isinstance(obj, dict):
+ return sum(_json_strings_sum(key) + _json_strings_sum(value) for key, value in obj.items())
+ elif isinstance(obj, list):
+ return sum(_json_strings_sum(item) for item in obj)
+ elif isinstance(obj, str):
+ return len(obj)
+ else:
+ return 0
+
@attr.s(slots=True, auto_attribs=True)
class _TransactionQueueManager:
"""A helper async context manager for pulling stuff off the queues and
@@ -728,14 +738,21 @@ class _TransactionQueueManager:
# Only send max 50 presence entries in the EDU, to bound the amount
# of data we're sending.
presence_to_add: List[JsonDict] = []
+ edu_size = 0
while (
self.queue._pending_presence
- and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
+ and edu_size < LIMIT_PRESENCE_SIZE_PER_EDU
):
_, presence = self.queue._pending_presence.popitem(last=False)
- presence_to_add.append(
- format_user_presence_state(presence, self.queue._clock.time_msec())
- )
+ formatted_presence = format_user_presence_state(presence, self.queue._clock.time_msec())
+ presence_size = _json_strings_sum(formatted_presence)
+ if presence_size > 1000:
+ logger.warning("Large presence event (size: %d): %s", presence_size, str(formatted_presence)[:5000])
+ if presence_size > 100000:
+ logger.warning("Dropping huge presence event")
+ continue
+ edu_size += presence_size
+ presence_to_add.append(formatted_presence)
pending_edus.append(
Edu( |
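As a standalone illustration of the size heuristic in that patch: _json_strings_sum only adds up the lengths of keys and string values, so it gives a cheap lower bound on the serialized size rather than an exact byte count. The function is copied from the patch; the sample payload below is made up.
def _json_strings_sum(obj):
    if isinstance(obj, dict):
        return sum(_json_strings_sum(k) + _json_strings_sum(v) for k, v in obj.items())
    elif isinstance(obj, list):
        return sum(_json_strings_sum(item) for item in obj)
    elif isinstance(obj, str):
        return len(obj)
    else:
        return 0

# Made-up presence payload, just to show the thresholds in action.
sample_presence = {
    "user_id": "@alice:example.org",
    "presence": "online",
    "status_msg": "x" * 2000,   # an unusually long status message
    "last_active_ago": 5000,    # non-string values contribute 0
}

size = _json_strings_sum(sample_presence)
print(size)          # ~2060: dominated by the long status message
print(size > 1000)   # True: the patch above would log a warning for this entry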
That sounds much better. I was not familiar with how it actually worked, for example that one EDU could contain multiple events. I guess there is no way of knowing if the patch works if I implement it after the "error" has occurred, because then it's already in the device federation outbox. But I will implement it and hope the problem does not occur again :) |
My last patch didn't help us. As in your case, @shyrwall, the large EDUs are "to device messages", which are not presence EDUs but are handled separately in later code. Here is the new patch that I'm currently testing. It cuts off once the to-device EDUs in one transaction use more than 100kB, and it completely drops huge to-device EDUs (larger than 500kB):
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..953a1067b6 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -73,6 +73,15 @@ CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50
+def _strings_sum(obj):
+ if isinstance(obj, dict):
+ return sum(_strings_sum(key) + _strings_sum(value) for key, value in obj.items())
+ elif isinstance(obj, list):
+ return sum(_strings_sum(item) for item in obj)
+ elif isinstance(obj, str):
+ return len(obj)
+ else:
+ return 0
class PerDestinationQueue:
"""
@@ -654,15 +663,26 @@ class PerDestinationQueue:
async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
- contents, stream_id = await self._store.get_new_device_msgs_for_remote(
+ messages, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
- for content in contents:
- message_id = content.get("message_id")
- if not message_id:
+ total_size = 0
+ contents: List[JsonDict] = []
+ for sid, content in messages:
+ size = _strings_sum(content)
+ logger.info("to-device-edu-size: %d", size)
+ if size > 500000:
+ logger.warning("Dropping huge to-device-message: %s", str(content)[:5000])
continue
-
- set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+ message_id = content.get("message_id")
+ if message_id:
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+ total_size += size
+ contents.append(content)
+ if total_size > 100000:
+ logger.warning("to-device messages are too large (size: %d), defer rest to next transaction.", total_size)
+ stream_id = sid
+ break
edus = [
Edu(
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 0612b82b9b..e45110c4ed 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -569,7 +569,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[int, JsonDict]], int]:
"""
Args:
destination: The name of the remote server.
@@ -599,7 +599,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
def get_new_messages_for_remote_destination_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[int, JsonDict]], int]:
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
" WHERE destination = ?"
@@ -609,12 +609,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
- messages = []
+ messages: List[Tuple[int, JsonDict]] = []
stream_pos = current_stream_id
for row in txn:
stream_pos = row[0]
- messages.append(db_to_json(row[1]))
+ messages.append((stream_pos, db_to_json(row[1])))
# If the limit was not reached we know that there's no more data for this
# user/device pair up to current_stream_id. |
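Restated outside Synapse, the core of that patch is: walk the queued (stream_id, message) pairs in order, drop anything implausibly large, stop once the running total crosses the per-transaction budget, and report the stream position reached so that the remainder is deferred to the next transaction rather than lost. A minimal sketch, with the 500kB/100kB thresholds taken from the patch and everything else (names, types) illustrative:
from typing import Dict, List, Tuple

DROP_LIMIT = 500_000   # drop any single to-device EDU larger than this (from the patch)
TXN_BUDGET = 100_000   # stop filling the transaction once this size is reached (from the patch)

def _strings_sum(obj) -> int:
    if isinstance(obj, dict):
        return sum(_strings_sum(k) + _strings_sum(v) for k, v in obj.items())
    if isinstance(obj, list):
        return sum(_strings_sum(item) for item in obj)
    if isinstance(obj, str):
        return len(obj)
    return 0

def select_for_transaction(
    queued: List[Tuple[int, Dict]], last_stream_id: int
) -> Tuple[List[Dict], int]:
    """Pick messages for one transaction and return the stream id to acknowledge."""
    picked: List[Dict] = []
    stream_id = queued[-1][0] if queued else last_stream_id
    total = 0
    for sid, content in queued:
        size = _strings_sum(content)
        if size > DROP_LIMIT:
            continue  # drop the oversized message entirely
        total += size
        picked.append(content)
        if total > TXN_BUDGET:
            stream_id = sid  # anything after this point is deferred, not lost
            break
    return picked, stream_id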
@ansiwen Thank you so much for this patch. ArcticFoxes.net had been experiencing inexplicable outbound federation issues with Matrix.org for many months until someone brought this issue to my attention, and your (second) patch finally solved the problem. For context for anyone else, look for entries like these in the log:
Cloudflare returns error code 520 due to some sort of issue with the backend server. @turt2live confirmed back in May 2024 that our federation requests were failing at the Matrix.org reverse proxy. Through this GitHub issue, we determined that the reverse proxy for Matrix.org was silently dropping the requests because they were too large. At some point the EDUs in a single transaction grow too large (possibly due to a single excessively large EDU, I'm not sure), so Matrix.org drops the request. Since the request is unsuccessful, the failed transaction and successive transactions pile up in the Synapse queue until the maximum of 50 PDUs and 100 EDUs per transaction is reached. Another symptom of this issue is that federation to Matrix.org works properly for a brief period immediately after restarting Synapse.
|
This is currently also affecting envs.net, completely preventing any envs.net -> matrix.org communication, although it doesn't affect viewing content originating from matrix.org. |
We've experienced this on 4d2.org as well, with symptoms similar to those described, including federation working very briefly after restarting Synapse (or, in our case, after restarting the specific federation-sender worker queueing messages for matrix.org). Deleting large messages from device_federation_outbox temporarily fixes the problem, at the cost of some data loss:
|
Just digging into this a bit: matrix-rust-sdk sends to-device messages in batches of up to 250 target devices at a time. Each individual to-device message can be up to about 2500 bytes. The batch is then divided up by target homeserver and added to the outgoing federation queue; Synapse then puts up to 100 of those EDUs in a single transaction. The receiving Synapse immediately closes the TCP connection when a request exceeds its size limit; Cloudflare therefore sees an empty HTTP response, which it turns into a 520 error. |
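Putting those numbers together explains the sizes reported in this thread; a quick back-of-the-envelope calculation, treating 250 devices x 2500 bytes and the 100-EDU-per-transaction cap as worst cases:
devices_per_batch = 250        # matrix-rust-sdk batch size
bytes_per_message = 2500       # upper bound per to-device message
edus_per_transaction = 100     # Synapse's to-device EDU cap per transaction

edu_size = devices_per_batch * bytes_per_message
txn_size = edu_size * edus_per_transaction

print(edu_size)   # 625000 bytes, i.e. ~625kB, matching the large EDUs reported above
print(txn_size)   # 62500000 bytes, i.e. ~62.5MB, in the ballpark of the 60 MiB figure below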
I wonder if it might be worthwhile to spec a MESSAGE_TOO_LARGE error code on federation endpoints so homeservers can indicate how large a message they are willing to accept? Just throwing ideas in the ring here, though. 60 MiB doesn't sound all that dramatic to me, but I'm sure others would be happy to disagree. |
@TheArcaneBrony I would think setting an agreed maximum length that senders have to stay within and receivers have to accept would be a much simpler solution than some sort of negotiation system. |
I agree in general, but the maximum has to be chosen wisely, because it also implicitly limits the maximum size of a single EDU (or is there already a mechanism to fragment large EDUs?). Your example from above indicates there are "valid" 625kB EDUs. However, in your example it could be easily fixed by creating smaller batches. |
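As a rough illustration of the "agreed maximum" idea, a sender could measure the serialized transaction before PUTting it and defer whatever doesn't fit. This is only a sketch: MAX_TXN_BYTES is a made-up value, not anything from the Matrix spec, and re-serializing on every step is more expensive than the string-length heuristic used in the patches above.
import json
from typing import Dict, List, Tuple

MAX_TXN_BYTES = 1_000_000  # hypothetical agreed cap, not a value from the spec

def fit_edus(edus: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Split EDUs into (fits in this transaction, deferred to a later one)."""
    included: List[Dict] = []
    for i, edu in enumerate(edus):
        candidate = {"edus": included + [edu]}
        if len(json.dumps(candidate).encode("utf-8")) > MAX_TXN_BYTES:
            # Note: a single EDU over the cap would still have to be dropped or
            # rejected somehow, which is exactly the open question above.
            return included, edus[i:]
        included.append(edu)
    return included, []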
Related: matrix-org/matrix-spec#807 |
After a long quiet period it happened to us again (the patch was no longer applied because of automatic updates), and then once more over the holidays. I could collect some debug logging, and it was a burst of large to-device EDUs of around 500kB each, which resulted in a total transaction size of 32MB that appears to have been blocked by Cloudflare.
I still don't know where the limit is, but for now I have changed my patch from above to drop to-device EDUs larger than 1MB, and also to use a threshold of 1MB per transaction, which seems to handle all the EDUs I am observing without stalling. Here is the updated patch for matrix-synapse 1.121.1:
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d097e65ea7..953a1067b6 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -73,6 +73,15 @@ CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50
+def _strings_sum(obj):
+ if isinstance(obj, dict):
+ return sum(_strings_sum(key) + _strings_sum(value) for key, value in obj.items())
+ elif isinstance(obj, list):
+ return sum(_strings_sum(item) for item in obj)
+ elif isinstance(obj, str):
+ return len(obj)
+ else:
+ return 0
class PerDestinationQueue:
"""
@@ -642,15 +651,26 @@ class PerDestinationQueue:
async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]:
last_device_stream_id = self._last_device_stream_id
to_device_stream_id = self._store.get_to_device_stream_token()
- contents, stream_id = await self._store.get_new_device_msgs_for_remote(
+ messages, stream_id = await self._store.get_new_device_msgs_for_remote(
self._destination, last_device_stream_id, to_device_stream_id, limit
)
- for content in contents:
- message_id = content.get("message_id")
- if not message_id:
+ total_size = 0
+ contents: List[JsonDict] = []
+ for sid, content in messages:
+ size = _strings_sum(content)
+ logger.info("to-device-edu-size: %d", size)
+ if size > 1000000:
+ logger.warning("Dropping huge to-device-message: %s", str(content)[:5000])
continue
-
- set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+ message_id = content.get("message_id")
+ if message_id:
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+ total_size += size
+ contents.append(content)
+ if total_size > 1000000:
+ logger.warning("to-device messages are too large (size: %d), defer rest to next transaction.", total_size)
+ stream_id = sid
+ break
edus = [
Edu(
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 0612b82b9b..e45110c4ed 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -569,7 +569,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[int, JsonDict]], int]:
"""
Args:
destination: The name of the remote server.
@@ -599,7 +599,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
def get_new_messages_for_remote_destination_txn(
txn: LoggingTransaction,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[Tuple[int, JsonDict]], int]:
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
" WHERE destination = ?"
@@ -609,12 +609,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
- messages = []
+ messages: List[Tuple[int, JsonDict]] = []
stream_pos = current_stream_id
for row in txn:
stream_pos = row[0]
- messages.append(db_to_json(row[1]))
+ messages.append((stream_pos, db_to_json(row[1])))
# If the limit was not reached we know that there's no more data for this
# user/device pair up to current_stream_id. |
We recently had the same issue at https://unredacted.org/ and that query resolved it. |
@lunarthegrey FYI: If you apply my patch and restart the federation sender it should be resolved without data loss. |
Description
Hi
This will be a vague bug report, so I'm hoping someone will have an "aha" moment when reading it.
For the past week my homeserver has been unable to send outgoing federation messages towards matrix.org. Initially, after starting Synapse, some messages go through, but after a few seconds it stops and goes into a retry loop.
During these retries matrix.org, or rather Cloudflare, returns an HTTP 520 error.
Upon further inspection I managed to narrow it down to an m.direct_to_device message (a very large JSON object) being posted by a single user.
After deleting the event from device_federation_outbox, everything worked again.
My theory is that the object was too large, so matrix.org/Cloudflare threw an error and Synapse just kept retrying.
If this is correct, is there a bug in Synapse, in that it should somehow split this into multiple requests?
Or is it a matrix.org bug, with too low a limit on request size?
Attaching the deleted event.
Thank you
bad_edu.log
Steps to reproduce
Homeserver
xmr.se
Synapse Version
Multiple tested, 1.99 and up. Now 1.103.0
Installation Method
pip (from PyPI)
Database
postgresql. Single, no, no
Workers
Multiple workers
Platform
Not relevant.
Configuration
No response
Relevant log output
2024-03-25 19:22:26,936 - synapse.http.matrixfederationclient - 755 - INFO - federation_transaction_transmission_loop-24 - {PUT-O-29} [matrix.org] Got response headers: 520
2024-03-25 19:22:26,936 - synapse.http.matrixfederationclient - 798 - INFO - federation_transaction_transmission_loop-24 - {PUT-O-29} [matrix.org] Request failed: PUT matrix-federation://matrix.org/_matrix/federation/v1/send/1711389457952: HttpResponseException('520: ')
Anything else that would be useful to know?
No response