Skip to content

Commit

Permalink
Revert "Revert changes to paginate miniblocks (#1844)" & Update logic…
Browse files Browse the repository at this point in the history
… to be backwards compatible (#1850)

This reverts commit accb4bd.

And, use the miniblock num of the first miniblock
Roman suggested this initially

until the api change is deployed to the nodes, the fromInclusive will
always be 0
after, we can continue to ignore it, it's duplicate info, but useful to
have so that we can parallelize miniblock unpacking in the future.
  • Loading branch information
texuf authored Dec 17, 2024
1 parent f5218ad commit 6f57bad
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 25 deletions.
23 changes: 10 additions & 13 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import {
makeSessionKeys,
type EncryptionDeviceInitOpts,
} from '@river-build/encryption'
import { getMaxTimeoutMs, StreamRpcClient } from './makeStreamRpcClient'
import { getMaxTimeoutMs, StreamRpcClient, getMiniblocks } from './makeStreamRpcClient'
import { errorContains, getRpcErrorProperty } from './rpcInterceptors'
import { assert, isDefined } from './check'
import EventEmitter from 'events'
Expand Down Expand Up @@ -86,7 +86,6 @@ import {
checkEventSignature,
makeEvent,
UnpackEnvelopeOpts,
unpackMiniblock,
unpackStream,
unpackStreamEx,
} from './sign'
Expand Down Expand Up @@ -1909,25 +1908,23 @@ export class Client
}
}

const response = await this.rpcClient.getMiniblocks({
streamId: streamIdAsBytes(streamId),
const { miniblocks, terminus } = await getMiniblocks(
this.rpcClient,
streamId,
fromInclusive,
toExclusive,
})
this.unpackEnvelopeOpts,
)

const unpackedMiniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
await this.persistenceStore.saveMiniblocks(
streamIdAsString(streamId),
unpackedMiniblocks,
miniblocks,
'backward',
)

return {
terminus: response.terminus,
miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks],
terminus: terminus,
miniblocks: [...miniblocks, ...cachedMiniblocks],
}
}

Expand Down
73 changes: 73 additions & 0 deletions packages/sdk/src/makeStreamRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import {
retryInterceptor,
type RetryParams,
} from './rpcInterceptors'
import { UnpackEnvelopeOpts, unpackMiniblock } from './sign'
import { RpcOptions, createHttp2ConnectTransport } from './rpcCommon'
import { streamIdAsBytes } from './id'
import { ParsedMiniblock } from './types'

const logInfo = dlog('csb:rpc:info')
let nextRpcClientNum = 0
Expand Down Expand Up @@ -70,3 +73,73 @@ export function getMaxTimeoutMs(opts: StreamRpcClientOptions): number {
}
return maxTimeoutMs
}

export async function getMiniblocks(
client: StreamRpcClient,
streamId: string | Uint8Array,
fromInclusive: bigint,
toExclusive: bigint,
unpackEnvelopeOpts: UnpackEnvelopeOpts | undefined,
): Promise<{ miniblocks: ParsedMiniblock[]; terminus: boolean }> {
const allMiniblocks: ParsedMiniblock[] = []
let currentFromInclusive = fromInclusive
let reachedTerminus = false

while (currentFromInclusive < toExclusive) {
const { miniblocks, terminus, nextFromInclusive } = await fetchMiniblocksFromRpc(
client,
streamId,
currentFromInclusive,
toExclusive,
unpackEnvelopeOpts,
)

allMiniblocks.push(...miniblocks)

// Set the terminus to true if we got at least one response with reached terminus
// The behaviour around this flag is not implemented yet
if (terminus && !reachedTerminus) {
reachedTerminus = true
}

if (currentFromInclusive === nextFromInclusive) {
break
}

currentFromInclusive = nextFromInclusive
}

return {
miniblocks: allMiniblocks,
terminus: reachedTerminus,
}
}

async function fetchMiniblocksFromRpc(
client: StreamRpcClient,
streamId: string | Uint8Array,
fromInclusive: bigint,
toExclusive: bigint,
unpackEnvelopeOpts: UnpackEnvelopeOpts | undefined,
) {
const response = await client.getMiniblocks({
streamId: streamIdAsBytes(streamId),
fromInclusive,
toExclusive,
})

const miniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, unpackEnvelopeOpts)
miniblocks.push(unpackedMiniblock)
}

const respondedFromInclusive =
miniblocks.length > 0 ? miniblocks[0].header.miniblockNum : fromInclusive

return {
miniblocks: miniblocks,
terminus: response.terminus,
nextFromInclusive: respondedFromInclusive + BigInt(response.miniblocks.length),
}
}
21 changes: 9 additions & 12 deletions packages/sdk/src/unauthenticatedClient.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import debug from 'debug'
import { DLogger, check, dlog, dlogError } from '@river-build/dlog'
import { hasElements, isDefined } from './check'
import { StreamRpcClient } from './makeStreamRpcClient'
import { UnpackEnvelopeOpts, unpackMiniblock, unpackStream } from './sign'
import { StreamRpcClient, getMiniblocks } from './makeStreamRpcClient'
import { UnpackEnvelopeOpts, unpackStream } from './sign'
import { StreamStateView } from './streamStateView'
import { ParsedMiniblock, StreamTimelineEvent } from './types'
import { streamIdAsString, streamIdAsBytes, userIdFromAddress, makeUserStreamId } from './id'
Expand Down Expand Up @@ -204,20 +204,17 @@ export class UnauthenticatedClient {
}
}

const response = await this.rpcClient.getMiniblocks({
streamId: streamIdAsBytes(streamId),
const { miniblocks, terminus } = await getMiniblocks(
this.rpcClient,
streamId,
fromInclusive,
toExclusive,
})
this.unpackEnvelopeOpts,
)

const unpackedMiniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
return {
terminus: response.terminus,
miniblocks: unpackedMiniblocks,
terminus: terminus,
miniblocks: miniblocks,
}
}

Expand Down

0 comments on commit 6f57bad

Please sign in to comment.