Merge pull request #18874 from peppy/realm-fix-async-write-after-disposal

Ensure all async writes are completed before realm is disposed
This commit is contained in:
Dan Balasescu 2022-07-01 15:29:01 +09:00 committed by GitHub
commit 409224560f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 158 additions and 69 deletions

View File

@ -2,11 +2,14 @@
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using osu.Framework.Extensions;
using osu.Game.Beatmaps;
using osu.Game.Database;
using osu.Game.Tests.Resources;
namespace osu.Game.Tests.Database
{
@ -33,6 +36,85 @@ namespace osu.Game.Tests.Database
});
}
[Test]
public void TestAsyncWriteAsync()
{
RunTestWithRealmAsync(async (realm, _) =>
{
await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo()));
realm.Run(r => r.Refresh());
Assert.That(realm.Run(r => r.All<BeatmapSetInfo>().Count()), Is.EqualTo(1));
});
}
[Test]
public void TestAsyncWriteWhileBlocking()
{
RunTestWithRealm((realm, _) =>
{
Task writeTask;
using (realm.BlockAllOperations())
{
writeTask = realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo()));
Thread.Sleep(100);
Assert.That(writeTask.IsCompleted, Is.False);
}
writeTask.WaitSafely();
realm.Run(r => r.Refresh());
Assert.That(realm.Run(r => r.All<BeatmapSetInfo>().Count()), Is.EqualTo(1));
});
}
[Test]
public void TestAsyncWrite()
{
RunTestWithRealm((realm, _) =>
{
realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely();
realm.Run(r => r.Refresh());
Assert.That(realm.Run(r => r.All<BeatmapSetInfo>().Count()), Is.EqualTo(1));
});
}
[Test]
public void TestAsyncWriteAfterDisposal()
{
RunTestWithRealm((realm, _) =>
{
realm.Dispose();
Assert.ThrowsAsync<ObjectDisposedException>(() => realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())));
});
}
[Test]
public void TestAsyncWriteBeforeDisposal()
{
ManualResetEventSlim resetEvent = new ManualResetEventSlim();
RunTestWithRealm((realm, _) =>
{
var writeTask = realm.WriteAsync(r =>
{
// ensure that disposal blocks for our execution
Assert.That(resetEvent.Wait(100), Is.False);
r.Add(TestResources.CreateTestBeatmapSetInfo());
});
realm.Dispose();
resetEvent.Set();
writeTask.WaitSafely();
});
}
/// <summary>
/// Test to ensure that a `CreateContext` call nested inside a subscription doesn't cause any deadlocks
/// due to context fetching semaphores.

View File

@ -5,7 +5,6 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using osu.Framework.Allocation;
using osu.Framework.Extensions;
@ -84,11 +83,7 @@ namespace osu.Game.Tests.Database
realm.Run(r => r.Refresh());
// Without forcing the write onto its own thread, realm will internally run the operation synchronously, which can cause a deadlock with `WaitSafely`.
Task.Run(async () =>
{
await realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo()));
}).WaitSafely();
realm.WriteAsync(r => r.Add(TestResources.CreateTestBeatmapSetInfo())).WaitSafely();
realm.Run(r => r.Refresh());

View File

