Skip to content

feat: Expose upload/download errors, credential management, last synced value from core extension #7

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
7 changes: 7 additions & 0 deletions PowerSync/PowerSync.Common/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.0.2-alpha.2

- Updated core extension to v0.3.14
- Loading last synced time from core extension
- Expose upload and download errors on SyncStatus
- Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid based on responses from the PowerSync service. The frequency of credential fetching has been reduced as a result of this work.

## 0.0.2-alpha.1

- Introduce package. Support for Desktop .NET use cases.
Expand Down
25 changes: 20 additions & 5 deletions PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>

public class PowerSyncDatabase : EventStream<PowerSyncDBEvent>, IPowerSyncDatabase
{
private static readonly int FULL_SYNC_PRIORITY = 2147483647;

public IDBAdapter Database;
private Schema schema;
Expand Down Expand Up @@ -231,21 +232,35 @@ private async Task LoadVersion()
}
}

private record LastSyncedResult(string? synced_at);
private record LastSyncedResult(int? priority, string? last_synced_at);

protected async Task UpdateHasSynced()
{
var result = await Database.Get<LastSyncedResult>("SELECT powersync_last_synced_at() as synced_at");
var results = await Database.GetAll<LastSyncedResult>(
"SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority DESC"
);

var hasSynced = result.synced_at != null;
DateTime? syncedAt = result.synced_at != null ? DateTime.Parse(result.synced_at + "Z") : null;
DateTime? lastCompleteSync = null;

// TODO: Will be altered/extended when reporting individual sync priority statuses is supported
foreach (var result in results)
{
var parsedDate = DateTime.Parse(result.last_synced_at + "Z");

if (result.priority == FULL_SYNC_PRIORITY)
{
// This lowest-possible priority represents a complete sync.
lastCompleteSync = parsedDate;
}
}

var hasSynced = lastCompleteSync != null;
if (hasSynced != CurrentStatus.HasSynced)
{
CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
{
HasSynced = hasSynced,
LastSyncedAt = syncedAt
LastSyncedAt = lastCompleteSync,
});

Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
Expand Down
50 changes: 45 additions & 5 deletions PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace PowerSync.Common.Client.Sync.Stream;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Text.RegularExpressions;

using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
Expand All @@ -29,7 +30,6 @@ public class RequestDetails

public class Remote
{
private static int REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
private readonly HttpClient httpClient;
protected IPowerSyncBackendConnector connector;

Expand All @@ -41,18 +41,48 @@ public Remote(IPowerSyncBackendConnector connector)
this.connector = connector;
}

/// <summary>
/// Get credentials currently cached, or fetch new credentials if none are available.
/// These credentials may have expired already.
/// </summary>
public async Task<PowerSyncCredentials?> GetCredentials()
{
if (credentials?.ExpiresAt > DateTime.Now.AddMilliseconds(REFRESH_CREDENTIALS_SAFETY_PERIOD_MS))
if (credentials != null)
{
return credentials;
}
return await PrefetchCredentials();
}

credentials = await connector.FetchCredentials();

/// <summary>
/// Fetch a new set of credentials and cache it.
/// Until this call succeeds, GetCredentials will still return the old credentials.
/// This may be called before the current credentials have expired.
/// </summary>
public async Task<PowerSyncCredentials?> PrefetchCredentials()
{
credentials = await FetchCredentials();
return credentials;
}

/// <summary>
/// Get credentials for PowerSync.
/// This should always fetch a fresh set of credentials - don't use cached values.
/// </summary>
public async Task<PowerSyncCredentials?> FetchCredentials()
{
return await connector.FetchCredentials();
}

/// <summary>
/// Immediately invalidate credentials.
/// This may be called when the current credentials have expired.
/// </summary>
public void InvalidateCredentials()
{
credentials = null;
}

static string GetUserAgent()
{
object[] attributes = Assembly.GetExecutingAssembly()
Expand All @@ -76,6 +106,11 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
using var client = new HttpClient();
var response = await client.SendAsync(request);

if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
{
InvalidateCredentials();
}

if (!response.IsSuccessStatusCode)
{
var errorMessage = await response.Content.ReadAsStringAsync();
Expand All @@ -95,7 +130,12 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
{
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
}
else

if (response.StatusCode == System.Net.HttpStatusCode.Unauthorized)
{
InvalidateCredentials();
}

if (!response.IsSuccessStatusCode)
{
var errorText = await response.Content.ReadAsStringAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,10 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
UpdateSyncStatus(new SyncStatusOptions
{
Connected = false,
DataFlow = new SyncDataFlowStatus
{
DownloadError = ex
}
});

// On error, wait a little before retrying
Expand Down Expand Up @@ -466,7 +470,13 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
{
Connected = true,
LastSyncedAt = DateTime.Now,
DataFlow = new SyncDataFlowStatus { Downloading = false }
DataFlow = new SyncDataFlowStatus
{
Downloading = false
}
}, new UpdateSyncStatusOptions
{
ClearDownloadError = true
});

}
Expand Down Expand Up @@ -539,13 +549,21 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
{
// Connection would be closed automatically right after this
logger.LogDebug("Token expiring; reconnect");
Options.Remote.InvalidateCredentials();

// For a rare case where the backend connector does not update the token
// (uses the same one), this should have some delay.
//
await DelayRetry();
return new StreamingSyncIterationResult { Retry = true };
}
else if (remainingSeconds < 30)
{
logger.LogDebug("Token will expire soon; reconnect");
// Pre-emptively refresh the token
Options.Remote.InvalidateCredentials();
return new StreamingSyncIterationResult { Retry = true };
}
TriggerCrudUpload();
}
else
Expand All @@ -557,8 +575,13 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
UpdateSyncStatus(new SyncStatusOptions
{
Connected = true,
LastSyncedAt = DateTime.Now
});
LastSyncedAt = DateTime.Now,
},
new UpdateSyncStatusOptions
{
ClearDownloadError = true
}
);
}
else if (validatedCheckpoint == targetCheckpoint)
{
Expand All @@ -584,8 +607,12 @@ protected async Task<StreamingSyncIterationResult> StreamingSyncIteration(Cancel
LastSyncedAt = DateTime.Now,
DataFlow = new SyncDataFlowStatus
{
Downloading = false
Downloading = false,
}
},
new UpdateSyncStatusOptions
{
ClearDownloadError = true
});
}
}
Expand Down Expand Up @@ -655,6 +682,14 @@ await locks.ObtainLock(new LockOptions<Task>

checkedCrudItem = nextCrudItem;
await Options.UploadCrud();
UpdateSyncStatus(new SyncStatusOptions
{
},
new UpdateSyncStatusOptions
{
ClearUploadError = true
});

}
else
{
Expand All @@ -666,7 +701,14 @@ await locks.ObtainLock(new LockOptions<Task>
catch (Exception ex)
{
checkedCrudItem = null;
UpdateSyncStatus(new SyncStatusOptions { DataFlow = new SyncDataFlowStatus { Uploading = false } });
UpdateSyncStatus(new SyncStatusOptions
{
DataFlow = new SyncDataFlowStatus
{
Uploading = false,
UploadError = ex
}
});

await DelayRetry();

Expand Down Expand Up @@ -700,7 +742,10 @@ public async Task WaitForReady()
await Task.CompletedTask;
}

protected void UpdateSyncStatus(SyncStatusOptions options)
protected record UpdateSyncStatusOptions(
bool? ClearDownloadError = null, bool? ClearUploadError = null
);
protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptions? updateOptions = null)
{
var updatedStatus = new SyncStatus(new SyncStatusOptions
{
Expand All @@ -710,7 +755,9 @@ protected void UpdateSyncStatus(SyncStatusOptions options)
DataFlow = new SyncDataFlowStatus
{
Uploading = options.DataFlow?.Uploading ?? SyncStatus.DataFlowStatus.Uploading,
Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading
Downloading = options.DataFlow?.Downloading ?? SyncStatus.DataFlowStatus.Downloading,
DownloadError = updateOptions?.ClearDownloadError == true ? null : options.DataFlow?.DownloadError ?? SyncStatus.DataFlowStatus.DownloadError,
UploadError = updateOptions?.ClearUploadError == true ? null : options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError,
}
});

Expand Down
16 changes: 15 additions & 1 deletion PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ public class SyncDataFlowStatus

[JsonProperty("uploading")]
public bool Uploading { get; set; } = false;

/// <summary>
/// Error during downloading (including connecting).
/// Cleared on the next successful data download.
/// </summary>
[JsonProperty("downloadError")]
public Exception? DownloadError { get; set; } = null;

/// <summary>
/// Error during uploading.
/// Cleared on the next successful upload.
/// </summary>
[JsonProperty("uploadError")]
public Exception? UploadError { get; set; } = null;
}

public class SyncStatusOptions
Expand Down Expand Up @@ -73,7 +87,7 @@ public bool IsEqual(SyncStatus status)
public string GetMessage()
{
var dataFlow = DataFlowStatus;
return $"SyncStatus<connected: {Connected} connecting: {Connecting} lastSyncedAt: {LastSyncedAt} hasSynced: {HasSynced}. Downloading: {dataFlow.Downloading}. Uploading: {dataFlow.Uploading}>";
return $"SyncStatus<connected: {Connected} connecting: {Connecting} lastSyncedAt: {LastSyncedAt} hasSynced: {HasSynced}. Downloading: {dataFlow.Downloading}. Uploading: {dataFlow.Uploading}. UploadError: {dataFlow.UploadError}, DownloadError?: {dataFlow.DownloadError}>";
}

public string ToJSON()
Expand Down
Loading