@@ -25,9 +25,6 @@ public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStora
2525 private readonly HashSet < string > tableNames ;
2626 private string ? clientId ;
2727
28- private static readonly int COMPACT_OPERATION_INTERVAL = 1000 ;
29- private int compactCounter = COMPACT_OPERATION_INTERVAL ;
30-
3128 private ILogger logger ;
3229
3330 private CancellationTokenSource updateCts ;
@@ -95,50 +92,6 @@ public string GetMaxOpId()
9592 return MAX_OP_ID ;
9693 }
9794
98- public void StartSession ( ) { }
99-
100- public async Task < BucketState [ ] > GetBucketStates ( )
101- {
102- return
103- await db . GetAll < BucketState > ( "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'" ) ;
104- }
105-
106- public async Task SaveSyncData ( SyncDataBatch batch )
107- {
108- await db . WriteTransaction ( async tx =>
109- {
110- int count = 0 ;
111- foreach ( var b in batch . Buckets )
112- {
113- var result = await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
114- [ "save" , JsonConvert . SerializeObject ( new { buckets = new [ ] { JsonConvert . DeserializeObject ( b . ToJSON ( ) ) } } ) ] ) ;
115- logger . LogDebug ( "saveSyncData {message}" , JsonConvert . SerializeObject ( result ) ) ;
116- count += b . Data . Length ;
117- }
118- compactCounter += count ;
119- } ) ;
120- }
121-
122- public async Task RemoveBuckets ( string [ ] buckets )
123- {
124- foreach ( var bucket in buckets )
125- {
126- await DeleteBucket ( bucket ) ;
127- }
128- }
129-
130- private async Task DeleteBucket ( string bucket )
131- {
132- await db . WriteTransaction ( async tx =>
133- {
134- await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
135- [ "delete_bucket" , bucket ] ) ;
136- } ) ;
137-
138- logger . LogDebug ( "Done deleting bucket" ) ;
139- pendingBucketDeletes = true ;
140- }
141-
14295 private record LastSyncedResult ( string ? synced_at ) ;
14396 public async Task < bool > HasCompletedSync ( )
14497 {
@@ -150,71 +103,6 @@ public async Task<bool> HasCompletedSync()
150103 return hasCompletedSync ;
151104 }
152105
153- public async Task < SyncLocalDatabaseResult > SyncLocalDatabase ( Checkpoint checkpoint )
154- {
155- var validation = await ValidateChecksums ( checkpoint ) ;
156- if ( ! validation . CheckpointValid )
157- {
158- logger . LogError ( "Checksums failed for {failures}" , JsonConvert . SerializeObject ( validation . CheckpointFailures ) ) ;
159- foreach ( var failedBucket in validation . CheckpointFailures ?? [ ] )
160- {
161- await DeleteBucket ( failedBucket ) ;
162- }
163- return new SyncLocalDatabaseResult
164- {
165- Ready = false ,
166- CheckpointValid = false ,
167- CheckpointFailures = validation . CheckpointFailures
168- } ;
169- }
170-
171- var bucketNames = checkpoint . Buckets . Select ( b => b . Bucket ) . ToArray ( ) ;
172- await db . WriteTransaction ( async tx =>
173- {
174- await tx . Execute (
175- "UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))" ,
176- [ checkpoint . LastOpId , JsonConvert . SerializeObject ( bucketNames ) ]
177- ) ;
178-
179- if ( checkpoint . WriteCheckpoint != null )
180- {
181- await tx . Execute (
182- "UPDATE ps_buckets SET last_op = ? WHERE name = '$local'" ,
183- [ checkpoint . WriteCheckpoint ]
184- ) ;
185- }
186- } ) ;
187-
188- var valid = await UpdateObjectsFromBuckets ( checkpoint ) ;
189- if ( ! valid )
190- {
191- logger . LogDebug ( "Not at a consistent checkpoint - cannot update local db" ) ;
192- return new SyncLocalDatabaseResult
193- {
194- Ready = false ,
195- CheckpointValid = true
196- } ;
197- }
198-
199- await ForceCompact ( ) ;
200-
201- return new SyncLocalDatabaseResult
202- {
203- Ready = true ,
204- CheckpointValid = true
205- } ;
206- }
207-
208- private async Task < bool > UpdateObjectsFromBuckets ( Checkpoint checkpoint )
209- {
210- return await db . WriteTransaction ( async tx =>
211- {
212- var result = await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
213- [ "sync_local" , "" ] ) ;
214-
215- return result . InsertId == 1 ;
216- } ) ;
217- }
218106
219107 private record ResultResult ( object result ) ;
220108
@@ -227,75 +115,6 @@ public class ResultDetail
227115 public List < string > ? FailedBuckets { get ; set ; }
228116 }
229117
230- public async Task < SyncLocalDatabaseResult > ValidateChecksums (
231- Checkpoint checkpoint )
232- {
233- var result = await db . Get < ResultResult > ( "SELECT powersync_validate_checkpoint(?) as result" ,
234- [ JsonConvert . SerializeObject ( checkpoint ) ] ) ;
235-
236- logger . LogDebug ( "validateChecksums result item {message}" , JsonConvert . SerializeObject ( result ) ) ;
237-
238- if ( result == null ) return new SyncLocalDatabaseResult { CheckpointValid = false , Ready = false } ;
239-
240- var resultDetail = JsonConvert . DeserializeObject < ResultDetail > ( result . result . ToString ( ) ?? "{}" ) ;
241-
242- if ( resultDetail ? . Valid == true )
243- {
244- return new SyncLocalDatabaseResult { Ready = true , CheckpointValid = true } ;
245- }
246- else
247- {
248- return new SyncLocalDatabaseResult
249- {
250- CheckpointValid = false ,
251- Ready = false ,
252- CheckpointFailures = resultDetail ? . FailedBuckets ? . ToArray ( ) ?? [ ]
253- } ;
254- }
255- }
256-
257- /// <summary>
258- /// Force a compact operation, primarily for testing purposes.
259- /// </summary>
260- public async Task ForceCompact ( )
261- {
262- compactCounter = COMPACT_OPERATION_INTERVAL ;
263- pendingBucketDeletes = true ;
264-
265- await AutoCompact ( ) ;
266- }
267-
268- public async Task AutoCompact ( )
269- {
270- await DeletePendingBuckets ( ) ;
271- await ClearRemoveOps ( ) ;
272- }
273-
274- private async Task DeletePendingBuckets ( )
275- {
276- if ( ! pendingBucketDeletes ) return ;
277-
278- await db . WriteTransaction ( async tx =>
279- {
280- await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
281- [ "delete_pending_buckets" , "" ] ) ;
282- } ) ;
283-
284- pendingBucketDeletes = false ;
285- }
286-
287- private async Task ClearRemoveOps ( )
288- {
289- if ( compactCounter < COMPACT_OPERATION_INTERVAL ) return ;
290-
291- await db . WriteTransaction ( async tx =>
292- {
293- await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
294- [ "clear_remove_ops" , "" ] ) ;
295- } ) ;
296-
297- compactCounter = 0 ;
298- }
299118
300119 private record TargetOpResult ( string target_op ) ;
301120 private record SequenceResult ( int seq ) ;
@@ -431,12 +250,6 @@ public async Task<bool> HasCrud()
431250 return await db . GetOptional < object > ( "SELECT 1 as ignore FROM ps_crud LIMIT 1" ) != null ;
432251 }
433252
434- public async Task SetTargetCheckpoint ( Checkpoint checkpoint )
435- {
436- // No Op
437- await Task . CompletedTask ;
438- }
439-
440253 record ControlResult ( string ? r ) ;
441254
442255 public async Task < string > Control ( string op , object ? payload = null )
0 commit comments