55 ProductTypeElement ,
66 SumType ,
77 SumTypeVariant ,
8+ type ComparablePrimitive ,
89} from './algebraic_type.ts' ;
910import {
1011 AlgebraicValue ,
@@ -15,7 +16,13 @@ import {
1516} from './algebraic_value.ts' ;
1617import BinaryReader from './binary_reader.ts' ;
1718import BinaryWriter from './binary_writer.ts' ;
18- import * as ws from './client_api/index.ts' ;
19+ import { BsatnRowList } from './client_api/bsatn_row_list_type.ts' ;
20+ import { ClientMessage } from './client_api/client_message_type.ts' ;
21+ import { DatabaseUpdate } from './client_api/database_update_type.ts' ;
22+ import { QueryUpdate } from './client_api/query_update_type.ts' ;
23+ import { ServerMessage } from './client_api/server_message_type.ts' ;
24+ import { TableUpdate as RawTableUpdate } from './client_api/table_update_type.ts' ;
25+ import type * as clientApi from './client_api/index.ts' ;
1926import { ClientCache } from './client_cache.ts' ;
2027import { DbConnectionBuilder } from './db_connection_builder.ts' ;
2128import { type DbContext } from './db_context.ts' ;
@@ -41,7 +48,7 @@ import {
4148 TableCache ,
4249 type Operation ,
4350 type PendingCallback ,
44- type TableUpdate ,
51+ type TableUpdate as CacheTableUpdate ,
4552} from './table_cache.ts' ;
4653import { deepEqual , toPascalCase } from './utils.ts' ;
4754import { WebsocketDecompressAdapter } from './websocket_decompress_adapter.ts' ;
@@ -53,7 +60,8 @@ import {
5360 type SubscribeEvent ,
5461} from './subscription_builder_impl.ts' ;
5562import { stdbLogger } from './logger.ts' ;
56- import type { ReducerRuntimeTypeInfo } from './spacetime_module.ts' ;
63+ import { type ReducerRuntimeTypeInfo } from './spacetime_module.ts' ;
64+ import { fromByteArray } from 'base64-js' ;
5765
5866export {
5967 AlgebraicType ,
@@ -273,7 +281,7 @@ export class DbConnectionImpl<
273281 emitter : handleEmitter ,
274282 } ) ;
275283 this . #sendMessage(
276- ws . ClientMessage . SubscribeMulti ( {
284+ ClientMessage . SubscribeMulti ( {
277285 queryStrings : querySql ,
278286 queryId : { id : queryId } ,
279287 // The TypeScript SDK doesn't currently track `request_id`s,
@@ -286,7 +294,7 @@ export class DbConnectionImpl<
286294
287295 unregisterSubscription ( queryId : number ) : void {
288296 this . #sendMessage(
289- ws . ClientMessage . UnsubscribeMulti ( {
297+ ClientMessage . UnsubscribeMulti ( {
290298 queryId : { id : queryId } ,
291299 // The TypeScript SDK doesn't currently track `request_id`s,
292300 // so always use 0.
@@ -297,25 +305,38 @@ export class DbConnectionImpl<
297305
298306 // This function is async because we decompress the message async
299307 async #processParsedMessage(
300- message : ws . ServerMessage
308+ message : ServerMessage
301309 ) : Promise < Message | undefined > {
302310 const parseRowList = (
303311 type : 'insert' | 'delete' ,
304312 tableName : string ,
305- rowList : ws . BsatnRowList
313+ rowList : BsatnRowList
306314 ) : Operation [ ] => {
307315 const buffer = rowList . rowsData ;
308316 const reader = new BinaryReader ( buffer ) ;
309- const rows : any [ ] = [ ] ;
317+ const rows : Operation [ ] = [ ] ;
310318 const rowType = this . #remoteModule. tables [ tableName ] ! . rowType ;
319+ const primaryKeyInfo =
320+ this . #remoteModule. tables [ tableName ] ! . primaryKeyInfo ;
311321 while ( reader . offset < buffer . length + buffer . byteOffset ) {
312322 const initialOffset = reader . offset ;
313323 const row = rowType . deserialize ( reader ) ;
314- // This is super inefficient, but the buffer indexes are weird, so we are doing this for now.
315- // We should just base64 encode the bytes.
316- const rowId = JSON . stringify ( row , ( _ , v ) =>
317- typeof v === 'bigint' ? v . toString ( ) : v
318- ) ;
324+ let rowId : ComparablePrimitive | undefined = undefined ;
325+ if ( primaryKeyInfo !== undefined ) {
326+ rowId = primaryKeyInfo . colType . intoMapKey (
327+ row [ primaryKeyInfo . colName ]
328+ ) ;
329+ } else {
330+ // Get a view of the bytes for this row.
331+ const rowBytes = buffer . subarray (
332+ initialOffset - buffer . byteOffset ,
333+ reader . offset - buffer . byteOffset
334+ ) ;
335+ // Convert it to a base64 string, so we can use it as a map key.
336+ const asBase64 = fromByteArray ( rowBytes ) ;
337+ rowId = asBase64 ;
338+ }
339+
319340 rows . push ( {
320341 type,
321342 rowId,
@@ -326,15 +347,15 @@ export class DbConnectionImpl<
326347 } ;
327348
328349 const parseTableUpdate = async (
329- rawTableUpdate : ws . TableUpdate
330- ) : Promise < TableUpdate > => {
350+ rawTableUpdate : RawTableUpdate
351+ ) : Promise < CacheTableUpdate > => {
331352 const tableName = rawTableUpdate . tableName ;
332353 let operations : Operation [ ] = [ ] ;
333354 for ( const update of rawTableUpdate . updates ) {
334- let decompressed : ws . QueryUpdate ;
355+ let decompressed : QueryUpdate ;
335356 if ( update . tag === 'Gzip' ) {
336357 const decompressedBuffer = await decompress ( update . value , 'gzip' ) ;
337- decompressed = ws . QueryUpdate . deserialize (
358+ decompressed = QueryUpdate . deserialize (
338359 new BinaryReader ( decompressedBuffer )
339360 ) ;
340361 } else if ( update . tag === 'Brotli' ) {
@@ -358,9 +379,9 @@ export class DbConnectionImpl<
358379 } ;
359380
360381 const parseDatabaseUpdate = async (
361- dbUpdate : ws . DatabaseUpdate
362- ) : Promise < TableUpdate [ ] > => {
363- const tableUpdates : TableUpdate [ ] = [ ] ;
382+ dbUpdate : DatabaseUpdate
383+ ) : Promise < CacheTableUpdate [ ] > => {
384+ const tableUpdates : CacheTableUpdate [ ] = [ ] ;
364385 for ( const rawTableUpdate of dbUpdate . tables ) {
365386 tableUpdates . push ( await parseTableUpdate ( rawTableUpdate ) ) ;
366387 }
@@ -398,7 +419,7 @@ export class DbConnectionImpl<
398419 const args = txUpdate . reducerCall . args ;
399420 const energyQuantaUsed = txUpdate . energyQuantaUsed ;
400421
401- let tableUpdates : TableUpdate [ ] ;
422+ let tableUpdates : CacheTableUpdate [ ] ;
402423 let errMessage = '' ;
403424 switch ( txUpdate . status . tag ) {
404425 case 'Committed' :
@@ -498,11 +519,11 @@ export class DbConnectionImpl<
498519 }
499520 }
500521
501- #sendMessage( message : ws . ClientMessage ) : void {
522+ #sendMessage( message : ClientMessage ) : void {
502523 this . wsPromise . then ( wsResolved => {
503524 if ( wsResolved ) {
504525 const writer = new BinaryWriter ( 1024 ) ;
505- ws . ClientMessage . serialize ( writer , message ) ;
526+ ClientMessage . serialize ( writer , message ) ;
506527 const encoded = writer . getBuffer ( ) ;
507528 wsResolved . send ( encoded ) ;
508529 }
@@ -517,24 +538,28 @@ export class DbConnectionImpl<
517538 }
518539
519540 #applyTableUpdates(
520- tableUpdates : TableUpdate [ ] ,
541+ tableUpdates : CacheTableUpdate [ ] ,
521542 eventContext : EventContextInterface
522543 ) : PendingCallback [ ] {
523- const pendingCallbacks : PendingCallback [ ] = [ ] ;
544+ let pendingCallbacks : PendingCallback [ ] = [ ] ;
524545 for ( let tableUpdate of tableUpdates ) {
525546 // Get table information for the table being updated
526547 const tableName = tableUpdate . tableName ;
527548 const tableTypeInfo = this . #remoteModule. tables [ tableName ] ! ;
528549 const table = this . clientCache . getOrCreateTable ( tableTypeInfo ) ;
529- pendingCallbacks . push (
530- ...table . applyOperations ( tableUpdate . operations , eventContext )
550+ const newCallbacks = table . applyOperations (
551+ tableUpdate . operations ,
552+ eventContext
531553 ) ;
554+ for ( const callback of newCallbacks ) {
555+ pendingCallbacks . push ( callback ) ;
556+ }
532557 }
533558 return pendingCallbacks ;
534559 }
535560
536561 async #processMessage( data : Uint8Array ) : Promise < void > {
537- const serverMessage = parseValue ( ws . ServerMessage , data ) ;
562+ const serverMessage = parseValue ( ServerMessage , data ) ;
538563 const message = await this . #processParsedMessage( serverMessage ) ;
539564 if ( ! message ) {
540565 return ;
@@ -788,7 +813,7 @@ export class DbConnectionImpl<
788813 argsBuffer : Uint8Array ,
789814 flags : CallReducerFlags
790815 ) : void {
791- const message = ws . ClientMessage . CallReducer ( {
816+ const message = ClientMessage . CallReducer ( {
792817 reducer : reducerName ,
793818 args : argsBuffer ,
794819 // The TypeScript SDK doesn't currently track `request_id`s,
0 commit comments