@ -98,8 +98,6 @@ namespace osu.Game.Database
private static readonly GlobalStatistic<int> total_writes_async = GlobalStatistics.Get<int>(@"Realm", @"Writes (Async)");
private readonly object realmLock = new object();
private Realm? updateRealm;
/// <summary>
@ -122,24 +120,21 @@ namespace osu.Game.Database
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"Use {nameof(getRealmInstance)} when performing realm operations from a non-update thread");
lock (realmLock)
if (updateRealm == null)
{
if (updateRealm == null)
{
updateRealm = getRealmInstance();
hasInitialisedOnce = true;
updateRealm = getRealmInstance();
hasInitialisedOnce = true;
Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}");
Logger.Log(@$"Opened realm ""{updateRealm.Config.DatabasePath}"" at version {updateRealm.Config.SchemaVersion}");
// Resubscribe any subscriptions
foreach (var action in customSubscriptionsResetMap.Keys.ToArray())
registerSubscription(action);
}
Debug.Assert(updateRealm != null);
return updateRealm;
// Resubscribe any subscriptions
foreach (var action in customSubscriptionsResetMap.Keys.ToArray())
registerSubscription(action);
}
Debug.Assert(updateRealm != null);
return updateRealm;
}
internal static bool CurrentThreadSubscriptionsAllowed => current_thread_subscriptions_allowed.Value;
@ -388,16 +383,29 @@ namespace osu.Game.Database
}
}
private readonly CountdownEvent pendingAsyncWrites = new CountdownEvent(0);
/// <summary>
/// Write changes to realm asynchronously, guaranteeing order of execution.
/// </summary>
/// <param name="action">The work to run.</param>
public Task WriteAsync(Action<Realm> action)
{
if (isDisposed)
throw new ObjectDisposedException(nameof(RealmAccess));
// Required to ensure the write is tracked and accounted for before disposal.
// Can potentially be avoided if we have a need to do so in the future.
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(WriteAsync)} must be called from the update thread.");
// CountdownEvent will fail if already at zero.
if (!pendingAsyncWrites.TryAddCount())
pendingAsyncWrites.Reset(1);
// Regardless of calling Realm.GetInstance or Realm.GetInstanceAsync, there is a blocking overhead on retrieval.
// Adding a forced Task.Run resolves this.
return Task.Run(async () =>
var writeTask = Task.Run(async () =>
{
total_writes_async.Value++;
@ -407,7 +415,11 @@ namespace osu.Game.Database
using (var realm = getRealmInstance())
// ReSharper disable once AccessToDisposedClosure (WriteAsync should be marked as [InstantHandle]).
await realm.WriteAsync(() => action(realm));
pendingAsyncWrites.Signal();
});
return writeTask;
}
/// <summary>
@ -432,14 +444,15 @@ namespace osu.Game.Database
public IDisposable RegisterForNotifications<T>(Func<Realm, IQueryable<T>> query, NotificationCallbackDelegate<T> callback)
where T : RealmObjectBase
{
lock (realmLock)
{
Func<Realm, IDisposable?> action = realm => query(realm).QueryAsyncWithNotifications(callback);
Func<Realm, IDisposable?> action = realm => query(realm).QueryAsyncWithNotifications(callback);
lock (notificationsResetMap)
{
// Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing.
notificationsResetMap.Add(action, () => callback(new EmptyRealmSet<T>(), null, null));
return RegisterCustomSubscription(action);
}
return RegisterCustomSubscription(action);
}
/// <summary>
@ -530,15 +543,17 @@ namespace osu.Game.Database
void unsubscribe()
{
lock (realmLock)
if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction))
{
if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction))
unsubscriptionAction?.Dispose();
customSubscriptionsResetMap.Remove(action);
lock (notificationsResetMap)
{
unsubscriptionAction?.Dispose();
customSubscriptionsResetMap.Remove(action);
notificationsResetMap.Remove(action);
total_subscriptions.Value--;
}
total_subscriptions.Value--;
}
}
});
@ -548,19 +563,16 @@ namespace osu.Game.Database
{
Debug.Assert(ThreadSafety.IsUpdateThread);
lock (realmLock)
{
// Retrieve realm instance outside of flag update to ensure that the instance is retrieved,
// as attempting to access it inside the subscription if it's not constructed would lead to
// cyclic invocations of the subscription callback.
var realm = Realm;
// Retrieve realm instance outside of flag update to ensure that the instance is retrieved,
// as attempting to access it inside the subscription if it's not constructed would lead to
// cyclic invocations of the subscription callback.
var realm = Realm;
Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null);
Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null);
current_thread_subscriptions_allowed.Value = true;
customSubscriptionsResetMap[action] = action(realm);
current_thread_subscriptions_allowed.Value = false;
}
current_thread_subscriptions_allowed.Value = true;
customSubscriptionsResetMap[action] = action(realm);
current_thread_subscriptions_allowed.Value = false;
}
private Realm getRealmInstance()
@ -802,6 +814,9 @@ namespace osu.Game.Database
/// <returns>An <see cref="IDisposable"/> which should be disposed to end the blocking section.</returns>
public IDisposable BlockAllOperations()
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread.");
if (isDisposed)
throw new ObjectDisposedException(nameof(RealmAccess));
@ -811,31 +826,25 @@ namespace osu.Game.Database
{
realmRetrievalLock.Wait();
lock (realmLock)
if (hasInitialisedOnce)
{
if (hasInitialisedOnce)
syncContext = SynchronizationContext.Current;
// Before disposing the update context, clean up all subscriptions.
// Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal).
// In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work.
foreach (var action in customSubscriptionsResetMap.ToArray())
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread.");
syncContext = SynchronizationContext.Current;
// Before disposing the update context, clean up all subscriptions.
// Note that in the case of realm notification subscriptions, this is not really required (they will be cleaned up by disposal).
// In the case of custom subscriptions, we want them to fire before the update realm is disposed in case they do any follow-up work.
foreach (var action in customSubscriptionsResetMap.ToArray())
{
action.Value?.Dispose();
customSubscriptionsResetMap[action.Key] = null;
}
updateRealm?.Dispose();
updateRealm = null;
action.Value?.Dispose();
customSubscriptionsResetMap[action.Key] = null;
}
Logger.Log(@"Blocking realm operations.", LoggingTarget.Database);
updateRealm?.Dispose();
updateRealm = null;
}
Logger.Log(@"Blocking realm operations.", LoggingTarget.Database);
const int sleep_length = 200;
int timeout = 5000;
@ -871,8 +880,11 @@ namespace osu.Game.Database
try
{
foreach (var action in notificationsResetMap.Values)
action();
lock (notificationsResetMap)
{
foreach (var action in notificationsResetMap.Values)
action();
}
}
finally
{
@ -910,10 +922,10 @@ namespace osu.Game.Database
public void Dispose()
{
lock (realmLock)
{
updateRealm?.Dispose();
}
if (!pendingAsyncWrites.Wait(10000))
Logger.Log("Realm took too long waiting on pending async writes", level: LogLevel.Error);
updateRealm?.Dispose();
if (!isDisposed)
{