Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RNET-1083: Add support for progress estimate on progress notifications #3479

Merged
merged 27 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
* Automatic client reset recovery now does a better job of recovering changes when changesets were downloaded from the server after the unuploaded local changes were committed. If the local Realm happened to be fully up to date with the server prior to the client reset, automatic recovery should now always produce exactly the same state as if no client reset was involved. (Core 13.24.1)
* Exceptions thrown during bootstrap application will now be surfaced to the user rather than terminating the program with an unhandled exception. (Core 13.25.0)
* Allow the using `>`, `>=`, `<`, `<=` operators in `Realm.Filter()` queries for string constants. This is a case sensitive lexicographical comparison. Improved performance of RQL (`.Filter()`) queries on a non-linked string property using: >, >=, <, <=, operators and fixed behaviour that a null string should be evaluated as less than everything, previously nulls were not matched. (Core 13.26.0-14-gdf25f)
* Added `SyncProgress.ProgressEstimate`, a float value between 0.0 and 1.0 that expresses the percentage estimate of the current progress. At the same time `SyncProgress.TransferredBytes` and `SyncProgress.TransferableBytes` have been marked as obsolete in favour of `SyncProgress.ProgressEstimate`. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))
* `Session.GetProgressObservable` can now be used with Flexible Sync. (Issue [#3478](https://github.com/realm/realm-dotnet/issues/3478]))

### Fixed
* Automatic client reset recovery would duplicate insertions in a list when recovering a write which made an unrecoverable change to a list (i.e. modifying or deleting a pre-existing entry), followed by a subscription change, followed by a write which added an entry to the list. (Core 13.24.0)
Expand Down
6 changes: 3 additions & 3 deletions Realm/Realm/Handles/SessionHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public delegate void SessionErrorCallback(IntPtr session_handle_ptr,
IntPtr managed_sync_config_handle);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes);
public delegate void SessionProgressCallback(IntPtr progress_token_ptr, ulong transferred_bytes, ulong transferable_bytes, double progressEstimate);

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
public delegate void SessionWaitCallback(IntPtr task_completion_source, int error_code, PrimitiveValue message);
Expand Down Expand Up @@ -405,10 +405,10 @@ private static IntPtr NotifyAfterClientReset(IntPtr beforeFrozen, IntPtr after,
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionProgressCallback))]
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes)
private static void HandleSessionProgress(IntPtr tokenPtr, ulong transferredBytes, ulong transferableBytes, double progressEstimate)
{
var token = (ProgressNotificationToken?)GCHandle.FromIntPtr(tokenPtr).Target;
token?.Notify(transferredBytes, transferableBytes);
token?.Notify(transferredBytes, transferableBytes, progressEstimate);
}

[MonoPInvokeCallback(typeof(NativeMethods.SessionWaitCallback))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ public ProgressNotificationToken(Action<SyncProgress> observer, Func<GCHandle, u
}
}

