1
1
import { logger } from '@powersync/lib-services-framework' ;
2
2
import { bson , InternalOpId } from '@powersync/service-core' ;
3
+ import { LRUCache } from 'lru-cache' ;
3
4
import { PowerSyncMongo } from './db.js' ;
4
5
5
6
export class MongoParameterCompactor {
@@ -9,24 +10,6 @@ export class MongoParameterCompactor {
9
10
private checkpoint : InternalOpId
10
11
) { }
11
12
12
- /**
13
- * This is the oldest checkpoint that we consider safe to still use. We cleanup old parameter
14
- * but no data that would be used by this checkpoint.
15
- *
16
- * Specifically, we return a checkpoint that has been available for at least 5 minutes, then
17
- * we can delete data only used for checkpoints older than that.
18
- *
19
- * @returns null if there is no safe checkpoint available.
20
- */
21
- async getActiveCheckpoint ( ) : Promise < InternalOpId | null > {
22
- const syncRules = await this . db . sync_rules . findOne ( { _id : this . group_id } ) ;
23
- if ( syncRules == null ) {
24
- return null ;
25
- }
26
-
27
- return syncRules . last_checkpoint ;
28
- }
29
-
30
13
async compact ( ) {
31
14
logger . info ( `Compacting parameters for group ${ this . group_id } up to checkpoint ${ this . checkpoint } ` ) ;
32
15
// This is the currently-active checkpoint.
@@ -53,7 +36,10 @@ export class MongoParameterCompactor {
53
36
}
54
37
) ;
55
38
56
- let lastDoc : RawParameterData | null = null ;
39
+ // The index doesn't cover sorting by key, so we keep our own cache of the last seen key.
40
+ let lastByKey = new LRUCache < string , InternalOpId > ( {
41
+ max : 10_000
42
+ } ) ;
57
43
let removeIds : InternalOpId [ ] = [ ] ;
58
44
59
45
while ( await cursor . hasNext ( ) ) {
@@ -62,19 +48,18 @@ export class MongoParameterCompactor {
62
48
if ( doc . _id >= checkpoint ) {
63
49
continue ;
64
50
}
65
- const rawDoc : RawParameterData = {
66
- _id : doc . _id ,
67
- // Serializing to a Buffer is an easy way to check for exact equality of arbitrary BSON values.
68
- data : bson . serialize ( {
69
- key : doc . key ,
70
- lookup : doc . lookup
51
+ const uniqueKey = (
52
+ bson . serialize ( {
53
+ k : doc . key ,
54
+ l : doc . lookup
71
55
} ) as Buffer
72
- } ;
73
- if ( lastDoc != null && lastDoc . data . equals ( rawDoc . data ) && lastDoc . _id < doc . _id ) {
74
- removeIds . push ( lastDoc . _id ) ;
56
+ ) . toString ( 'base64' ) ;
57
+ const previous = lastByKey . get ( uniqueKey ) ;
58
+ if ( previous != null && previous < doc . _id ) {
59
+ // We have a newer entry for the same key, so we can remove the old one.
60
+ removeIds . push ( previous ) ;
75
61
}
76
-
77
- lastDoc = rawDoc ;
62
+ lastByKey . set ( uniqueKey , doc . _id ) ;
78
63
}
79
64
80
65
if ( removeIds . length >= 1000 ) {
@@ -91,8 +76,3 @@ export class MongoParameterCompactor {
91
76
logger . info ( 'Parameter compaction completed' ) ;
92
77
}
93
78
}
94
-
95
- interface RawParameterData {
96
- _id : InternalOpId ;
97
- data : Buffer ;
98
- }
0 commit comments