
Running update concurrently #39

Open
fcollonval opened this issue Nov 10, 2022 · 10 comments

Comments

@fcollonval (Collaborator) commented Nov 10, 2022

The publication of updates is done one task at a time:

for client in [c for c in room.clients if c != websocket]:
    # clients may have disconnected but not yet removed from the room
    # ignore them and continue forwarding to other clients
    try:
        await client.send(message)
    except Exception:
        pass
# update our internal state
update = await process_message(message, room.ydoc, websocket)
if room.ystore and update:
    await room.ystore.write(update)

We should run those updates concurrently with asyncio.gather so they go out as quickly as possible and we are not stuck behind a slow client.

Moreover, we should update the document with higher priority, because it is the reference that is pushed to any new client before it starts receiving deltas.
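
For illustration, here is a minimal sketch of one way to do that, reusing the names from the snippet above (room, websocket, message, process_message); it is an assumption about the shape of the fix, not the actual ypy-websocket code. The internal state is updated first, then the message is broadcast to the other clients concurrently:

import asyncio

async def handle_message(message, room, websocket):
    # higher priority: update the in-memory ydoc and the store first,
    # so that a newly connecting client gets an up-to-date reference
    update = await process_message(message, room.ydoc, websocket)
    if room.ystore and update:
        await room.ystore.write(update)

    async def send_safe(client):
        # a disconnected or slow client must not break the broadcast
        try:
            await client.send(message)
        except Exception:
            pass

    # broadcast concurrently: the sends run in parallel, so one slow
    # client no longer delays the others
    await asyncio.gather(*(send_safe(c) for c in room.clients if c != websocket))

asyncio.gather still waits for all sends to complete, but because they run in parallel and the state update happens first, a slow client can no longer starve the store update.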

I think that part of the code is partly responsible for a data loss case seen when 20 people were collaborating simultaneously. What happened is that some clients (we cannot know if all of them) were still receiving the deltas, but the file on disk (which is a regular dump of the in-memory ydoc) stopped being updated. And when a new client connected, the document at the latest dump version was the one pushed to it.

My best guess (unfortunately we did not see any error) is that one client was blocking and so the code after the client loop was never executed.

cc: @davidbrochart @hbcarlos

@davidbrochart (Collaborator)

These are very good points @fcollonval, thanks for pointing that out.
I'll open a PR for that. It also goes in the same direction as #38.
cc @ellisonbg

@SylvainCorlay (Collaborator) commented Nov 11, 2022

I would go even further than what @fcollonval is describing. I don't think we even need to gather at all between updating the peers and saving to the backend. We could do:

async for message in websocket:
    await self._process_queue.put(message)
    await self._broadcast_queue.put(message)

and then have two completely independent asyncio tasks:

  • One processing _process_queue.
    message = await self._process_queue.get()
    await process_message(message, room.ydoc, websocket)
  • One forwarding updates to the peers.

In doing so, we prevent one of the two tasks from stalling when the other one is (momentarily) choking on updates and building up a longer queue.
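
For concreteness, a rough sketch of what the two consumer tasks could look like (the method names and the sequential broadcast loop are illustrative, not the actual ypy-websocket API):

async def _process_messages(self):
    # consumes _process_queue: updates the in-memory ydoc and the store
    while True:
        message = await self._process_queue.get()
        update = await process_message(message, self.room.ydoc, self.websocket)
        if self.room.ystore and update:
            await self.room.ystore.write(update)

async def _broadcast_messages(self):
    # consumes _broadcast_queue: forwards each update to the other peers
    while True:
        message = await self._broadcast_queue.get()
        for client in [c for c in self.room.clients if c != self.websocket]:
            try:
                await client.send(message)
            except Exception:
                pass

# both started once as independent tasks, e.g.:
# asyncio.create_task(self._process_messages())
# asyncio.create_task(self._broadcast_messages())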

Another thing we could experiment with is to synchronously use put_nowait instead of put. If asyncio.QueueFull is raised for one of the two queues, it means that the queue is not being emptied quickly enough and the server is choking on Y updates. This may be a good way to detect a performance issue in either of those tasks.
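
A sketch of that variant (illustrative only; the function name and the maxsize value are assumptions, and put_nowait can only raise asyncio.QueueFull if the queue was created with a maxsize):

import asyncio
import logging

# the queues must be bounded for put_nowait to ever raise QueueFull
process_queue = asyncio.Queue(maxsize=1024)
broadcast_queue = asyncio.Queue(maxsize=1024)

async def forward_messages(websocket):
    async for message in websocket:
        try:
            process_queue.put_nowait(message)
            broadcast_queue.put_nowait(message)
        except asyncio.QueueFull:
            # one of the consumers is not emptying its queue quickly enough:
            # the server is choking on Y updates
            logging.warning("queue full: cannot keep up with Y updates")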

@afshin

@ellisonbg

Great discussion! I like the idea of @SylvainCorlay of having two async queues for processing and broadcasting the messages. Another concern that I have is the presence of blocking calls in the server or server extensions. Those would quickly slow everything down. We should probably spend some time profiling the server to understand where blocking calls are happening.

@davidbrochart (Collaborator)

Another concern that I have is the presence of blocking calls in the server or server extensions.

It could be interesting to compare with Jupyverse. In particular, the file ID service will be fully async, while jupyter-server-fileid is not async.

@davidbrochart (Collaborator)

Yet another alternative to @SylvainCorlay's proposal is to just create background tasks for processing messages, to update our internal state and to broadcast to clients. They would be like "fire and forget" tasks.
Indeed, the idea behind putting the updates in a queue before processing them seems to be that they have to be processed in order, but one of the characteristics of CRDTs is that they can be processed out of order.
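
A sketch of such a "fire and forget" approach (names are illustrative and not taken from #38; the task set only exists to keep strong references so the tasks are not garbage-collected before they complete):

import asyncio

background_tasks = set()

def fire_and_forget(coro):
    # keep a strong reference to the task so it is not garbage-collected early
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

async def update_state(message, room, websocket):
    update = await process_message(message, room.ydoc, websocket)
    if room.ystore and update:
        await room.ystore.write(update)

async def send_safe(client, message):
    try:
        await client.send(message)
    except Exception:
        pass

async def on_message(message, room, websocket):
    # neither the state update nor the broadcast is awaited here,
    # relying on the fact that Y updates can be applied out of order
    fire_and_forget(update_state(message, room, websocket))
    for client in [c for c in room.clients if c != websocket]:
        fire_and_forget(send_safe(client, message))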

@davidbrochart (Collaborator)

I made these changes part of #38.

@davidbrochart (Collaborator)

In @SylvainCorlay's proposal, a task consumes the _broadcast_queue and forwards the updates to the peers, I suppose sequentially? If so, then we have the same potential issue of one slow client slowing down the whole broadcast. To solve that, we would need to forward the updates to the peers in individual tasks, which is what I did in #38.
Let me know if I'm missing something.
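
For comparison with the sequential consumer sketched earlier, a per-client variant could look roughly like this (again illustrative, not the actual #38 code; strong references to the send tasks are omitted for brevity):

import asyncio

async def _broadcast_messages(self):
    while True:
        message = await self._broadcast_queue.get()
        for client in [c for c in self.room.clients if c != self.websocket]:
            # one send task per client: a slow or broken peer only delays itself
            asyncio.create_task(self._send(client, message))

async def _send(self, client, message):
    try:
        await client.send(message)
    except Exception:
        pass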

@SylvainCorlay (Collaborator)

Would Websocket.send block the processing of the broadcast queue for a slow client?

@davidbrochart (Collaborator) commented Nov 15, 2022

ypy-websocket is agnostic to the WebSocket implementation.
In jupyverse we use websockets, where send is async, which suggests that it's not that fast, maybe checking the connection and possibly timing out. So I don't think queuing calls to send is a good idea.
In jupyter-server we use Tornado's WebSocket implementation, where write_message is not async. I don't know if they use threads under the hood.
In both implementations, I don't think it's safe to assume that sending data on a WebSocket is fast.

@SylvainCorlay (Collaborator)

In jupyverse we use websockets, where send is async, which suggests that it's not that fast, maybe checking for connection and eventually timing out.

OK, I did not know that send could potentially be slow. Thanks for the explanation.
