Skip to content

Commit e9c3f78

Browse files
committed
Remove write() return value and simplify TopicWriter API
- Fix seqNo renumbering bug: messages written before session initialization are now properly renumbered after receiving lastSeqNo from server - Remove return value from write() method (now returns void) to simplify API - Remove resolveSeqNo() method and related seqNo shift tracking infrastructure - Update tests to remove assertions on write() return values - Add changeset describing bug fix and API simplification
1 parent 3873268 commit e9c3f78

File tree

11 files changed

+52
-558
lines changed

11 files changed

+52
-558
lines changed

.changeset/heavy-facts-happen.md

Lines changed: 0 additions & 22 deletions
This file was deleted.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
---
2+
'@ydbjs/topic': minor
3+
---
4+
5+
Fix seqNo renumbering bug and simplify TopicWriter API.
6+
7+
**Bug fix:**
8+
9+
- Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`.
10+
11+
**API changes:**
12+
13+
- `TopicWriter.write()` no longer returns sequence number (now returns `void`) to simplify API and prevent confusion about temporary vs final seqNo values
14+
15+
**Migration guide:**
16+
17+
- If you were storing seqNo from `write()` return value, use `flush()` instead to get final seqNo:
18+
19+
```typescript
20+
// Before
21+
let seqNo = writer.write(data)
22+
23+
// After
24+
writer.write(data)
25+
let lastSeqNo = await writer.flush() // Get final seqNo
26+
```
27+
28+
- User-provided seqNo (via `extra.seqNo`) remain final and unchanged - no migration needed for this case.

packages/topic/src/writer2/machine.ts

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import { isRetryableError } from '@ydbjs/retry'
3737
import { assign, enqueueActions, sendTo, setup } from 'xstate'
3838
import { defaultCodecMap } from '../codec.js'
3939
import { WriterStream, type WriterStreamReceiveEvent } from './stream.js'
40-
import { SeqNoShiftBuilder } from './seqno-shift-builder.js'
4140
import type { TopicWriterOptions, WriterContext, WriterEmitted, WriterEvents, WriterInput } from './types.js'
4241
import { loggers } from '@ydbjs/debug'
4342

@@ -268,8 +267,7 @@ let writerMachineFactory = setup({
268267
*
269268
* Mode-specific behaviour:
270269
* - Manual seqNo: compact the window, update bookkeeping, keep user-provided seqNo as-is
271-
* - Auto seqNo: compact the window, renumber remaining messages, emit `SeqNoShift` segments so
272-
* `TopicWriter.resolveSeqNo()` can translate temporary numbers into the final ones
270+
* - Auto seqNo: compact the window, renumber remaining messages sequentially
273271
*
274272
* @param enqueue - XState enqueue helper for scheduling actions
275273
* @param event - init response with session metadata
@@ -375,23 +373,15 @@ let writerMachineFactory = setup({
375373

376374
let newBufferStart = firstPendingIndex
377375

378-
let shiftBuilder = new SeqNoShiftBuilder()
379-
380376
// Renumber the remaining messages sequentially so we continue where the server left off.
381377
for (let i = firstPendingIndex; i < bufferEndIndex; i++) {
382378
let message = context.messages[i]
383379
if (!message) continue
384380

385-
let oldSeqNo = message.seqNo
386-
let newSeqNo = nextSeqNo
381+
message.seqNo = nextSeqNo
387382
nextSeqNo++
388-
389-
shiftBuilder.addShift(oldSeqNo, newSeqNo)
390-
message.seqNo = newSeqNo
391383
}
392384

393-
let seqNoShifts = shiftBuilder.build()
394-
395385
let inflightSize = context.inflightSize - acknowledgedSize - pendingSize
396386
let bufferSize = context.bufferSize + pendingSize
397387
let garbageSize = context.garbageSize + acknowledgedSize
@@ -413,7 +403,6 @@ let writerMachineFactory = setup({
413403
sessionId: event.data.sessionId,
414404
lastSeqNo: lastSeqNo,
415405
nextSeqNo,
416-
...(seqNoShifts.length ? { seqNoShifts } : {}),
417406
}))
418407
}),
419408

packages/topic/src/writer2/seqno-resolver.test.ts

Lines changed: 0 additions & 87 deletions
This file was deleted.

packages/topic/src/writer2/seqno-resolver.ts

Lines changed: 0 additions & 100 deletions
This file was deleted.

0 commit comments

Comments
 (0)