public void Notify(ulong transferredBytes, ulong transferableBytes)
public void Notify(ulong transferredBytes, ulong transferableBytes, double progressEstimate)
{
Task.Run(() =>
{
try
{
_observer(new SyncProgress(transferredBytes, transferableBytes));
_observer(new SyncProgress(transferredBytes, transferableBytes, progressEstimate));
}
catch (Exception ex)
{
Expand Down
19 changes: 16 additions & 3 deletions Realm/Realm/Sync/ProgressNotifications/SyncProgress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
//
////////////////////////////////////////////////////////////////////////////

using System;

namespace Realms.Sync
{
/// <summary>
/// A struct containing information about the progress state at a given instant.
/// </summary>
public struct SyncProgress
public readonly struct SyncProgress
{
/// <summary>
/// Gets the number of bytes that have been transferred since subscribing for progress notifications.
/// </summary>
/// <value>The number of transferred bytes.</value>
[Obsolete("Not accurate, use ProgressEstimate instead.")]
public ulong TransferredBytes { get; }

/// <summary>
Expand All @@ -36,14 +39,24 @@ public struct SyncProgress
/// successfully transferred.
/// </summary>
/// <value>The number of transferable bytes.</value>
[Obsolete("Not accurate, use ProgressEstimate instead.")]
public ulong TransferableBytes { get; }
papafe marked this conversation as resolved.
Show resolved Hide resolved

internal SyncProgress(ulong transferred, ulong transferable)
/// <summary>
/// Gets the percentage estimate of the current progress, expressed as a float between 0.0 and 1.0.
/// </summary>
/// <value>A percentage estimate of the progress.</value>
public double ProgressEstimate { get; }

internal SyncProgress(ulong transferred, ulong transferable, double progressEstimate)
{
#pragma warning disable CS0618 // Type or member is obsolete
TransferredBytes = transferred;
TransferableBytes = transferable;
#pragma warning restore CS0618 // Type or member is obsolete
ProgressEstimate = progressEstimate;
}

internal bool IsComplete => TransferableBytes == TransferredBytes;
internal readonly bool IsComplete => ProgressEstimate >= 1.0;
papafe marked this conversation as resolved.
Show resolved Hide resolved
}
}
3 changes: 1 addition & 2 deletions Realm/Realm/Sync/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ public event PropertyChangedEventHandler? PropertyChanged
/// var observable = session.GetProgressObservable(ProgressDirection.Upload, ProgressMode.ReportIndefinitely);
/// notificationToken = observable.Subscribe(progress =>
/// {
/// // Update relevant properties by accessing
/// // progress.TransferredBytes and progress.TransferableBytes
/// // Update relevant properties by accessing progress.ProgressEstimate
/// });
/// }
///
Expand Down
74 changes: 60 additions & 14 deletions Tests/Realm.Tests/Sync/SessionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,25 @@ namespace Realms.Tests.Sync
[TestFixture, Preserve(AllMembers = true)]
public class SessionTests : SyncTestBase
{
public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

public static readonly object[] AllClientResetHandlers = new object[]
{
typeof(DiscardUnsyncedChangesHandler),
typeof(RecoverUnsyncedChangesHandler),
typeof(RecoverOrDiscardUnsyncedChangesHandler),
};

public static readonly ProgressMode[] ProgressModeTypes = new ProgressMode[]
{
ProgressMode.ForCurrentlyOutstandingWork,
ProgressMode.ReportIndefinitely,
};

[Preserve]
static SessionTests()
{
Expand All @@ -79,12 +91,6 @@ static SessionTests()
};
}

public static readonly string[] AppTypes = new[]
{
AppConfigType.Default,
AppConfigType.FlexibleSync
};

[Test]
public void Realm_SyncSession_WhenSyncedRealm()
{
Expand Down Expand Up @@ -750,16 +756,31 @@ public void Session_OnSessionError()
});
}

[TestCase(ProgressMode.ForCurrentlyOutstandingWork)]
[TestCase(ProgressMode.ReportIndefinitely)]
public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
#pragma warning disable CS0618 // Type or member is obsolete
papafe marked this conversation as resolved.
Show resolved Hide resolved
[Test]
public void SessionIntegrationTest_ProgressObservable(
papafe marked this conversation as resolved.
Show resolved Hide resolved
[ValueSource(nameof(AppTypes))] string appType,
[ValueSource(nameof(ProgressModeTypes))] ProgressMode mode)
{
const int objectSize = 1_000_000;
const int objectsToRecord = 2;
SyncTestHelpers.RunBaasTestAsync(async () =>
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
using var realm = GetRealm(config);
Realm realm;
if (appType == AppConfigType.Default)
{
var config = await GetIntegrationConfigAsync(Guid.NewGuid().ToString());
realm = GetRealm(config);
}
else
{
var config = await GetFLXIntegrationConfigAsync();
config.PopulateInitialSubscriptions = (r) =>
{
r.Subscriptions.Add(r.All<HugeSyncObject>());
};
realm = await GetRealmAsync(config);
}

var completionTcs = new TaskCompletionSource<ulong>();
var callbacksInvoked = 0;
Expand All @@ -784,8 +805,12 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)

if (p.TransferredBytes > p.TransferableBytes)
{
// TODO https://github.com/realm/realm-dotnet/issues/2360: this seems to be a regression in Sync.
// throw new Exception($"Expected: {p.TransferredBytes} <= {p.TransferableBytes}");
throw new Exception($"Expected: {p.TransferredBytes} <= {p.TransferableBytes}");
}

if (p.ProgressEstimate < 0.0 || p.ProgressEstimate > 1.0)
{
throw new Exception($"Expected progress estimate to be between 0.0 and 1.0, but was {p.ProgressEstimate}");
}

if (mode == ProgressMode.ForCurrentlyOutstandingWork)
Expand All @@ -796,14 +821,34 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
throw new Exception($"Expected: {p.TransferableBytes} to be in the ({objectSize}, {(objectsToRecord + 1) * objectSize}) range.");
}
}

if (p.TransferredBytes == 0 && p.ProgressEstimate != 0.0)
{
throw new Exception($"Expected progress estimate to be 0.0 when TransferredBytes == 0, but was {p.ProgressEstimate}");
}

if (p.TransferredBytes > 0 && (p.ProgressEstimate <= 0.0 || p.ProgressEstimate > 1.0))
{
throw new Exception($"Expected progress estimate to be between 0.0 and 1.0 when TransferredBytes >= 0, but was {p.ProgressEstimate}");
}
}
catch (Exception e)
{
completionTcs.TrySetException(e);
}

