Commit 1ea85ab

feat: Added streaming for fetching and aggregating MongoDB data for batch insertion.

1 parent 6a0e0b3 commit 1ea85ab
4 files changed: +64 -27 lines
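In short: src/index.js drops its recursive skip/limit pagination in favor of a dedicated streaming child (src/data-streaming.js) that reads MongoDB as a stream, batches documents through the new StreamCache helper (src/stram-cache.js), and relays each batch to the existing pool of insert workers.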

src/cluster.js (-1)

@@ -26,7 +26,6 @@ function initializeCluster({ backgroundTaskFile, clusterSize, onMessage }) {
   })

   child.on('message', (message) => {
-    if (message !== 'item-done') return
     onMessage(message)
   })
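The dropped guard only let the literal 'item-done' string through. After this commit the workers report per-batch counts instead (see the onMessage(cumulativeProcessed) handler in src/index.js below), so every message now has to reach the parent's callback.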

src/data-streaming.js (new file, +12)

@@ -0,0 +1,12 @@
+import { StreamCache } from './stram-cache.js'
+import { getMongoConnection } from './db.js'
+
+const ITEMS_PER_PAGE = 4000
+
+const mongoDB = await getMongoConnection()
+const stream = mongoDB.students.find().stream()
+const cache = new StreamCache(stream, ITEMS_PER_PAGE)
+
+cache.stream().on('data', (data) => {
+  process.send(JSON.parse(data));
+});
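This worker replaces the old skip/limit pagination: it opens a MongoDB cursor as a stream, lets StreamCache group the documents into JSON-stringified batches of ITEMS_PER_PAGE = 4000, and forwards each parsed batch to the parent over IPC. process.send is only defined because src/index.js spawns this file as a child process (clusterSize: 1, below).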

src/index.js (+19 -26)

@@ -1,27 +1,16 @@
 import { initialize } from "./cluster.js"
 import { getMongoConnection, getPostgresConnection } from './db.js'
 import cliProgress from 'cli-progress'
-import { setTimeout } from 'node:timers/promises'
 const mongoDB = await getMongoConnection()
 const postgresDB = await getPostgresConnection()
-const ITEMS_PER_PAGE = 4000
-const CLUSTER_SIZE = 99
+// const ITEMS_PER_PAGE = 4000
+const CLUSTER_SIZE = 8
 const TASK_FILE = new URL('./background-task.js', import.meta.url).pathname
+const DATA_STREAMING_FILE = new URL('./data-streaming.js', import.meta.url).pathname

 // console.log(`there was ${await postgresDB.students.count()} items on Postgres, deleting all...`)
 await postgresDB.students.deleteAll()

-async function* getAllPagedData(itemsPerPage, page = 0) {
-
-  const data = mongoDB.students.find().skip(page).limit(itemsPerPage)
-  const items = await data.toArray()
-  if (!items.length) return
-
-  yield items
-
-  yield* getAllPagedData(itemsPerPage, page += itemsPerPage)
-}
-
 const total = await mongoDB.students.countDocuments()
 // console.log(`total items on DB: ${total}`)

@@ -37,25 +26,29 @@ const cp = initialize(
     backgroundTaskFile: TASK_FILE,
     clusterSize: CLUSTER_SIZE,
     amountToBeProcessed: total,
-    async onMessage(message) {
-      progress.increment()
+    async onMessage(cumulativeProcessed) {
+      totalProcessed += cumulativeProcessed;
+      progress.update(totalProcessed);

-      if (++totalProcessed !== total) return
+      if (totalProcessed !== total) return
       // console.log(`all ${amountToBeProcessed} processed! Exiting...`)
       progress.stop()
       cp.killAll()

-      const insertedOnSQLite = await postgresDB.students.count()
-      console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLite}`)
-      console.log(`are the same? ${total === insertedOnSQLite ? 'yes' : 'no'}`)
+      const insertedOnSQLPostGres = await postgresDB.students.count()
+      console.log(`total on MongoDB ${total} and total on PostGres ${insertedOnSQLPostGres}`)
+      console.log(`are the same? ${total === insertedOnSQLPostGres ? 'yes' : 'no'}`)
       process.exit()
-
     }
   }
 )
-await setTimeout(1000)
-
-for await (const data of getAllPagedData(ITEMS_PER_PAGE)) {
-  cp.sendToChild(data)
-}

+initialize(
+  {
+    backgroundTaskFile: DATA_STREAMING_FILE,
+    clusterSize: 1,
+    async onMessage(message) {
+      cp.sendToChild(message)
+    }
+  }
+)
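Two things change here. First, the pipeline becomes two-stage: a single data-streaming child produces the 4000-document batches, and index.js relays each one via cp.sendToChild to the pool of CLUSTER_SIZE = 8 insert workers. Second, progress accounting moves from one increment per item to per-batch counts that the parent accumulates with progress.update. (Despite its name, cumulativeProcessed carries a per-batch count, since it is summed into totalProcessed.) The commit does not show background-task.js, but the new contract implies each worker reports how many items it just handled. A minimal sketch of that worker side, assuming db.js exposes an insertMany on students (an assumption, not shown in this diff):

import { getPostgresConnection } from './db.js'

const postgresDB = await getPostgresConnection()

process.on('message', async (items) => {
  // Insert the whole batch, then report its size; the parent sums these
  // counts in onMessage(cumulativeProcessed) and updates the progress bar.
  await postgresDB.students.insertMany(items) // hypothetical API, not in this diff
  process.send(items.length)
})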

src/stram-cache.js (new file, +33)

@@ -0,0 +1,33 @@
+import { Readable } from 'stream';
+
+export class StreamCache {
+  constructor(inputStream, cacheThreshold = 4000) {
+    this.cacheStream = new Readable({
+      read() { }
+    });
+    this.cache = [];
+    this.cacheThreshold = cacheThreshold;
+    inputStream.on('data', this._addDataToCache);
+    inputStream.on('end', () => {
+      this._emitCache();
+      this.cacheStream.emit('end');
+    });
+  }
+
+  _addDataToCache = (data) => {
+    this.cache.push(data);
+    if (this.cache.length >= this.cacheThreshold) {
+      this._emitCache();
+    }
+  }
+
+  _emitCache() {
+    if (!this.cache.length) return
+    this.cacheStream.push(JSON.stringify(this.cache));
+    this.cache = [];
+  }
+
+  stream() {
+    return this.cacheStream;
+  }
+}
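StreamCache buffers items from any input stream and re-emits them as JSON-stringified arrays of at most cacheThreshold items, flushing the remainder when the source ends. A hypothetical smoke test (not part of the commit) makes the batching visible with a small in-memory source:

import { Readable } from 'stream'
import { StreamCache } from './stram-cache.js'

// Ten items with a threshold of 4: expect [1..4], [5..8], and a final
// flush of [9, 10] when the source ends.
const source = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
const batches = new StreamCache(source, 4)

batches.stream().on('data', (chunk) => {
  console.log(JSON.parse(chunk)) // each chunk is a JSON-stringified array
})
batches.stream().on('end', () => console.log('source drained'))

One caveat: Node's documented way to end a Readable is push(null). Emitting 'end' by hand works for the plain 'data' listeners used in this commit, but push(null) would keep the stream's internal ended state consistent for consumers that pipe or iterate it.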
