Commit a50923b

Improve documentation around streams, particularly ID generators and adding new streams. (#18943)
This arises mostly from my recent experience adding a stream for Thread Subscriptions and trying to help others add their own streams.

Signed-off-by: Olivier 'reivilibre' <[email protected]>
1 parent 378c5c8 commit a50923b

File tree: 4 files changed (+66, -10 lines)

changelog.d/18943.doc (1 addition, 0 deletions)

```diff
@@ -0,0 +1 @@
+Improve documentation around streams, particularly ID generators and adding new streams.
```

docs/development/synapse_architecture/streams.md (41 additions, 7 deletions)

```diff
@@ -1,4 +1,4 @@
-## Streams
+# Streams
 
 Synapse has a concept of "streams", which are roughly described in [`id_generators.py`](
 https://github.com/element-hq/synapse/blob/develop/synapse/storage/util/id_generators.py
@@ -19,7 +19,7 @@ To that end, let's describe streams formally, paraphrasing from the docstring of
 https://github.com/element-hq/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96
 ).
 
-### Definition
+## Definition
 
 A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time.
 Only "writers" can add facts to a stream, and there may be multiple writers.
@@ -47,7 +47,7 @@ But unhappy cases (e.g. transaction rollback due to an error) also count as comp
 Once completed, the rows written with that stream ID are fixed, and no new rows
 will be inserted with that ID.
 
-### Current stream ID
+## Current stream ID
 
 For any given stream reader (including writers themselves), we may define a per-writer current stream ID:
 
@@ -93,7 +93,7 @@ Consider a single-writer stream which is initially at ID 1.
 | Complete 6 | 6 | |
 
 
-### Multi-writer streams
+## Multi-writer streams
 
 There are two ways to view a multi-writer stream.
 
@@ -115,7 +115,7 @@ The facts this stream holds are instructions to "you should now invalidate these
 We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations.
 (Invalidations are self-contained facts; and the invalidations commute/are idempotent).
 
-### Writing to streams
+## Writing to streams
 
 Writers need to track:
 - track their current position (i.e. its own per-writer stream ID).
@@ -133,7 +133,7 @@ To complete a fact, first remove it from your map of facts currently awaiting co
 Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream.
 Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID.
 
-### Subscribing to streams
+## Subscribing to streams
 
 Readers need to track the current position of every writer.
 
```
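The writer-side rules in the hunks above (facts may complete out of order, but the current position only advances once no earlier fact is still awaiting completion) can be sketched as a toy model. This is plain Python with hypothetical names, not Synapse's actual implementation:

```python
class ToyWriter:
    """Toy single-writer stream tracking in-flight (unfinished) stream IDs."""

    def __init__(self) -> None:
        self._next_id = 1
        self._unfinished: set[int] = set()  # IDs allocated but not yet completed
        self.current_position = 0           # highest ID known to be fully persisted

    def allocate(self) -> int:
        """Allocate the next stream ID and record it as awaiting completion."""
        stream_id = self._next_id
        self._next_id += 1
        self._unfinished.add(stream_id)
        return stream_id

    def complete(self, stream_id: int) -> None:
        # Remove the fact from the set awaiting completion, then advance the
        # current position up to (but not past) the earliest still-unfinished ID.
        self._unfinished.discard(stream_id)
        floor = min(self._unfinished, default=self._next_id)
        self.current_position = floor - 1


w = ToyWriter()
a, b, c = w.allocate(), w.allocate(), w.allocate()  # IDs 1, 2, 3
w.complete(b)                      # ID 2 done, but ID 1 is still in flight...
assert w.current_position == 0     # ...so the position cannot advance yet
w.complete(a)                      # IDs 1 and 2 are now both complete
assert w.current_position == 2
w.complete(c)
assert w.current_position == 3
```

This is the "buffer/scratchpad at the end of the log" idea from the Summary: completion order is arbitrary, but exposure is always linear and contiguous.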

```diff
@@ -146,10 +146,44 @@ The `RDATA` itself is not a self-contained representation of the fact;
 readers will have to query the stream tables for the full details.
 Readers must also advance their record of the writer's current position for that stream.
 
-# Summary
+## Summary
 
 In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous.
 
+---
+
+## Cheatsheet for creating a new stream
+
+These rough notes and links may help you to create a new stream and add all the
+necessary registration and event handling.
+
+**Create your stream:**
+- [create a stream class and stream row class](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/_base.py#L728)
+- will need an [ID generator](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L75)
+- may need [writer configuration](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/config/workers.py#L177), if there isn't already an obvious source of configuration for which workers should be designated as writers to your new stream.
+  - if adding new writer configuration, add Docker-worker configuration, which lets us configure the writer worker in Complement tests: [[1]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L331), [[2]](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/docker/configure_workers_and_start.py#L440)
+- most of the time, you will likely introduce a new datastore class for the concept represented by the new stream, unless there is already an obvious datastore that covers it.
+- consider whether it may make sense to introduce a handler
+
+**Register your stream in:**
+- [`STREAMS_MAP`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/streams/__init__.py#L71)
+
+**Advance your stream in:**
+- [`process_replication_position` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L111)
+  - don't forget the super call
+
+**If you're going to do any caching that needs invalidation from new rows:**
+- add invalidations to [`process_replication_rows` of your appropriate datastore](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L91)
+  - don't forget the super call
+- add local-only [invalidations to your writer transactions](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/storage/databases/main/thread_subscriptions.py#L201)
+
+**For streams to be used in sync:**
+- add a new field to [`StreamToken`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L1003)
+- add a new [`StreamKeyType`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/types/__init__.py#L999)
+- add appropriate wake-up rules
+  - in [`on_rdata`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/replication/tcp/client.py#L260)
+  - locally on the same worker when completing a write, [e.g. in your handler](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/handlers/thread_subscriptions.py#L139)
+- add the stream in [`bound_future_token`](https://github.com/element-hq/synapse/blob/4367fb2d078c52959aeca0fe6874539c53e8360d/synapse/streams/events.py#L127)
 
 ---
 
```
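To illustrate the first cheatsheet step ("create a stream class and stream row class"): a stream pairs a name with a row type and knows how to parse replicated rows back into typed tuples, and the stream class is then registered in a name-to-class map. The sketch below only mimics the pattern of the real classes in `synapse/replication/tcp/streams/_base.py`; it is self-contained, and every name in it is hypothetical:

```python
from typing import NamedTuple


class ExampleStreamRow(NamedTuple):
    """One row (fact) on the stream; readers re-query the DB for full details."""
    user_id: str
    thread_root_id: str


class ExampleStream:
    # Key used when registering the stream in the streams map.
    NAME = "example"
    ROW_TYPE = ExampleStreamRow

    @classmethod
    def parse_row(cls, row: tuple) -> ExampleStreamRow:
        # RDATA payloads arrive as plain sequences; rebuild the typed row.
        return cls.ROW_TYPE(*row)


# A registry analogous to Synapse's STREAMS_MAP (hypothetical local stand-in):
EXAMPLE_STREAMS_MAP = {ExampleStream.NAME: ExampleStream}

row = ExampleStream.parse_row(("@alice:example.org", "$root_event"))
assert row.user_id == "@alice:example.org"
```

The real base class carries more machinery (current-token and update functions for replication catch-up), but the NAME/ROW_TYPE/parse_row shape is the part each new stream must supply.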

synapse/storage/database.py (2 additions, 2 deletions)

```diff
@@ -322,7 +322,7 @@ def call_after(
         self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given callback on the main twisted thread after the transaction has
-        finished.
+        finished successfully.
 
         Mostly used to invalidate the caches on the correct thread.
 
@@ -343,7 +343,7 @@ def async_call_after(
         self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
     ) -> None:
         """Call the given asynchronous callback on the main twisted thread after
-        the transaction has finished (but before those added in `call_after`).
+        the transaction has finished successfully (but before those added in `call_after`).
 
         Mostly used to invalidate remote caches after transactions.
 
```
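The docstring fix above tightens "after the transaction has finished" to "finished *successfully*": on rollback, the deferred callbacks never run. A minimal self-contained sketch of that contract (a toy, not Synapse's actual `LoggingTransaction`):

```python
from typing import Callable


class ToyTransaction:
    """Toy transaction that defers callbacks until a successful commit."""

    def __init__(self) -> None:
        self._after_callbacks: list[Callable[[], None]] = []

    def call_after(self, callback: Callable[[], None]) -> None:
        # Deferred until commit -- e.g. a cache invalidation that should only
        # happen once the write is actually durable.
        self._after_callbacks.append(callback)

    def commit(self) -> None:
        # Transaction finished successfully: run the deferred callbacks.
        for cb in self._after_callbacks:
            cb()

    def rollback(self) -> None:
        # Callbacks are discarded: the writes they would have announced
        # never happened, so invalidating caches for them would be wrong.
        self._after_callbacks.clear()


fired: list[str] = []
txn = ToyTransaction()
txn.call_after(lambda: fired.append("invalidate-cache"))
txn.rollback()
assert fired == []                       # rolled back: callback never runs

txn2 = ToyTransaction()
txn2.call_after(lambda: fired.append("invalidate-cache"))
txn2.commit()
assert fired == ["invalidate-cache"]     # committed: callback runs once
```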

synapse/storage/util/id_generators.py (22 additions, 1 deletion)

```diff
@@ -175,7 +175,8 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
     Uses a Postgres sequence to coordinate ID assignment, but positions of other
     writers will only get updated when `advance` is called (by replication).
 
-    Note: Only works with Postgres.
+    On SQLite, falls back to a single-writer implementation, which is fine because
+    Synapse only supports monolith mode when SQLite is the database driver.
 
     Warning: Streams using this generator start at ID 2, because ID 1 is always assumed
     to have been 'seen as persisted'.
@@ -536,6 +537,16 @@ def get_next_mult(self, n: int) -> AsyncContextManager[list[int]]:
 
     def get_next_txn(self, txn: LoggingTransaction) -> int:
         """
+        Generate an ID for immediate use within a database transaction.
+
+        The ID will automatically be marked as finished at the end of the
+        database transaction, therefore the stream rows MUST be persisted
+        within the active transaction (MUST NOT be persisted in a later
+        transaction).
+
+        The replication notifier will automatically be notified when the
+        transaction ends successfully.
+
         Usage:
 
             stream_id = stream_id_gen.get_next_txn(txn)
@@ -573,6 +584,16 @@ def get_next_txn(self, txn: LoggingTransaction) -> int:
 
     def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> list[int]:
         """
+        Generate multiple IDs for immediate use within a database transaction.
+
+        The IDs will automatically be marked as finished at the end of the
+        database transaction, therefore the stream rows MUST be persisted
+        within the active transaction (MUST NOT be persisted in a later
+        transaction).
+
+        The replication notifier will automatically be notified when the
+        transaction ends successfully.
+
        Usage:
 
            stream_id = stream_id_gen.get_next_txn(txn)
```
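The contract documented above (the ID is automatically marked finished when the transaction ends, so the stream rows must be written inside that same transaction) can be sketched with a self-contained toy. Every class here is hypothetical, standing in for the real `MultiWriterIdGenerator` and `LoggingTransaction`:

```python
from typing import Callable


class ToyTxn:
    """Toy transaction that runs registered callbacks when it ends."""

    def __init__(self) -> None:
        self._on_exit: list[Callable[[], None]] = []
        self.rows: list[tuple[int, str]] = []

    def call_on_exit(self, cb: Callable[[], None]) -> None:
        self._on_exit.append(cb)

    def finish(self) -> None:
        for cb in self._on_exit:
            cb()


class ToyIdGenerator:
    """Toy stream ID generator mirroring the get_next_txn contract."""

    def __init__(self) -> None:
        self._next_id = 2            # mirrors "streams start at ID 2"
        self._unfinished: set[int] = set()

    def get_next_txn(self, txn: ToyTxn) -> int:
        stream_id = self._next_id
        self._next_id += 1
        self._unfinished.add(stream_id)
        # Automatically mark the ID as finished when the transaction ends --
        # which is why the rows MUST be persisted in this same transaction.
        txn.call_on_exit(lambda: self._unfinished.discard(stream_id))
        return stream_id

    def get_current_token(self) -> int:
        # Highest ID with no earlier unfinished ID.
        return min(self._unfinished, default=self._next_id) - 1


gen = ToyIdGenerator()
txn = ToyTxn()
stream_id = gen.get_next_txn(txn)
txn.rows.append((stream_id, "some fact"))          # rows go in THIS transaction
assert gen.get_current_token() == stream_id - 1    # in flight: not yet visible
txn.finish()
assert gen.get_current_token() == stream_id        # finished: now visible
```

If the rows were instead written in a later transaction, readers could observe a current token covering an ID whose rows do not exist yet, breaking the "completed IDs are fixed" invariant from the streams documentation.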
