Skip to content

Commit

Permalink
feat(custom-mutators): implement and pass ZeroOption to Replicache
Browse files Browse the repository at this point in the history
  • Loading branch information
tantaman committed Mar 7, 2025
1 parent a3f7c9a commit d244a22
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 142 deletions.
12 changes: 6 additions & 6 deletions packages/replicache/src/persist/refresh.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ describe('refresh', () => {
undefined,
);
assert(refreshResult);
expect(Object.fromEntries(refreshResult[1])).to.deep.equal({});
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({});
const hashes = [
await withRead(memdag, read => read.getHead(DEFAULT_HEAD_NAME)),
];
Expand Down Expand Up @@ -247,7 +247,7 @@ describe('refresh', () => {
await withRead(memdag, read => read.getHead(DEFAULT_HEAD_NAME)),
];

expect(Object.fromEntries(refreshResult[1])).to.deep.equal({});
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({});

await assertRefreshHashes(perdag, clientID, hashes);
});
Expand Down Expand Up @@ -281,7 +281,7 @@ describe('refresh', () => {
);
assert(refreshResult);

expect(Object.fromEntries(refreshResult[1])).to.deep.equal({
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({
'': [
{
key: 'from mutator_name_3',
Expand Down Expand Up @@ -391,7 +391,7 @@ describe('refresh', () => {
undefined,
);
assert(refreshResult);
expect(Object.fromEntries(refreshResult[1])).to.deep.equal({
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({
'': [
{
key: 'from mutator_name_3',
Expand Down Expand Up @@ -455,7 +455,7 @@ describe('refresh', () => {
undefined,
);
assert(refreshResult);
expect(Object.fromEntries(refreshResult[1])).to.deep.equal({
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({
'': [
{
key: 'from mutator_name_3',
Expand Down Expand Up @@ -831,7 +831,7 @@ describe('refresh', () => {
undefined,
);
assert(refreshResult);
expect(Object.fromEntries(refreshResult[1])).to.deep.equal({
expect(Object.fromEntries(refreshResult.diffs)).to.deep.equal({
'': [{key: 'c', newValue: 3, op: 'add'}],
});

Expand Down
16 changes: 12 additions & 4 deletions packages/replicache/src/persist/refresh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type RefreshResult =
type: 'complete';
diffs: DiffsMap;
newPerdagClientHeadHash: Hash;
oldHead: Hash;
newHead: Hash;
};

Expand All @@ -72,7 +73,7 @@ export async function refresh(
closed: () => boolean,
formatVersion: FormatVersion,
zero: ZeroOption | undefined,
): Promise<[Hash, DiffsMap] | undefined> {
): Promise<{oldHead: Hash; newHead: Hash; diffs: DiffsMap} | undefined> {
if (closed()) {
return;
}
Expand Down Expand Up @@ -235,9 +236,11 @@ export async function refresh(
let newMemdagHeadHash = perdagClientGroupHeadHash;
if (newMemdagMutations.length > 0) {
const zeroData = await zero?.getTxData?.(
memdagHeadCommit.chunk.hash,
'rebase',
newMemdagHeadHash,
{openLazyRead: memdagWrite},
{
openLazyRead: memdagWrite,
},
);
for (let i = newMemdagMutations.length - 1; i >= 0; i--) {
newMemdagHeadHash = (
Expand Down Expand Up @@ -271,6 +274,7 @@ export async function refresh(
return {
type: 'complete',
diffs,
oldHead: memdagHeadCommit.chunk.hash,
newHead: newMemdagHeadHash,
newPerdagClientHeadHash: perdagClientGroupHeadHash,
} as const;
Expand Down Expand Up @@ -301,7 +305,11 @@ export async function refresh(
return undefined;
}
await setRefreshHashes([result.newPerdagClientHeadHash]);
return [result.newHead, result.diffs];
return {
oldHead: result.oldHead,
newHead: result.newHead,
diffs: result.diffs,
};
}

function shouldAbortRefresh(
Expand Down
50 changes: 25 additions & 25 deletions packages/replicache/src/replicache-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ import {
withWrite,
withWriteNoImplicitCommit,
} from './with-transactions.ts';
import type {DiffsMap} from './sync/diff.ts';

declare const TESTING: boolean;

Expand Down Expand Up @@ -563,8 +562,8 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
);

// Now we have a profileID, a clientID, a clientGroupID and DB!
resolveReady();
await this.#zero?.init(headHash, this.memdag);
resolveReady();

if (this.#enablePullAndPushInOpen) {
this.pull().catch(noop);
Expand Down Expand Up @@ -756,31 +755,29 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
const lc = this.#lc
.withContext('maybeEndPull')
.withContext('requestID', requestID);
const {replayMutations, diffs, mainHead} = await maybeEndPull<LocalMeta>(
this.memdag,
lc,
syncHead,
clientID,
this.#subscriptions,
FormatVersion.Latest,
);
const {replayMutations, diffs, oldMainHead, mainHead} =
await maybeEndPull<LocalMeta>(
this.memdag,
lc,
syncHead,
clientID,
this.#subscriptions,
FormatVersion.Latest,
);

if (!replayMutations || replayMutations.length === 0) {
// All done.
await this.#zero?.advance(mainHead, diffs.get('') ?? []);
// Having any `await` before calling `subscriptions.fire` would be problematic
// as it would open a window where diffs could get out of order.
// Hence we do not await `zero.advance` here.
this.#zero?.advance(oldMainHead, mainHead, diffs.get('') ?? []);
await this.#subscriptions.fire(diffs);
void this.#schedulePersist();
return;
}

// Replay.
const zeroData = await this.#zero?.getTxData?.(
mainHead, // TODO: this mainHead is incorrect since
// minaHead is advanced in replicache but not in IVM.
// We don't advance in IVM until all replay mutations are done.
// In `if` above. We need to keep around an expected `mainHead`...
syncHead,
);
const zeroData = await this.#zero?.getTxData?.('rebase', syncHead);
for (const mutation of replayMutations) {
// TODO(greg): I'm not sure why this was in Replicache#_mutate...
// Ensure that we run initial pending subscribe functions before starting a
Expand Down Expand Up @@ -1211,7 +1208,7 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
if (this.#closed) {
return;
}
let refreshResult: [Hash, DiffsMap] | undefined;
let refreshResult: Awaited<ReturnType<typeof refresh>>;
try {
refreshResult = await refresh(
this.#lc,
Expand All @@ -1234,11 +1231,12 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
}
}
if (refreshResult !== undefined) {
await this.#zero?.advance(
refreshResult[0],
refreshResult[1].get('') ?? [],
this.#zero?.advance(
refreshResult.oldHead,
refreshResult.newHead,
refreshResult.diffs.get('') ?? [],
);
await this.#subscriptions.fire(refreshResult[1]);
await this.#subscriptions.fire(refreshResult.diffs);
}
}

Expand Down Expand Up @@ -1507,7 +1505,9 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
clientID,
await dbWrite.getMutationID(),
'initial',
undefined,
await this.#zero?.getTxData('initial', headHash, {
openLazyRead: dagWrite,
}),
dbWrite,
this.#lc,
);
Expand All @@ -1518,13 +1518,13 @@ export class ReplicacheImpl<MD extends MutatorDefs = {}> {
DEFAULT_HEAD_NAME,
this.#subscriptions,
);
this.#zero?.advance(headHash, newHead, diffs.get('') ?? []);

// Update this after the commit in case the commit fails.
this.lastMutationID = lastMutationID;

// Send is not supposed to reject
this.#pushConnectionLoop.send(false).catch(() => void 0);
await this.#zero?.advance(newHead, diffs.get('') ?? []);
await this.#subscriptions.fire(diffs);
void this.#schedulePersist();
return result;
Expand Down
21 changes: 11 additions & 10 deletions packages/replicache/src/replicache-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {MutatorDefs, RequestOptions} from './types.ts';
import type {Hash} from './hash.ts';
import type {InternalDiff} from './btree/node.ts';
import type {Read, Store} from './dag/store.ts';
import type {TransactionReason} from './transactions.ts';

/**
* The options passed to {@link Replicache}.
Expand Down Expand Up @@ -253,11 +254,16 @@ export interface ReplicacheOptions<MD extends MutatorDefs> {
*/
export interface ZeroTxData {}

export type ZeroReadOptions = {
openLazyRead?: Read | undefined;
openLazySourceRead?: Read | undefined;
};

/**
* Minimal interface that Replicache needs to communicate with Zero.
* Prevents us from creating any direct dependencies on Zero.
*/
export type ZeroOption = {
export interface ZeroOption {
/**
* Allow Zero to initialize its IVM state from the given hash and dag.
*/
Expand All @@ -272,18 +278,13 @@ export type ZeroOption = {
* object for use in Zero's mutators.
*/
getTxData(
expectedHead: Hash,
reason: TransactionReason,
desiredHead: Hash,
readOptions?:
| {
openLazyRead?: Read | undefined;
openLazySourceRead?: Read | undefined;
}
| undefined,
readOptions?: ZeroReadOptions | undefined,
): Promise<ZeroTxData>;

/**
* When Replicache's main head moves forward, Zero must advance its IVM state.
*/
advance(hash: Hash, changes: InternalDiff): Promise<void>;
};
advance(expectedHash: Hash, newHash: Hash, changes: InternalDiff): void;
}
9 changes: 6 additions & 3 deletions packages/replicache/src/sync/pull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ export function maybeEndPull<M extends LocalMeta>(
): Promise<{
syncHead: Hash;
mainHead: Hash;
oldMainHead: Hash;
replayMutations: Commit<M>[];
diffs: DiffsMap;
}> {
Expand All @@ -336,7 +337,7 @@ export function maybeEndPull<M extends LocalMeta>(
// TODO: In DD31, it is expected that a newer snapshot might have appeared
// on the main chain. In that case, we just abort this pull.
const syncSnapshot = await baseSnapshotFromHash(syncHeadHash, dagRead);
let mainHeadHash = await dagRead.getHead(DEFAULT_HEAD_NAME);
const mainHeadHash = await dagRead.getHead(DEFAULT_HEAD_NAME);
if (mainHeadHash === undefined) {
throw new Error('Missing main head');
}
Expand Down Expand Up @@ -381,6 +382,7 @@ export function maybeEndPull<M extends LocalMeta>(
if (pending.length > 0) {
return {
syncHead: syncHeadHash,
oldMainHead: mainHeadHash,
mainHead: mainHeadHash,
replayMutations: pending,
// The changed keys are not reported when further replays are
Expand Down Expand Up @@ -425,7 +427,7 @@ export function maybeEndPull<M extends LocalMeta>(
]);
await dagWrite.commit();
// main head was set to sync head
mainHeadHash = syncHeadHash;
const newMainHeadHash = syncHeadHash;

if (lc.debug) {
const [oldLastMutationID, oldCookie] = snapshotMetaParts(
Expand Down Expand Up @@ -458,7 +460,8 @@ export function maybeEndPull<M extends LocalMeta>(

return {
syncHead: syncHeadHash,
mainHead: mainHeadHash,
oldMainHead: mainHeadHash,
mainHead: newMainHeadHash,
replayMutations: [],
diffs: diffsMap,
};
Expand Down
14 changes: 11 additions & 3 deletions packages/zero-client/src/client/context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import {MemoryStorage} from '../../../zql/src/ivm/memory-storage.ts';
import {type AddQuery, ZeroContext} from './context.ts';
import {ENTITIES_KEY_PREFIX} from './keys.ts';
import {IVMSourceBranch} from './ivm-branch.ts';
import {createSilentLogContext} from '../../../shared/src/logging-test-utils.ts';
import type {Hash} from '../../../replicache/src/hash.ts';

const testBatchViewUpdates = (applyViewUpdates: () => void) =>
applyViewUpdates();
Expand All @@ -33,6 +35,7 @@ test('getSource', () => {
});

const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
testBatchViewUpdates,
Expand Down Expand Up @@ -103,6 +106,7 @@ const schema = createSchema(1, {

test('processChanges', () => {
const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
testBatchViewUpdates,
Expand All @@ -114,7 +118,7 @@ test('processChanges', () => {
]),
);

context.processChanges([
context.processChanges(undefined, 'ahash' as Hash, [
{
key: `${ENTITIES_KEY_PREFIX}t1/e1`,
op: 'add',
Expand Down Expand Up @@ -166,6 +170,7 @@ test('processChanges wraps source updates with batchViewUpdates', () => {
]);
};
const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
batchViewUpdates,
Expand All @@ -178,7 +183,7 @@ test('processChanges wraps source updates with batchViewUpdates', () => {
);

expect(batchViewUpdatesCalls).toBe(0);
context.processChanges([
context.processChanges(undefined, 'ahash' as Hash, [
{
key: `${ENTITIES_KEY_PREFIX}t1/e1`,
op: 'add',
Expand Down Expand Up @@ -218,6 +223,7 @@ test('transactions', () => {
});

const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
testBatchViewUpdates,
Expand Down Expand Up @@ -274,7 +280,7 @@ test('transactions', () => {
++transactions;
});

context.processChanges(changes);
context.processChanges(undefined, 'ahash' as Hash, changes);

expect(transactions).toEqual(1);
const result = out.fetch({});
Expand All @@ -291,6 +297,7 @@ test('batchViewUpdates errors if applyViewUpdates is not called', () => {
batchViewUpdatesCalls++;
};
const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
batchViewUpdates,
Expand All @@ -308,6 +315,7 @@ test('batchViewUpdates returns value', () => {
batchViewUpdatesCalls++;
};
const context = new ZeroContext(
createSilentLogContext(),
new IVMSourceBranch(schema.tables),
null as unknown as AddQuery,
batchViewUpdates,
Expand Down
Loading

0 comments on commit d244a22

Please sign in to comment.