Skip to content

Commit f0ea39e

Browse files
authored
DOP-3394: Upsert page documents to updated_documents collection (#816)
1 parent b8fdd7d commit f0ea39e

File tree

8 files changed

+1542
-924
lines changed

8 files changed

+1542
-924
lines changed

modules/persistence/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import AdmZip from 'adm-zip';
66
import minimist from 'minimist';
77
import * as mongodb from 'mongodb';
88
import { teardown as closeDBConnection } from './src/services/connector';
9-
import { insertPages } from './src/services/pages';
9+
import { insertAndUpdatePages } from './src/services/pages';
1010
import {
1111
insertMetadata,
1212
insertMergedMetadataEntries,
@@ -37,7 +37,7 @@ const app = async (path: string) => {
3737
// that only one build will be used per run of this module.
3838
const buildId = new mongodb.ObjectId();
3939
const metadata = await metadataFromZip(zip);
40-
await Promise.all([insertPages(buildId, zip), insertMetadata(buildId, metadata), upsertAssets(zip)]);
40+
await Promise.all([insertAndUpdatePages(buildId, zip), insertMetadata(buildId, metadata), upsertAssets(zip)]);
4141
await insertMergedMetadataEntries(buildId, metadata);
4242
// DOP-3447 clean up stale metadata
4343
await deleteStaleMetadata(metadata);

modules/persistence/package-lock.json

Lines changed: 1194 additions & 892 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/persistence/package.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,21 @@
2626
"bson": "^4.7.0",
2727
"config": "^3.3.7",
2828
"dotenv": "^16.0.2",
29+
"fast-deep-equal": "^3.1.3",
2930
"minimist": "^1.2.6",
3031
"mongodb": "^4.9.1",
3132
"ts-jest": "^27.0.5",
3233
"typescript": "^4.4.3"
3334
},
3435
"devDependencies": {
3536
"@shelf/jest-mongodb": "^4.1.4",
37+
"@typescript-eslint/eslint-plugin": "~5.5.0",
38+
"@typescript-eslint/parser": "^5.47.0",
3639
"eslint": "^8.3.0",
3740
"eslint-config-airbnb-base": "^15.0.0",
3841
"eslint-plugin-import": "^2.25.3",
3942
"eslint-plugin-jest": "^22.1.3",
4043
"jest": "^27.2.4",
41-
"jest-environment-node": "^27.1.0",
42-
"@typescript-eslint/eslint-plugin": "~5.5.0",
43-
"@typescript-eslint/parser": "^5.47.0"
44+
"jest-environment-node": "^27.1.0"
4445
}
4546
}

modules/persistence/src/services/assets/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import AdmZip from 'adm-zip';
2-
import { bulkUpsert } from '../connector';
2+
import { bulkUpsertAll } from '../connector';
33

44
const COLLECTION_NAME = 'assets';
55

@@ -15,7 +15,7 @@ const assetsFromZip = (zip: AdmZip) => {
1515
export const upsertAssets = async (zip: AdmZip) => {
1616
try {
1717
const assets = assetsFromZip(zip);
18-
return bulkUpsert(assets, COLLECTION_NAME);
18+
return bulkUpsertAll(assets, COLLECTION_NAME);
1919
} catch (error) {
2020
console.error(`Error at upsertion time for ${COLLECTION_NAME}: ${error}`);
2121
throw error;

modules/persistence/src/services/connector/index.ts

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,28 +50,32 @@ export const insert = async (docs: any[], collection: string, buildId: ObjectId)
5050
}
5151
};
5252

53-
// Upsert wrapper, requires an _id field.
54-
export const bulkUpsert = async (items: Document[], collection: string) => {
55-
const upsertSession = await db();
53+
/**
 * Runs a batch of bulk write operations against a single collection.
 *
 * @param operations - The bulk write operations to execute
 * @param collection - Name of the collection to write to
 * @returns The result of the driver's `bulkWrite` call
 * @throws Rethrows any error raised by the write, after logging it
 */
export const bulkWrite = async (operations: mongodb.AnyBulkWriteOperation[], collection: string) => {
  const dbSession = await db();
  try {
    // Await inside the try block: returning the pending promise directly would
    // let a rejected write skip the catch handler below, so the failure would
    // never be logged with the collection name.
    return await dbSession.collection(collection).bulkWrite(operations);
  } catch (error) {
    console.error(`Error at bulk write time for ${collection}: ${error}`);
    throw error;
  }
};
7462

63+
/**
 * Upsert wrapper, requires an _id field on every item.
 *
 * Builds one `updateOne` upsert per item, matched on `_id`, and submits the
 * whole batch through `bulkWrite`.
 *
 * @param items - Documents to upsert; each one is written with `$set`
 * @param collection - Name of the collection to write to
 */
export const bulkUpsertAll = async (items: Document[], collection: string) => {
  const upserts: mongodb.AnyBulkWriteOperation[] = items.map((doc: Document) => ({
    updateOne: {
      filter: { _id: doc._id },
      update: { $set: doc },
      upsert: true,
    },
  }));
  return bulkWrite(upserts, collection);
};
78+
7579
export const deleteDocuments = async (_ids: ObjectId[], collection: string) => {
7680
const deleteSession = await db();
7781
try {
Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,31 @@
11
import AdmZip from 'adm-zip';
22
import { deserialize } from 'bson';
3-
import { ObjectId } from 'mongodb';
4-
import { insert } from '../connector';
3+
import isEqual from 'fast-deep-equal';
4+
import { AnyBulkWriteOperation, Document, FindCursor, ObjectId } from 'mongodb';
5+
import { bulkWrite, db, insert } from '../connector';
6+
7+
/** Entry of a page's `static_assets` list: a checksum string and a key string. */
interface StaticAsset {
8+
checksum: string;
9+
key: string;
10+
}
11+
12+
/** A page's AST, treated as an opaque object with arbitrary string keys. */
interface PageAst {
13+
[key: string]: any;
14+
}
15+
16+
/**
 * Document shape used when querying and upserting pages by `page_id`.
 */
export interface UpdatedPage {
17+
page_id: string;
18+
filename: string;
19+
ast: PageAst;
20+
static_assets: StaticAsset[];
21+
22+
// Written only when the document is first inserted (`$setOnInsert`)
created_at: Date;
23+
// Written on every upsert and when a page is marked as deleted
updated_at: Date;
24+
// Set to true for previously stored pages that are absent from the current build
deleted: boolean;
25+
}
526

627
const COLLECTION_NAME = 'documents';
28+
const UPDATED_AST_COLL_NAME = 'updated_documents';
729

830
// Service responsible for memoization of page level documents.
931
// Any extraneous logic performed on page level documents as part of upload should be added here
@@ -15,12 +37,175 @@ const pagesFromZip = (zip: AdmZip) => {
1537
.map((entry) => deserialize(entry.getData()));
1638
};
1739

18-
export const insertPages = async (buildId: ObjectId, zip: AdmZip) => {
40+
/**
 *
 * Finds the page documents for a given Snooty project name + branch combination.
 * If this is the first build for the Snooty project name + branch, no documents
 * will be found.
 *
 * @param pageIdPrefix - Includes the Snooty project name, user (docsworker-xlarge), and branch
 * @param collection - The collection to perform the find query on
 * @returns A cursor over the matching, non-deleted documents, projected to `page_id` and `ast`
 */
const findPrevPageDocs = async (pageIdPrefix: string, collection: string) => {
  const dbSession = await db();
  // Escape regex metacharacters so the prefix is matched literally; a branch
  // name such as "v1.0" would otherwise treat "." as a wildcard.
  const escapedPrefix = pageIdPrefix.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  const findQuery = {
    // Require the prefix to end at a path boundary. Without it, the prefix for
    // branch "v1" would also match pages of branch "v1.1", and those pages
    // would later be treated as unseen and marked as deleted.
    page_id: { $regex: new RegExp(`^${escapedPrefix}(/|$)`) },
    deleted: false,
  };
  const projection = {
    _id: 0,
    page_id: 1,
    ast: 1,
  };

  try {
    return dbSession.collection<UpdatedPage>(collection).find(findQuery).project(projection);
  } catch (error) {
    console.error(
      `Error trying to find previous page documents using prefix ${pageIdPrefix} in ${collection}: ${error}`
    );
    throw error;
  }
};
70+
71+
/**
 * Drains a cursor of page documents into two lookup structures.
 *
 * @param docsCursor - Cursor whose documents are read for `page_id` and `ast`
 * @returns `mapping` from page id to AST, and `pageIds`, the set of every page id yielded
 */
const createPageAstMapping = async (docsCursor: FindCursor) => {
  // AST of each page, keyed by its page id
  const astByPageId: Record<string, object> = {};
  // Every page id yielded by the cursor. To be used for tracking unseen pages in the current build
  const seenPageIds = new Set<string>();

  for await (const prevDoc of docsCursor) {
    const pageId = prevDoc.page_id;
    astByPageId[pageId] = prevDoc.ast;
    seenPageIds.add(pageId);
  }

  return { mapping: astByPageId, pageIds: seenPageIds };
};
82+
83+
/**
 * Computes the bulk write operations that reconcile the previously stored
 * pages with the pages of the current build. All operations are generated
 * while the instance is constructed; read them with `getOperations()`.
 */
class UpdatedPagesManager {
  currentPages: Document[];
  operations: AnyBulkWriteOperation[];
  prevPageDocsMapping: Record<string, object>;
  prevPageIds: Set<string>;

  /**
   * @param prevPageDocsMapping - Previous build's ASTs keyed by page id
   * @param prevPagesIds - Page ids of the previous build; entries are removed as pages are seen
   * @param pages - Page documents of the current build
   */
  constructor(prevPageDocsMapping: Record<string, object>, prevPagesIds: Set<string>, pages: Document[]) {
    this.prevPageDocsMapping = prevPageDocsMapping;
    this.prevPageIds = prevPagesIds;
    this.currentPages = pages;
    this.operations = [];

    // A single timestamp is shared by every operation generated for this build.
    // Diffs must run first: they prune `prevPageIds` down to the unseen pages.
    const now = new Date();
    this.checkForPageDiffs(now);
    this.markUnseenPagesAsDeleted(now);
  }

  /**
   *
   * Walks the current pages and queues an upsert for each page whose AST
   * differs from the previous build's. Each visited page id is removed from
   * `prevPageIds` to signal that the previous page has been "seen".
   *
   * @param updateTime - the time to set updates to
   */
  checkForPageDiffs(updateTime: Date) {
    for (const page of this.currentPages) {
      // Filter out rst (non-page) files
      if (!page.filename.endsWith('.txt')) {
        continue;
      }

      const pageId = page.page_id;
      this.prevPageIds.delete(pageId);

      // A page with no previous AST has no mapping entry, so it always
      // compares as different and gets upserted.
      const astUnchanged = isEqual(page.ast, this.prevPageDocsMapping[pageId]);
      if (astUnchanged) {
        continue;
      }

      const upsertPage = {
        updateOne: {
          filter: { page_id: pageId },
          update: {
            $set: {
              page_id: pageId,
              filename: page.filename,
              ast: page.ast,
              static_assets: page.static_assets,
              updated_at: updateTime,
              deleted: false,
            },
            // Only stamped when the upsert creates the document
            $setOnInsert: {
              created_at: updateTime,
            },
          },
          upsert: true,
        },
      };
      this.operations.push(upsertPage);
    }
  }

  /**
   *
   * Queues an update flagging as "deleted" every previous page id that is
   * still left in `prevPageIds`, i.e. was not used by the current build.
   *
   * @param updateTime - the time to set updates to
   */
  markUnseenPagesAsDeleted(updateTime: Date) {
    for (const stalePageId of this.prevPageIds) {
      const markDeleted = {
        updateOne: {
          filter: { page_id: stalePageId },
          update: {
            $set: {
              deleted: true,
              updated_at: updateTime,
            },
          },
        },
      };
      this.operations.push(markDeleted);
    }
  }

  /** Returns the operations accumulated so far: page upserts first, then deletions. */
  getOperations() {
    return this.operations;
  }
}
172+
173+
/**
174+
*
175+
* Upserts pages in separate collection. Copies of a page are created by page_id.
176+
* Updated pages within the same Snooty project name + branch should only update
177+
* related page documents.
178+
*
179+
* @param pages
180+
* @param collection
181+
*/
182+
const updatePages = async (pages: Document[], collection: string) => {
183+
if (pages.length === 0) {
184+
return;
185+
}
186+
187+
// Find all pages that share the same project name + branch. Expects page IDs
188+
// to include these two properties after parse
189+
const pageIdPrefix = pages[0].page_id.split('/').slice(0, 3).join('/');
190+
const previousPagesCursor = await findPrevPageDocs(pageIdPrefix, collection);
191+
const { mapping: prevPageDocsMapping, pageIds: prevPageIds } = await createPageAstMapping(previousPagesCursor);
192+
193+
const updatedPagesManager = new UpdatedPagesManager(prevPageDocsMapping, prevPageIds, pages);
194+
const operations = updatedPagesManager.getOperations();
195+
196+
if (operations.length > 0) {
197+
await bulkWrite(operations, collection);
198+
}
199+
};
200+
201+
/**
 * Reads the page documents from the zip, then concurrently inserts them for
 * this build and upserts them into the updated-pages collection.
 *
 * @param buildId - Id of the build the inserted pages belong to
 * @param zip - Archive containing the serialized page documents
 * @returns The results of the insert and of the update, in that order
 * @throws Rethrows any error from reading, inserting or updating, after logging it
 */
export const insertAndUpdatePages = async (buildId: ObjectId, zip: AdmZip) => {
  try {
    const pages = pagesFromZip(zip);
    // Await inside the try block: returning the pending promise directly would
    // let a rejected insert/update skip the catch handler below.
    return await Promise.all([insert(pages, COLLECTION_NAME, buildId), updatePages(pages, UPDATED_AST_COLL_NAME)]);
  } catch (error) {
    console.error(`Error at insertion time for ${COLLECTION_NAME}: ${error}`);
    throw error;
  }
};
210+
211+
export const _updatePages = updatePages;

modules/persistence/tests/services/connector.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ObjectID } from 'bson';
2-
import { bulkUpsert, db, insert } from '../../src/services/connector';
2+
import { bulkUpsertAll, db, insert } from '../../src/services/connector';
33

44
const mockConnect = jest.fn();
55
const mockDb = jest.fn();
@@ -123,7 +123,7 @@ describe('Connector module', () => {
123123
const collection = 'metadata';
124124

125125
test('it calls on collection to update one with upsert option true', async () => {
126-
await bulkUpsert([payload], collection);
126+
await bulkUpsertAll([payload], collection);
127127
expect(mockCollection).toBeCalledWith(collection);
128128
expect(mockBulkWrite).toBeCalledWith([
129129
{
@@ -136,10 +136,10 @@ describe('Connector module', () => {
136136
]);
137137
});
138138

139-
test('it throws error on updateone error', async () => {
139+
test('it throws error on bulkWrite error', async () => {
140140
mockBulkWrite.mockRejectedValueOnce(new Error('test error') as never);
141141
try {
142-
await bulkUpsert([payload], collection);
142+
await bulkUpsertAll([payload], collection);
143143
} catch (e) {
144144
expect(e.message).toEqual('test error');
145145
}

0 commit comments

Comments
 (0)