Skip to content

Commit b9b5d18

Browse files
committed
Map stream status from core
1 parent b23aff0 commit b9b5d18

File tree

5 files changed

+138
-22
lines changed

5 files changed

+138
-22
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1616
import 'package:powersync_core/src/sync/options.dart';
1717
import 'package:powersync_core/src/sync/sync_status.dart';
1818

19+
import '../sync/stream.dart';
20+
1921
mixin PowerSyncDatabaseMixin implements SqliteConnection {
2022
/// Schema used for the local database.
2123
Schema get schema;
@@ -140,6 +142,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
140142
return isInitialized;
141143
}
142144

145+
Future<List<SyncStreamSubscription>> get activeSubscriptions {
146+
throw UnimplementedError();
147+
}
148+
143149
Future<void> _updateHasSynced() async {
144150
// Query the database to see if any data has been synced.
145151
final result = await database.getAll(

packages/powersync_core/lib/src/sync/instruction.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import 'stream.dart';
12
import 'sync_status.dart';
23

34
/// An internal instruction emitted by the sync client in the core extension in
@@ -62,12 +63,14 @@ final class CoreSyncStatus {
6263
final bool connecting;
6364
final List<SyncPriorityStatus> priorityStatus;
6465
final DownloadProgress? downloading;
66+
final List<CoreActiveStreamSubscription>? streams;
6567

6668
CoreSyncStatus({
6769
required this.connected,
6870
required this.connecting,
6971
required this.priorityStatus,
7072
required this.downloading,
73+
required this.streams,
7174
});
7275

7376
factory CoreSyncStatus.fromJson(Map<String, Object?> json) {
@@ -82,6 +85,10 @@ final class CoreSyncStatus {
8285
null => null,
8386
final raw as Map<String, Object?> => DownloadProgress.fromJson(raw),
8487
},
88+
streams: (json['stream'] as List<Object?>?)
89+
?.map((e) =>
90+
CoreActiveStreamSubscription.fromJson(e as Map<String, Object?>))
91+
.toList(),
8592
);
8693
}
8794

packages/powersync_core/lib/src/sync/mutable_sync_status.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:async';
33
import 'package:collection/collection.dart';
44

55
import 'instruction.dart';
6+
import 'stream.dart';
67
import 'sync_status.dart';
78
import 'bucket_storage.dart';
89
import 'protocol.dart';
@@ -15,6 +16,7 @@ final class MutableSyncStatus {
1516

1617
InternalSyncDownloadProgress? downloadProgress;
1718
List<SyncPriorityStatus> priorityStatusEntries = const [];
19+
List<CoreActiveStreamSubscription> streams = const [];
1820

1921
DateTime? lastSyncedAt;
2022

packages/powersync_core/lib/src/sync/stream.dart

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import 'package:meta/meta.dart';
2+
13
import 'sync_status.dart';
24

35
abstract interface class SyncStreamDescription {
@@ -10,7 +12,7 @@ abstract interface class SyncSubscriptionDefinition
1012
bool get active;
1113
DateTime? get expiresAt;
1214
bool get hasSynced;
13-
DateTime? lastSyncedAt;
15+
DateTime? get lastSyncedAt;
1416
}
1517

1618
abstract interface class SyncStream extends SyncStreamDescription {
@@ -26,3 +28,56 @@ abstract interface class SyncStreamSubscription
2628
Future<void> waitForFirstSync();
2729
Future<void> unsubscribe({bool immediately = false});
2830
}
31+
32+
/// An `ActiveStreamSubscription` as part of the sync status in Rust.
33+
@internal
34+
final class CoreActiveStreamSubscription implements SyncSubscriptionDefinition {
35+
@override
36+
final String name;
37+
@override
38+
final Map<String, Object?>? parameters;
39+
final BucketPriority priority;
40+
final List<String> associatedBuckets;
41+
@override
42+
final bool active;
43+
final bool isDefault;
44+
@override
45+
final DateTime? expiresAt;
46+
@override
47+
final DateTime? lastSyncedAt;
48+
49+
@override
50+
bool get hasSynced => lastSyncedAt != null;
51+
52+
CoreActiveStreamSubscription._({
53+
required this.name,
54+
required this.parameters,
55+
required this.priority,
56+
required this.associatedBuckets,
57+
required this.active,
58+
required this.isDefault,
59+
required this.expiresAt,
60+
required this.lastSyncedAt,
61+
});
62+
63+
factory CoreActiveStreamSubscription.fromJson(Map<String, Object?> json) {
64+
return CoreActiveStreamSubscription._(
65+
name: json['name'] as String,
66+
parameters: json['parameters'] as Map<String, Object?>,
67+
priority: BucketPriority(json['priority'] as int),
68+
associatedBuckets: (json['associated_buckets'] as List).cast(),
69+
active: json['active'] as bool,
70+
isDefault: json['is_default'] as bool,
71+
expiresAt: switch (json['expires_at']) {
72+
null => null,
73+
final timestamp as int =>
74+
DateTime.fromMillisecondsSinceEpoch(timestamp * 1000),
75+
},
76+
lastSyncedAt: switch (json['last_synced_at']) {
77+
null => null,
78+
final timestamp as int =>
79+
DateTime.fromMillisecondsSinceEpoch(timestamp * 1000),
80+
},
81+
);
82+
}
83+
}

packages/powersync_core/lib/src/sync/sync_status.dart

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:math';
22

33
import 'package:collection/collection.dart';
44
import 'package:meta/meta.dart';
5+
import '../database/powersync_database.dart';
56

67
import 'bucket_storage.dart';
78
import 'protocol.dart';
@@ -55,7 +56,7 @@ final class SyncStatus {
5556

5657
final List<SyncPriorityStatus> priorityStatusEntries;
5758

58-
final List<SyncStreamStatus>? activeSubscriptions;
59+
final List<CoreActiveStreamSubscription>? _internalSubscriptions;
5960

6061
const SyncStatus({
6162
this.connected = false,
@@ -68,8 +69,8 @@ final class SyncStatus {
6869
this.downloadError,
6970
this.uploadError,
7071
this.priorityStatusEntries = const [],
71-
this.activeSubscriptions,
72-
});
72+
List<CoreActiveStreamSubscription>? streamSubscriptions,
73+
}) : _internalSubscriptions = streamSubscriptions;
7374

7475
@override
7576
bool operator ==(Object other) {
@@ -82,8 +83,10 @@ final class SyncStatus {
8283
other.uploadError == uploadError &&
8384
other.lastSyncedAt == lastSyncedAt &&
8485
other.hasSynced == hasSynced &&
85-
_statusEquality.equals(
86+
_listEquality.equals(
8687
other.priorityStatusEntries, priorityStatusEntries) &&
88+
_listEquality.equals(
89+
other._internalSubscriptions, _internalSubscriptions) &&
8790
other.downloadProgress == downloadProgress);
8891
}
8992

@@ -114,6 +117,18 @@ final class SyncStatus {
114117
);
115118
}
116119

120+
/// All sync streams currently being tracked in this subscription.
121+
///
122+
/// This returns null when the sync stream is currently being opened and we
123+
/// don't have reliable information about all included streams yet (in that
124+
/// state, [PowerSyncDatabase.activeSubscriptions] can still be used to
125+
/// resolve known subscriptions locally).
126+
Iterable<SyncStreamStatus>? get activeSubscriptions {
127+
return _internalSubscriptions?.map((subscription) {
128+
return SyncStreamStatus._(subscription, downloadProgress);
129+
});
130+
}
131+
117132
/// Get the current [downloadError] or [uploadError].
118133
Object? get anyError {
119134
return downloadError ?? uploadError;
@@ -153,6 +168,21 @@ final class SyncStatus {
153168
);
154169
}
155170

171+
/// If the [stream] appears in [activeSubscriptions], returns the current
172+
/// status for that stream.
173+
SyncStreamStatus? statusFor(SyncStreamDescription stream) {
174+
final raw = _internalSubscriptions?.firstWhereOrNull(
175+
(e) =>
176+
e.name == stream.name &&
177+
_mapEquality.equals(e.parameters, stream.parameters),
178+
);
179+
180+
if (raw == null) {
181+
return null;
182+
}
183+
return SyncStreamStatus._(raw, downloadProgress);
184+
}
185+
156186
@override
157187
int get hashCode {
158188
return Object.hash(
@@ -163,8 +193,9 @@ final class SyncStatus {
163193
uploadError,
164194
downloadError,
165195
lastSyncedAt,
166-
_statusEquality.hash(priorityStatusEntries),
196+
_listEquality.hash(priorityStatusEntries),
167197
downloadProgress,
198+
_listEquality.hash(_internalSubscriptions),
168199
);
169200
}
170201

@@ -173,21 +204,20 @@ final class SyncStatus {
173204
return "SyncStatus<connected: $connected connecting: $connecting downloading: $downloading (progress: $downloadProgress) uploading: $uploading lastSyncedAt: $lastSyncedAt, hasSynced: $hasSynced, error: $anyError>";
174205
}
175206

176-
// This should be a ListEquality<SyncPriorityStatus>, but that appears to
177-
// cause weird type errors with DDC (but only after hot reloads?!)
178-
static const _statusEquality = ListEquality<Object?>();
207+
static const _listEquality = ListEquality<Object?>();
208+
static const _mapEquality = MapEquality<Object?, Object?>();
179209
}
180210

181211
final class SyncStreamStatus {
182-
final SyncSubscriptionDefinition subscription;
183-
final BucketPriority priority;
212+
final ProgressWithOperations? progress;
213+
final CoreActiveStreamSubscription _internal;
184214

185-
final bool isDefault;
186-
final ProgressWithOperations progress;
215+
SyncSubscriptionDefinition get subscription => _internal;
216+
BucketPriority get priority => _internal.priority;
217+
bool get isDefault => _internal.isDefault;
187218

188-
@internal
189-
SyncStreamStatus(
190-
this.subscription, this.priority, this.isDefault, this.progress);
219+
SyncStreamStatus._(this._internal, SyncDownloadProgress? progress)
220+
: progress = progress?._internal._forStream(_internal);
191221
}
192222

193223
/// The priority of a PowerSync bucket.
@@ -304,13 +334,23 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations {
304334
/// Sums the total target and completed operations for all buckets up until
305335
/// the given [priority] (inclusive).
306336
ProgressWithOperations untilPriority(BucketPriority priority) {
307-
final (total, downloaded) =
308-
buckets.values.where((e) => e.priority >= priority).fold(
337+
final (total, downloaded) = buckets.values
338+
.where((e) => e.priority >= priority)
339+
.fold((0, 0), _addProgress);
340+
341+
return ProgressWithOperations._(total, downloaded);
342+
}
343+
344+
ProgressWithOperations _forStream(CoreActiveStreamSubscription subscription) {
345+
final (total, downloaded) = subscription.associatedBuckets.fold(
309346
(0, 0),
310-
(prev, entry) {
311-
final downloaded = entry.sinceLast;
312-
final total = entry.targetCount - entry.atLast;
313-
return (prev.$1 + total, prev.$2 + downloaded);
347+
(prev, bucket) {
348+
final foundProgress = buckets[bucket];
349+
if (foundProgress == null) {
350+
return prev;
351+
}
352+
353+
return _addProgress(prev, foundProgress);
314354
},
315355
);
316356

@@ -356,6 +396,12 @@ final class InternalSyncDownloadProgress extends ProgressWithOperations {
356396
}
357397

358398
static const _mapEquality = MapEquality<Object?, Object?>();
399+
400+
(int, int) _addProgress((int, int) prev, BucketProgress entry) {
401+
final downloaded = entry.sinceLast;
402+
final total = entry.targetCount - entry.atLast;
403+
return (prev.$1 + total, prev.$2 + downloaded);
404+
}
359405
}
360406

361407
/// Information about a progressing download.

0 commit comments

Comments
 (0)