Skip to content

Commit

Permalink
refactor(zql): pushing splitting of edits up to source
Browse files Browse the repository at this point in the history
  • Loading branch information
grgbkr committed Mar 6, 2025
1 parent 31cf63d commit ddcda51
Show file tree
Hide file tree
Showing 11 changed files with 1,628 additions and 3,855 deletions.
44 changes: 44 additions & 0 deletions packages/zql/src/builder/builder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,50 @@ test('self-join edit', () => {
},
"type": "remove",
},
{
"node": {
"relationships": {
"recruiter": [
{
"relationships": {},
"row": {
"id": 1,
"name": "aaron",
"recruiterID": null,
},
},
],
},
"row": {
"id": 4,
"name": "matt",
"recruiterID": 1,
},
},
"type": "add",
},
{
"node": {
"relationships": {
"recruiter": [
{
"relationships": {},
"row": {
"id": 1,
"name": "aaron",
"recruiterID": null,
},
},
],
},
"row": {
"id": 4,
"name": "matt",
"recruiterID": 1,
},
},
"type": "remove",
},
{
"node": {
"relationships": {
Expand Down
25 changes: 22 additions & 3 deletions packages/zql/src/builder/builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,35 @@ function buildPipelineInternal(
if (!source) {
throw new Error(`Source not found: ${ast.table}`);
}
const conn = source.connect(must(ast.orderBy), ast.where);
ast = uniquifyCorrelatedSubqueryConditionAliases(ast);

const csqsFromCondition = gatherCorrelatedSubqueryQueriesFromCondition(
ast.where,
);
const splitEditKeys: Set<string> = partitionKey
? new Set(partitionKey)
: new Set();
for (const csq of csqsFromCondition) {
for (const key of csq.correlation.parentField) {
splitEditKeys.add(key);
}
}
if (ast.related) {
for (const csq of ast.related) {
for (const key of csq.correlation.parentField) {
splitEditKeys.add(key);
}
}
}
const conn = source.connect(must(ast.orderBy), ast.where, splitEditKeys);
let end: Input = conn;
const {fullyAppliedFilters} = conn;
ast = uniquifyCorrelatedSubqueryConditionAliases(ast);

if (ast.start) {
end = new Skip(end, ast.start);
}

for (const csq of gatherCorrelatedSubqueryQueriesFromCondition(ast.where)) {
for (const csq of csqsFromCondition) {
end = applyCorrelatedSubQuery(csq, delegate, end);
}

Expand Down
241 changes: 0 additions & 241 deletions packages/zql/src/ivm/exists.push.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1485,144 +1485,6 @@ suite('EXISTS', () => {
`);
});

test('child edit changes correlation', () => {
const {log, data, actualStorage, pushes} = runJoinTest({
sources,
sourceContents,
joins,
pushes: [
[
'comment',
{
type: 'edit',
oldRow: {id: 'c1', issueID: 'i1', text: 'i1 c1 text'},
row: {id: 'c1', issueID: 'i2', text: 'i2 c1 text'},
},
],
],
format,
addPostJoinsOperator: (i: Input, storage: Storage) => ({
name: 'exists',
op: new Exists(i, storage, 'comments', ['id'], existsType),
}),
});

expect(data).toMatchInlineSnapshot(`
[
{
"comments": [
{
"id": "c1",
"issueID": "i2",
"text": "i2 c1 text",
},
],
"id": "i2",
"text": "second issue",
},
{
"comments": [
{
"id": "c2",
"issueID": "i3",
"text": "i3 c2 text",
},
{
"id": "c3",
"issueID": "i3",
"text": "i3 c3 text",
},
],
"id": "i3",
"text": "third issue",
},
]
`);

expect(log.filter(msg => msg[0] === 'exists')).toMatchInlineSnapshot(`
[
[
"exists",
"push",
{
"row": {
"id": "i1",
"text": "first issue",
},
"type": "remove",
},
],
[
"exists",
"push",
{
"row": {
"id": "i2",
"text": "second issue",
},
"type": "add",
},
],
]
`);

expect(pushes).toMatchInlineSnapshot(`
[
{
"node": {
"relationships": {
"comments": [
{
"relationships": {},
"row": {
"id": "c1",
"issueID": "i1",
"text": "i1 c1 text",
},
},
],
},
"row": {
"id": "i1",
"text": "first issue",
},
},
"type": "remove",
},
{
"node": {
"relationships": {
"comments": [
{
"relationships": {},
"row": {
"id": "c1",
"issueID": "i2",
"text": "i2 c1 text",
},
},
],
},
"row": {
"id": "i2",
"text": "second issue",
},
},
"type": "add",
},
]
`);

expect(actualStorage['exists']).toMatchInlineSnapshot(`
{
"row//["i1"]": 0,
"row//["i2"]": 1,
"row//["i3"]": 2,
"row//["i4"]": 0,
}
`);
});

// potential tests to add
// 1. child child change is pushed
// 2. child change to other relationship is pushed if parent has child
Expand Down Expand Up @@ -2341,109 +2203,6 @@ suite('NOT EXISTS', () => {
`);
});

test('child edit changes correlation', () => {
const {log, data, actualStorage, pushes} = runJoinTest({
sources,
sourceContents,
joins,
pushes: [
[
'comment',
{
type: 'edit',
oldRow: {id: 'c1', issueID: 'i1', text: 'i1 c1 text'},
row: {id: 'c1', issueID: 'i2', text: 'i2 c1 text'},
},
],
],
format,
addPostJoinsOperator: (i: Input, storage: Storage) => ({
name: 'exists',
op: new Exists(i, storage, 'comments', ['id'], existsType),
}),
});

expect(data).toMatchInlineSnapshot(`
[
{
"comments": [],
"id": "i1",
"text": "first issue",
},
{
"comments": [],
"id": "i4",
"text": "fourth issue",
},
]
`);

expect(log.filter(msg => msg[0] === 'exists')).toMatchInlineSnapshot(`
[
[
"exists",
"push",
{
"row": {
"id": "i1",
"text": "first issue",
},
"type": "add",
},
],
[
"exists",
"push",
{
"row": {
"id": "i2",
"text": "second issue",
},
"type": "remove",
},
],
]
`);

expect(pushes).toMatchInlineSnapshot(`
[
{
"node": {
"relationships": {
"comments": [],
},
"row": {
"id": "i1",
"text": "first issue",
},
},
"type": "add",
},
{
"node": {
"relationships": {
"comments": [],
},
"row": {
"id": "i2",
"text": "second issue",
},
},
"type": "remove",
},
]
`);

expect(actualStorage['exists']).toMatchInlineSnapshot(`
{
"row//["i1"]": 0,
"row//["i2"]": 1,
"row//["i3"]": 2,
"row//["i4"]": 0,
}
`);
});

// potential tests to add
// 1. child child change is not pushed
// 2. child change to other relationship is not pushed if parent has child
Expand Down
5 changes: 4 additions & 1 deletion packages/zql/src/ivm/exists.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ export class Exists implements Operator {
this.#relationshipName = relationshipName;
this.#input.setOutput(this);
this.#storage = storage as ExistsStorage;
assert(this.#input.getSchema().relationships[relationshipName]);
assert(
this.#input.getSchema().relationships[relationshipName],
`Input schema missing ${relationshipName}`,
);
this.#not = type === 'NOT EXISTS';
this.#parentJoinKey = parentJoinKey;

Expand Down
Loading

0 comments on commit ddcda51

Please sign in to comment.