Skip to content

Commit a33d8b1

Browse files
committed
Fix seqNo renumbering when messages written before session initialization
- Remove lastSeqNo update from _flush() - it was updating before messages were actually sent to server - Fix renumbering logic in _on_init_response to properly handle messages written before init - Check if messages in buffer need renumbering by comparing their seqNo with serverLastSeqNo - Never renumber messages if user provided seqNo (manual mode) - Update lastSeqNo only when session is initialized or new message is written, not on ACKs
1 parent e9c3f78 commit a33d8b1

File tree

7 files changed

+240
-101
lines changed

7 files changed

+240
-101
lines changed

.changeset/remove-write-seqno-return.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
'@ydbjs/topic': minor
33
---
44

5-
Fix seqNo renumbering bug and simplify TopicWriter API.
5+
Fix seqNo renumbering bug in both writer implementations and simplify TopicWriter API.
66

77
**Bug fix:**
88

99
- Fixed issue where messages written before session initialization were not renumbered after receiving `lastSeqNo` from server. Previously, auto-generated seqNo started from 0 and were not updated when server provided actual `lastSeqNo`, causing seqNo conflicts. Now messages are properly renumbered to continue from server's `lastSeqNo + 1`.
10+
- Fixed in both `writer` (legacy) and `writer2` implementations
1011

1112
**API changes:**
1213

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,37 @@
1-
import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic";
2-
import type { CompressionCodec } from "../codec.js";
3-
import type { AsyncPriorityQueue } from "../queue.js";
4-
import type { TX } from "../tx.js";
5-
import { _batch_messages } from "./_batch_messages.js";
6-
import { _emit_write_request } from "./_write_request.js";
7-
import type { ThroughputSettings } from "./types.js";
1+
import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData } from '@ydbjs/api/topic'
2+
import type { CompressionCodec } from '../codec.js'
3+
import type { AsyncPriorityQueue } from '../queue.js'
4+
import type { TX } from '../tx.js'
5+
import { _batch_messages } from './_batch_messages.js'
6+
import { _emit_write_request } from './_write_request.js'
7+
import type { ThroughputSettings } from './types.js'
88

99
export const _flush = function flush(ctx: {
1010
readonly tx?: TX
11-
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>,
12-
readonly codec: CompressionCodec, // Codec to use for compression
13-
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
14-
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
15-
readonly throughputSettings: ThroughputSettings;
16-
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
11+
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>
12+
readonly codec: CompressionCodec // Codec to use for compression
13+
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
14+
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
15+
readonly throughputSettings: ThroughputSettings
16+
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
1717
}) {
1818
if (!ctx.buffer.length) {
19-
return; // Nothing to flush
19+
return // Nothing to flush
2020
}
2121

22-
let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = [];
22+
let messagesToSend: StreamWriteMessage_WriteRequest_MessageData[] = []
2323

2424
while (ctx.inflight.length < ctx.throughputSettings.maxInflightCount) {
25-
let message = ctx.buffer.shift();
25+
let message = ctx.buffer.shift()
2626
if (!message) {
27-
break; // No more messages to send
27+
break // No more messages to send
2828
}
2929

30-
ctx.inflight.push(message);
31-
messagesToSend.push(message);
30+
ctx.inflight.push(message)
31+
messagesToSend.push(message)
3232
}
3333

3434
for (let batch of _batch_messages(messagesToSend)) {
35-
_emit_write_request(ctx, batch); // Emit the write request with the batch of messages
35+
_emit_write_request(ctx, batch) // Emit the write request with the batch of messages
3636
}
3737
}
Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,84 @@
1-
import type { StreamWriteMessage_FromClient, StreamWriteMessage_InitResponse, StreamWriteMessage_WriteRequest_MessageData } from "@ydbjs/api/topic";
2-
import type { CompressionCodec } from "../codec.js";
3-
import type { AsyncPriorityQueue } from "../queue.js";
4-
import type { TX } from "../tx.js";
5-
import { _flush } from "./_flush.js";
6-
import type { ThroughputSettings } from "./types.js";
1+
import type {
2+
StreamWriteMessage_FromClient,
3+
StreamWriteMessage_InitResponse,
4+
StreamWriteMessage_WriteRequest_MessageData,
5+
} from '@ydbjs/api/topic'
6+
import type { CompressionCodec } from '../codec.js'
7+
import type { AsyncPriorityQueue } from '../queue.js'
8+
import type { TX } from '../tx.js'
9+
import { _flush } from './_flush.js'
10+
import type { ThroughputSettings } from './types.js'
711

