Implement full subscription flow

This commit is contained in:
Dean Herbert 2022-01-21 18:50:25 +09:00
parent 18bf690a30
commit 45aea9add5
1 changed files with 34 additions and 12 deletions

View File

@ -2,6 +2,8 @@
// See the LICENCE file in the repository root for full licence text. // See the LICENCE file in the repository root for full licence text.
using System; using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Reflection; using System.Reflection;
@ -80,6 +82,10 @@ public Realm Context
{ {
context = createContext(); context = createContext();
Logger.Log(@$"Opened realm ""{context.Config.DatabasePath}"" at version {context.Config.SchemaVersion}"); Logger.Log(@$"Opened realm ""{context.Config.DatabasePath}"" at version {context.Config.SchemaVersion}");
// Resubscribe any subscriptions
foreach (var action in subscriptionActions.Keys)
registerSubscription(action);
} }
// creating a context will ensure our schema is up-to-date and migrated. // creating a context will ensure our schema is up-to-date and migrated.
@ -226,26 +232,42 @@ public void Write(Action<Realm> action)
} }
} }
private readonly Dictionary<Func<Realm, IDisposable?>, IDisposable?> subscriptionActions = new Dictionary<Func<Realm, IDisposable?>, IDisposable?>();
/// <summary> /// <summary>
/// Run work on realm that will be run every time the update thread realm context gets recycled. /// Run work on realm that will be run every time the update thread realm context gets recycled.
/// </summary> /// </summary>
/// <param name="action">The work to run.</param> /// <param name="action">The work to run. Return value should be an <see cref="IDisposable"/> from QueryAsyncWithNotifications, or an <see cref="InvokeOnDisposal"/> to clean up any bindings.</param>
public void Register(Action<Realm> action) /// <returns>An <see cref="IDisposable"/> which should be disposed to unsubscribe any inner subscription.</returns>
public IDisposable Register(Func<Realm, IDisposable?> action)
{ {
if (!ThreadSafety.IsUpdateThread && context != null) if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread."); throw new InvalidOperationException(@$"{nameof(Register)} must be called from the update thread.");
if (ThreadSafety.IsUpdateThread) subscriptionActions.Add(action, null);
registerSubscription(action);
return new InvokeOnDisposal(() =>
{ {
current_thread_subscriptions_allowed.Value = true; // TODO: this likely needs to be run on the update thread.
action(Context); if (subscriptionActions.TryGetValue(action, out var unsubscriptionAction))
current_thread_subscriptions_allowed.Value = false; {
unsubscriptionAction?.Dispose();
subscriptionActions.Remove(action);
} }
else });
}
private void registerSubscription(Func<Realm, IDisposable?> action)
{ {
Debug.Assert(ThreadSafety.IsUpdateThread);
lock (contextLock)
{
Debug.Assert(context != null);
current_thread_subscriptions_allowed.Value = true; current_thread_subscriptions_allowed.Value = true;
using (var realm = createContext()) subscriptionActions[action] = action(context);
action(realm);
current_thread_subscriptions_allowed.Value = false; current_thread_subscriptions_allowed.Value = false;
} }
} }