if (p.TransferredBytes >= p.TransferableBytes)
if (p.TransferredBytes >= p.TransferableBytes && p.TransferredBytes > objectSize)
{
if (p.ProgressEstimate != 1.0)
{
throw new Exception($"Expected progress estimate to be 1.0 when TransferredBytes >= TransferableBytes, but was {p.ProgressEstimate}");
}

if (p.IsComplete is false)
{
throw new Exception($"Expected IsComplete to be true when TransferredBytes >= TransferableBytes, but was false");
}

completionTcs.TrySetResult(p.TransferredBytes);
}
});
Expand Down Expand Up @@ -831,6 +876,7 @@ public void SessionIntegrationTest_ProgressObservable(ProgressMode mode)
Assert.That(callbacksInvoked, Is.GreaterThan(1));
}, timeout: 120_000);
}
#pragma warning restore CS0618 // Type or member is obsolete

[Test]
public void Session_Stop_StopsSession()
Expand Down
4 changes: 2 additions & 2 deletions wrappers/src/async_open_task_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ REALM_EXPORT void realm_asyncopentask_cancel(SharedAsyncOpenTask& task, NativeEx
REALM_EXPORT uint64_t realm_asyncopentask_register_progress_notifier(const SharedAsyncOpenTask& task, void* managed_state, NativeException::Marshallable& ex)
{
return handle_errors(ex, [&] {
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable) {
s_progress_callback(managed_state, transferred, transferable);
return task->register_download_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
});
});
}
Expand Down
4 changes: 2 additions & 2 deletions wrappers/src/sync_session_cs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ REALM_EXPORT uint64_t realm_syncsession_register_progress_notifier(const SharedS
? SyncSession::ProgressDirection::upload
: SyncSession::ProgressDirection::download;

return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable) {
s_progress_callback(managed_state, transferred, transferable);
return session->register_progress_notifier([managed_state](uint64_t transferred, uint64_t transferable, double progress_estimate) {
s_progress_callback(managed_state, transferred, transferable, progress_estimate);
}, notifier_direction, is_streaming);
});
}
Expand Down
2 changes: 1 addition & 1 deletion wrappers/src/sync_session_cs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace realm::binding {

using SharedSyncSession = std::shared_ptr<SyncSession>;
using SessionErrorCallbackT = void(SharedSyncSession* session, realm_sync_error error, void* managed_sync_config);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes);
using ProgressCallbackT = void(void* state, uint64_t transferred_bytes, uint64_t transferrable_bytes, double progress_estimate);
using NotifyBeforeClientResetCallbackT = void*(SharedRealm& before_frozen, void* managed_sync_config);
using NotifyAfterClientResetCallbackT = void*(SharedRealm& before_frozen, SharedRealm& after, void* managed_sync_config, bool did_recover);

Expand Down
Loading