8-
export const _on_init_response = function on_init_response(ctx: {
9-
readonly tx?: TX
10-
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>,
11-
readonly codec: CompressionCodec, // Codec to use for compression
12-
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
13-
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
14-
readonly lastSeqNo?: bigint; // The last sequence number acknowledged by the server
15-
readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer
16-
updateLastSeqNo: (seqNo: bigint) => void;
17-
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
18-
}, input: StreamWriteMessage_InitResponse) {
19-
if (!ctx.lastSeqNo) {
20-
// Store the last sequence number from the server.
21-
ctx.updateLastSeqNo(input.lastSeqNo);
22-
}
12+
export const _on_init_response = function on_init_response(
13+
ctx: {
14+
readonly tx?: TX
15+
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>
16+
readonly codec: CompressionCodec // Codec to use for compression
17+
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
18+
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
19+
readonly lastSeqNo?: bigint // The last sequence number acknowledged by the server
20+
readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer
21+
readonly isSeqNoProvided?: boolean // Whether user provided seqNo (manual mode)
22+
updateLastSeqNo: (seqNo: bigint) => void
23+
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
24+
},
25+
input: StreamWriteMessage_InitResponse
26+
) {
27+
let serverLastSeqNo = input.lastSeqNo || 0n
28+
let currentLastSeqNo = ctx.lastSeqNo
29+
let isFirstInit = currentLastSeqNo === undefined
30+
let lastSeqNoChanged = isFirstInit || currentLastSeqNo !== serverLastSeqNo
2331

32+
// Return inflight messages to buffer
2433
while (ctx.inflight.length > 0) {
25-
const message = ctx.inflight.pop();
34+
const message = ctx.inflight.pop()
2635
if (!message) {
27-
continue;
36+
continue
2837
}
2938

30-
ctx.buffer.unshift(message);
31-
ctx.updateBufferSize(BigInt(message.data.length));
39+
ctx.buffer.unshift(message)
40+
ctx.updateBufferSize(BigInt(message.data.length))
41+
}
42+
43+
// If this is the first initialization or server provided a new lastSeqNo, and we're in auto seqNo mode,
44+
// renumber all messages in buffer to continue from serverLastSeqNo + 1
45+
// Always renumber on first init, even if currentLastSeqNo === serverLastSeqNo (messages written before init)
46+
// Also renumber if there are messages in buffer that were written before init (their seqNo start from 1, not serverLastSeqNo + 1)
47+
let finalLastSeqNo = serverLastSeqNo
48+
let shouldRenumber = false
49+
// Only renumber in auto mode (when user didn't provide seqNo)
50+
if (!ctx.isSeqNoProvided && ctx.buffer.length > 0) {
51+
if (isFirstInit) {
52+
// First initialization: always renumber messages written before init
53+
shouldRenumber = true
54+
} else if (lastSeqNoChanged) {
55+
// Reconnection: renumber if server's lastSeqNo changed
56+
shouldRenumber = true
57+
} else if (ctx.buffer.length > 0) {
58+
// Check if messages in buffer were written before init (seqNo start from 1, not serverLastSeqNo + 1)
59+
// If first message's seqNo is <= serverLastSeqNo, it was written before init and needs renumbering
60+
let firstMessageSeqNo = ctx.buffer[0]?.seqNo
61+
if (firstMessageSeqNo !== undefined && firstMessageSeqNo <= serverLastSeqNo) {
62+
shouldRenumber = true
63+
}
64+
}
65+
}
66+
67+
if (shouldRenumber) {
68+
let nextSeqNo = serverLastSeqNo + 1n
69+
// Renumber all messages in buffer sequentially starting from serverLastSeqNo + 1
70+
for (let message of ctx.buffer) {
71+
message.seqNo = nextSeqNo
72+
nextSeqNo++
73+
}
74+
// Update lastSeqNo to the last renumbered seqNo so flush() returns correct value
75+
finalLastSeqNo = nextSeqNo - 1n
76+
ctx.updateLastSeqNo(finalLastSeqNo)
77+
} else if (lastSeqNoChanged) {
78+
// Store the last sequence number from the server if we didn't renumber
79+
ctx.updateLastSeqNo(serverLastSeqNo)
3280
}
3381

34-
_flush(ctx); // Flush the buffer to send any pending messages.
82+
// Flush the buffer to send any pending messages
83+
_flush(ctx)
3584
}
Lines changed: 42 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,59 @@
1-
import { create } from "@bufbuild/protobuf";
2-
import { timestampFromDate } from "@bufbuild/protobuf/wkt";
3-
import { type StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteRequest_MessageDataSchema } from "@ydbjs/api/topic";
4-
import type { CompressionCodec } from "../codec.js";
5-
import { _flush } from "./_flush.js";
6-
import { MAX_PAYLOAD_SIZE } from "./constants.js";
1+
import { create } from '@bufbuild/protobuf'
2+
import { timestampFromDate } from '@bufbuild/protobuf/wkt'
3+
import {
4+
type StreamWriteMessage_WriteRequest_MessageData,
5+
StreamWriteMessage_WriteRequest_MessageDataSchema,
6+
} from '@ydbjs/api/topic'
7+
import type { CompressionCodec } from '../codec.js'
8+
import { _flush } from './_flush.js'
9+
import { MAX_PAYLOAD_SIZE } from './constants.js'
710

8-
export function _write(ctx: {
9-
readonly codec: CompressionCodec, // Codec to use for compression
10-
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages in the buffer
11-
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
12-
readonly lastSeqNo: bigint, // Last sequence number used
13-
updateLastSeqNo: (seqNo: bigint) => void;
14-
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
15-
}, msg: {
16-
data: Uint8Array,
17-
seqNo?: bigint,
18-
createdAt?: Date,
19-
metadataItems?: Record<string, Uint8Array>
20-
}): bigint {
21-
let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data;
11+
export function _write(
12+
ctx: {
13+
readonly codec: CompressionCodec // Codec to use for compression
14+
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages in the buffer
15+
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
16+
readonly lastSeqNo: bigint // Last sequence number used
17+
updateLastSeqNo: (seqNo: bigint) => void
18+
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
19+
},
20+
msg: {
21+
data: Uint8Array
22+
seqNo?: bigint
23+
createdAt?: Date
24+
metadataItems?: Record<string, Uint8Array>
25+
}
26+
): bigint {
27+
let data = ctx.codec ? ctx.codec.compress(msg.data) : msg.data
2228

2329
// Validate the payload size, it should not exceed MAX_PAYLOAD_SIZE
2430
// This is a YDB limitation for single message size.
2531
if (data.length > MAX_PAYLOAD_SIZE) {
26-
throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`);
32+
throw new Error(`Payload size exceeds ${Number(MAX_PAYLOAD_SIZE / (1024n * 1024n))}MiB limit.`)
2733
}
2834

29-
let seqNo = msg.seqNo ?? ((ctx.lastSeqNo ?? 0n) + 1n);
30-
let createdAt = timestampFromDate(msg.createdAt ?? new Date());
31-
let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value }));
32-
let uncompressedSize = BigInt(data.length);
35+
let seqNo = msg.seqNo ?? (ctx.lastSeqNo ?? 0n) + 1n
36+
let createdAt = timestampFromDate(msg.createdAt ?? new Date())
37+
let metadataItems = Object.entries(msg.metadataItems || {}).map(([key, value]) => ({ key, value }))
38+
let uncompressedSize = BigInt(data.length)
3339

3440
let message = create(StreamWriteMessage_WriteRequest_MessageDataSchema, {
3541
data,
3642
seqNo,
3743
createdAt,
3844
metadataItems,
3945
uncompressedSize,
40-
});
46+
})
47+
48+
ctx.buffer.push(message) // Store the message in the buffer
49+
ctx.updateBufferSize(BigInt(data.length)) // Update the buffer size
4150

42-
ctx.buffer.push(message); // Store the message in the buffer
43-
ctx.updateBufferSize(BigInt(data.length)); // Update the buffer size
44-
ctx.updateLastSeqNo(seqNo); // Update the last sequence number
51+
// Only update lastSeqNo if session is initialized (lastSeqNo is defined)
52+
// For messages written before session initialization, lastSeqNo will be updated
53+
// after renumbering in _on_init_response
54+
if (ctx.lastSeqNo !== undefined) {
55+
ctx.updateLastSeqNo(seqNo)
56+
}
4557

46-
return seqNo;
58+
return seqNo
4759
}

packages/topic/src/writer/_write_response.ts

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,45 @@
1-
import type { StreamWriteMessage_FromClient, StreamWriteMessage_WriteRequest_MessageData, StreamWriteMessage_WriteResponse } from "@ydbjs/api/topic";
2-
import type { CompressionCodec } from "../codec.js";
3-
import type { AsyncPriorityQueue } from "../queue.js";
4-
import type { TX } from "../tx.js";
5-
import { _flush } from "./_flush.js";
6-
import type { ThroughputSettings } from "./types.js";
1+
import type {
2+
StreamWriteMessage_FromClient,
3+
StreamWriteMessage_WriteRequest_MessageData,
4+
StreamWriteMessage_WriteResponse,
5+
} from '@ydbjs/api/topic'
6+
import type { CompressionCodec } from '../codec.js'
7+
import type { AsyncPriorityQueue } from '../queue.js'
8+
import type { TX } from '../tx.js'
9+
import { _flush } from './_flush.js'
10+
import type { ThroughputSettings } from './types.js'
711

8-
export const _on_write_response = function on_write_response(ctx: {
9-
readonly tx?: TX
10-
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>,
11-
readonly codec: CompressionCodec, // Codec to use for compression
12-
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
13-
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[]; // Array of messages that are currently in-flight
14-
readonly throughputSettings: ThroughputSettings; // Current throughput settings for the writer
15-
onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments
16-
updateBufferSize: (bytes: bigint) => void; // Function to update the buffer size
17-
}, input: StreamWriteMessage_WriteResponse) {
12+
export const _on_write_response = function on_write_response(
13+
ctx: {
14+
readonly tx?: TX
15+
readonly queue: AsyncPriorityQueue<StreamWriteMessage_FromClient>
16+
readonly codec: CompressionCodec // Codec to use for compression
17+
readonly buffer: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
18+
readonly inflight: StreamWriteMessage_WriteRequest_MessageData[] // Array of messages that are currently in-flight
19+
readonly throughputSettings: ThroughputSettings // Current throughput settings for the writer
20+
onAck?: (seqNo: bigint, status?: 'skipped' | 'written' | 'writtenInTx') => void // Callback for handling acknowledgments
21+
updateBufferSize: (bytes: bigint) => void // Function to update the buffer size
22+
},
23+
input: StreamWriteMessage_WriteResponse
24+
) {
1825
// Process each acknowledgment in the response.
1926

20-
let acks = new Map<bigint, 'skipped' | 'written' | 'writtenInTx'>();
27+
let acks = new Map<bigint, 'skipped' | 'written' | 'writtenInTx'>()
2128
for (let ack of input.acks) {
22-
acks.set(ack.seqNo, ack.messageWriteStatus.case!);
29+
acks.set(ack.seqNo, ack.messageWriteStatus.case!)
2330
}
2431

2532
// Acknowledge messages that have been processed.
2633
for (let i = ctx.inflight.length - 1; i >= 0; i--) {
27-
const message = ctx.inflight[i]!;
34+
const message = ctx.inflight[i]!
2835
if (acks.has(message.seqNo)) {
29-
ctx.onAck?.(message.seqNo, acks.get(message.seqNo));
30-
ctx.inflight.splice(i, 1);
36+
ctx.onAck?.(message.seqNo, acks.get(message.seqNo))
37+
ctx.inflight.splice(i, 1)
3138
}
3239
}
3340

3441
// Clear the acknowledgment map.
35-
acks.clear();
42+
acks.clear()
3643

3744
// If there are still messages in the buffer, flush them.
3845
_flush(ctx)

packages/topic/src/writer/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,9 @@ export const createTopicWriter = function createTopicWriter(driver: Driver, opti
205205
throughputSettings,
206206
updateLastSeqNo,
207207
updateBufferSize,
208+
isSeqNoProvided,
208209
...(options.tx && { tx: options.tx }),
209-
...(lastSeqNo && { lastSeqNo })
210+
...(lastSeqNo && { lastSeqNo }),
210211
},
211212
chunk.serverMessage.value
212213
)
@@ -503,4 +504,4 @@ export const createTopicTxWriter = function createTopicTxWriter(
503504
}
504505

505506
// Re-export types for compatibility
506-
export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from "./types.js"
507+
export type { TopicTxWriter, TopicWriter, TopicWriterOptions } from './types.js'

0 commit comments

Comments
 (0)