Skip to content

Commit 419ee66

Browse files
committed
Add resolveSeqNo method and track temporary seqNo shifts in TopicWriter
- Add resolveSeqNo() method to get final seqNo for messages written before session initialization - Track seqNo shifts through SeqNoShiftEvent segments when session reconnects - Update write() JSDoc to warn about temporary seqNo values before session initialization - Implement efficient seqNo shift tracking using range merging and inversion algorithms - Update flush() documentation to clarify when seqNo values become final Breaking changes: None (backward compatible)
1 parent 46eff19 commit 419ee66

File tree

7 files changed

+642
-82
lines changed

7 files changed

+642
-82
lines changed

packages/topic/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
"types": "dist/index.d.ts",
3131
"exports": {
3232
".": "./dist/index.js",
33+
"./codec": "./dist/codec.js",
3334
"./reader": "./dist/reader/index.js",
3435
"./writer": "./dist/writer/index.js",
35-
"./writer2": "./dist/writer2/index.js"
36+
"./writer2": "./dist/writer2/index.js",
37+
"./message": "./dist/message.js"
3638
},
3739
"engines": {
3840
"node": ">=20.19.0",

packages/topic/src/index.ts

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,41 @@
1-
import { Driver } from "@ydbjs/core";
1+
import { Driver } from '@ydbjs/core'
22

3-
import { type TopicReader, type TopicReaderOptions, type TopicTxReader, createTopicReader, createTopicTxReader } from "./reader/index.js";
4-
import type { TX } from "./tx.js";
5-
import { type TopicTxWriter, type TopicWriter, type TopicWriterOptions, createTopicTxWriter, createTopicWriter } from "./writer/index.js";
3+
import {
4+
type TopicReader,
5+
type TopicReaderOptions,
6+
type TopicTxReader,
7+
createTopicReader,
8+
createTopicTxReader,
9+
} from './reader/index.js'
10+
import type { TX } from './tx.js'
11+
import {
12+
type TopicTxWriter,
13+
type TopicWriter,
14+
type TopicWriterOptions,
15+
createTopicTxWriter,
16+
createTopicWriter,
17+
} from './writer/index.js'
618

719
export interface TopicClient {
8-
createReader(options: TopicReaderOptions): TopicReader;
9-
createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader;
10-
createWriter(options: TopicWriterOptions): TopicWriter;
11-
createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter;
20+
createReader(options: TopicReaderOptions): TopicReader
21+
createTxReader(tx: TX, options: TopicReaderOptions): TopicTxReader
22+
createWriter(options: TopicWriterOptions): TopicWriter
23+
createTxWriter(tx: TX, options: TopicWriterOptions): TopicTxWriter
1224
}
1325

1426
export function topic(driver: Driver): TopicClient {
1527
return {
1628
createReader(options) {
17-
return createTopicReader(driver, options);
29+
return createTopicReader(driver, options)
1830
},
1931
createTxReader(tx: TX, options: TopicReaderOptions) {
20-
return createTopicTxReader(tx, driver, options);
32+
return createTopicTxReader(tx, driver, options)
2133
},
2234
createWriter(options: TopicWriterOptions) {
23-
return createTopicWriter(driver, options);
35+
return createTopicWriter(driver, options)
2436
},
2537
createTxWriter(tx: TX, options: Omit<TopicWriterOptions, 'tx'>) {
26-
return createTopicTxWriter(tx, driver, options);
38+
return createTopicTxWriter(tx, driver, options)
2739
},
2840
} as TopicClient
2941
}
30-
31-
export type { TopicTxReader } from './reader/index.js';
32-
export type { TopicTxWriter } from './writer/index.js';

packages/topic/src/writer2/machine.ts

Lines changed: 206 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ import { isRetryableError } from '@ydbjs/retry'
3737
import { assign, enqueueActions, sendTo, setup } from 'xstate'
3838
import { defaultCodecMap } from '../codec.js'
3939
import { WriterStream, type WriterStreamReceiveEvent } from './stream.js'
40-
import type { TopicWriterOptions, WriterContext, WriterEmitted, WriterEvents, WriterInput } from './types.js'
40+
import type {
41+
SeqNoShiftEvent,
42+
TopicWriterOptions,
43+
WriterContext,
44+
WriterEmitted,
45+
WriterEvents,
46+
WriterInput,
47+
} from './types.js'
4148
import { loggers } from '@ydbjs/debug'
4249

4350
// ============================================================================
@@ -259,24 +266,196 @@ let writerMachineFactory = setup({
259266
// ====================================================================
260267

261268
/**
262-
* Updates session state after receiving an init response.
263-
* Emits a session event with session ID and last sequence number.
269+
* Updates the writer context after receiving STREAM_WRITE_SESSION_INIT.
270+
* Common steps for both seqNo modes:
271+
* - Determine the new `inflightStart` using `serverLastSeqNo`
272+
* - Leave all unsent messages in place while keeping their original order
264273
*
265-
* @param enqueue - Enqueue function for scheduling actions
266-
* @param event - Init response event containing session details
274+
* Mode specific logic:
275+
* - manual: perform a single pass over `[inflight, buffer)`; drop messages with `seqNo <= serverLastSeqNo`
276+
* (already persisted on the server), compact the window, and update counters without changing seqNo values.
277+
* - auto: after the same pass, renumber every message whose seqNo may shift and emit `SeqNoShiftEvent`
278+
* segments so `TopicWriter.resolveSeqNo()` can map initial numbers to the final ones.
279+
*
280+
* @param enqueue - XState enqueue helper for scheduling actions
281+
* @param event - init response with session metadata
282+
* @param context - current state machine context
267283
*/
268-
updateWriteSession: enqueueActions(({ enqueue, event }) => {
284+
updateWriteSession: enqueueActions(({ enqueue, event, context }) => {
269285
assert.ok(event.type === 'writer.stream.response.init')
270286
assert.ok(event.data)
271287

288+
let lastSeqNo = event.data.lastSeqNo || 0n
289+
let nextSeqNo = lastSeqNo + 1n
290+
291+
// --------------------------------------------------------------------
292+
// 1. Подсчитываем подтверждённые сообщения и новое начало окна inflight
293+
// Это позволяет дальше просто сдвигать указатели без пересоздания массивов
294+
// --------------------------------------------------------------------
295+
let inflightStartIndex = context.inflightStart
296+
let inflightEndIndex = context.inflightStart + context.inflightLength
297+
let bufferEndIndex = context.bufferStart + context.bufferLength
298+
299+
if (context.seqNoMode === 'manual') {
300+
let writeIndex = inflightStartIndex
301+
let acknowledgedSize = 0n
302+
let pendingCount = 0
303+
let pendingSize = 0n
304+
let bufferKeptCount = 0
305+
let skippedSize = 0n
306+
let bufferSize = context.bufferSize
307+
308+
for (let i = inflightStartIndex; i < bufferEndIndex; i++) {
309+
let message = context.messages[i]
310+
if (!message) continue
311+
312+
let messageSize = BigInt(message.data.length)
313+
314+
if (i < inflightEndIndex) {
315+
if (message.seqNo <= lastSeqNo) {
316+
acknowledgedSize += messageSize
317+
continue
318+
}
319+
320+
pendingCount++
321+
pendingSize += messageSize
322+
} else {
323+
if (message.seqNo <= lastSeqNo) {
324+
skippedSize += messageSize
325+
bufferSize -= messageSize
326+
continue
327+
}
328+
329+
bufferKeptCount++
330+
}
331+
332+
if (writeIndex !== i) {
333+
context.messages[writeIndex] = message
334+
}
335+
writeIndex++
336+
}
337+
338+
let newBufferStart = inflightStartIndex
339+
let bufferLength = pendingCount + bufferKeptCount
340+
let inflightSize = context.inflightSize - (acknowledgedSize + pendingSize)
341+
let garbageSize = context.garbageSize + acknowledgedSize + skippedSize
342+
let newBufferSize = bufferSize + pendingSize
343+
344+
enqueue.assign({
345+
sessionId: event.data.sessionId,
346+
inflightStart: newBufferStart,
347+
inflightLength: 0,
348+
inflightSize,
349+
bufferStart: newBufferStart,
350+
bufferLength,
351+
bufferSize: newBufferSize,
352+
garbageSize,
353+
})
354+
355+
enqueue.emit(() => ({
356+
type: 'writer.session',
357+
sessionId: event.data.sessionId,
358+
lastSeqNo,
359+
nextSeqNo: lastSeqNo + 1n,
360+
}))
361+
362+
return
363+
}
364+
365+
let firstPendingIndex = inflightEndIndex
366+
let acknowledgedSize = 0n
367+
let pendingCount = 0
368+
let pendingSize = 0n
369+
370+
for (let i = inflightStartIndex; i < inflightEndIndex; i++) {
371+
let message = context.messages[i]
372+
if (!message) continue
373+
374+
if (firstPendingIndex === inflightEndIndex && message.seqNo > lastSeqNo) {
375+
firstPendingIndex = i
376+
}
377+
378+
if (i < firstPendingIndex) {
379+
acknowledgedSize += BigInt(message.data.length)
380+
} else {
381+
pendingCount++
382+
pendingSize += BigInt(message.data.length)
383+
}
384+
}
385+
386+
let newBufferStart = firstPendingIndex
387+
388+
let seqNoShifts: SeqNoShiftEvent[] = []
389+
let currentShiftStart: bigint | null = null
390+
let currentShiftDelta: bigint | null = null
391+
let currentShiftCount = 0
392+
393+
let flushCurrentShift = () => {
394+
if (currentShiftStart !== null && currentShiftDelta !== null && currentShiftCount > 0) {
395+
seqNoShifts.push({
396+
startOld: currentShiftStart,
397+
count: currentShiftCount,
398+
delta: currentShiftDelta,
399+
})
400+
}
401+
currentShiftStart = null
402+
currentShiftDelta = null
403+
currentShiftCount = 0
404+
}
405+
406+
for (let i = firstPendingIndex; i < bufferEndIndex; i++) {
407+
let message = context.messages[i]
408+
if (!message) continue
409+
410+
let oldSeqNo = message.seqNo
411+
let newSeqNo = nextSeqNo
412+
nextSeqNo++
413+
414+
if (oldSeqNo !== newSeqNo) {
415+
let delta = newSeqNo - oldSeqNo
416+
if (
417+
currentShiftStart !== null &&
418+
currentShiftDelta === delta &&
419+
oldSeqNo === currentShiftStart + BigInt(currentShiftCount)
420+
) {
421+
currentShiftCount++
422+
} else {
423+
flushCurrentShift()
424+
currentShiftStart = oldSeqNo
425+
currentShiftDelta = delta
426+
currentShiftCount = 1
427+
}
428+
} else {
429+
flushCurrentShift()
430+
}
431+
432+
message.seqNo = newSeqNo
433+
}
434+
435+
flushCurrentShift()
436+
437+
let inflightSize = context.inflightSize - acknowledgedSize - pendingSize
438+
let bufferSize = context.bufferSize + pendingSize
439+
let garbageSize = context.garbageSize + acknowledgedSize
440+
let bufferLength = pendingCount + context.bufferLength
441+
272442
enqueue.assign({
273443
sessionId: event.data.sessionId,
444+
inflightStart: newBufferStart,
445+
inflightLength: 0,
446+
inflightSize,
447+
bufferStart: newBufferStart,
448+
bufferLength,
449+
bufferSize,
450+
garbageSize,
274451
})
275452

276453
enqueue.emit(() => ({
277454
type: 'writer.session',
278455
sessionId: event.data.sessionId,
279-
lastSeqNo: event.data.lastSeqNo || 0n,
456+
lastSeqNo: lastSeqNo,
457+
nextSeqNo,
458+
...(seqNoShifts.length ? { seqNoShifts } : {}),
280459
}))
281460
}),
282461

@@ -304,7 +483,9 @@ let writerMachineFactory = setup({
304483
if (context.inflightLength >= context.options.maxInflightCount!) {
305484
enqueue.emit(() => ({
306485
type: 'writer.error',
307-
error: new Error('Internal Error: Max inflight messages limit reached. If you see this error, please report it.'),
486+
error: new Error(
487+
'Internal Error: Max inflight messages limit reached. If you see this error, please report it.'
488+
),
308489
}))
309490

310491
return
@@ -440,7 +621,6 @@ let writerMachineFactory = setup({
440621
})
441622
}
442623

443-
444624
// @ts-ignore
445625
enqueue({ type: 'log', params: { message: 'ACK | {stats}' } })
446626
}),
@@ -458,7 +638,9 @@ let writerMachineFactory = setup({
458638
if (event.message.data.length > MAX_PAYLOAD_SIZE) {
459639
enqueue.emit(() => ({
460640
type: 'writer.error',
461-
error: new Error('Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.'),
641+
error: new Error(
642+
'Internal Error: Payload size exceeds 48MiB limit. If you see this error, please report it.'
643+
),
462644
}))
463645

464646
return
@@ -471,6 +653,10 @@ let writerMachineFactory = setup({
471653
}))
472654
let uncompressedSize = BigInt(event.message.data.length)
473655

656+
// Track seqNo mode (set once on first message, then remains constant)
657+
// Mode is passed from TopicWriter which knows it from SeqNoManager
658+
let seqNoMode: 'auto' | 'manual' | null = context.seqNoMode ?? event.seqNoMode ?? null
659+
474660
let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, {
475661
data: event.message.data,
476662
seqNo: event.message.seqNo,
@@ -483,8 +669,9 @@ let writerMachineFactory = setup({
483669
context.messages.push(message)
484670

485671
enqueue.assign(({ context }) => ({
672+
seqNoMode,
486673
bufferSize: context.bufferSize + BigInt(event.message.data.length),
487-
bufferLength: context.bufferLength + 1
674+
bufferLength: context.bufferLength + 1,
488675
}))
489676

490677
//@ts-ignore
@@ -531,6 +718,7 @@ let writerMachineFactory = setup({
531718
releaseResources: assign(() => {
532719
return {
533720
messages: [],
721+
seqNoMode: null,
534722
bufferStart: 0,
535723
bufferLength: 0,
536724
inflightStart: 0,
@@ -694,6 +882,7 @@ export const WriterMachine = writerMachineFactory.createMachine({
694882

695883
// Single array approach with sliding window
696884
messages: [],
885+
seqNoMode: null,
697886
bufferStart: 0,
698887
bufferLength: 0,
699888
inflightStart: 0,
@@ -716,12 +905,12 @@ export const WriterMachine = writerMachineFactory.createMachine({
716905
on: {
717906
'writer.close': {
718907
target: '.closing',
719-
actions: [log('CLS | {topicPath}')]
908+
actions: [log('CLS | {topicPath}')],
720909
},
721910
'writer.destroy': {
722911
// Force close, skip graceful shutdown
723912
target: '.closed',
724-
actions: [log('DST | {topicPath}')]
913+
actions: [log('DST | {topicPath}')],
725914
},
726915
'writer.stream.error': {
727916
// Enter error state on stream error
@@ -738,8 +927,8 @@ export const WriterMachine = writerMachineFactory.createMachine({
738927
idle: {
739928
always: {
740929
target: 'connecting',
741-
actions: [log('INT | {topicPath}')]
742-
}
930+
actions: [log('INT | {topicPath}')],
931+
},
743932
},
744933
/**
745934
* Connecting state: Establishes connection to the topic stream.
@@ -941,11 +1130,7 @@ export const WriterMachine = writerMachineFactory.createMachine({
9411130
closed: {
9421131
// All resources are released in this final state
9431132
type: 'final',
944-
entry: [
945-
'closeConnection',
946-
'releaseResources',
947-
log('FIN | {stats}'),
948-
],
949-
}
1133+
entry: ['closeConnection', 'releaseResources', log('FIN | {stats}')],
1134+
},
9501135
},
9511136
})

0 commit comments

Comments
 (0)