Use a task chain and fix potential misordering of events

This commit is contained in:
smoogipoo 2021-01-25 20:41:51 +09:00
parent 76e1f6e57b
commit 964976f604
4 changed files with 70 additions and 96 deletions

View File

@ -134,17 +134,12 @@ protected override Task<MultiplayerRoom> JoinRoom(long roomId)
return connection.InvokeAsync<MultiplayerRoom>(nameof(IMultiplayerServer.JoinRoom), roomId);
}
public override async Task LeaveRoom()
protected override Task LeaveRoomInternal()
{
if (!isConnected.Value)
{
// even if not connected, make sure the local room state can be cleaned up.
await base.LeaveRoom();
return;
}
return Task.FromCanceled(new CancellationToken(true));
await base.LeaveRoom();
await connection.InvokeAsync(nameof(IMultiplayerServer.LeaveRoom));
return connection.InvokeAsync(nameof(IMultiplayerServer.LeaveRoom));
}
public override Task TransferHost(int userId)

View File

@ -108,67 +108,38 @@ protected StatefulMultiplayerClient()
});
}
private readonly object joinOrLeaveTaskLock = new object();
private Task? joinOrLeaveTask;
private readonly TaskChain joinOrLeaveTaskChain = new TaskChain();
/// <summary>
/// Joins the <see cref="MultiplayerRoom"/> for a given API <see cref="Room"/>.
/// </summary>
/// <param name="room">The API <see cref="Room"/>.</param>
public async Task JoinRoom(Room room)
public async Task JoinRoom(Room room) => await joinOrLeaveTaskChain.Add(async () =>
{
Task? lastTask;
Task newTask;
if (Room != null)
throw new InvalidOperationException("Cannot join a multiplayer room while already in one.");
lock (joinOrLeaveTaskLock)
Debug.Assert(room.RoomID.Value != null);
// Join the server-side room.
var joinedRoom = await JoinRoom(room.RoomID.Value.Value);
Debug.Assert(joinedRoom != null);
// Populate users.
Debug.Assert(joinedRoom.Users != null);
await Task.WhenAll(joinedRoom.Users.Select(PopulateUser));
// Update the stored room (must be done on update thread for thread-safety).
await scheduleAsync(() =>
{
lastTask = joinOrLeaveTask;
joinOrLeaveTask = newTask = Task.Run(async () =>
{
if (lastTask != null)
await lastTask;
Room = joinedRoom;
apiRoom = room;
playlistItemId = room.Playlist.SingleOrDefault()?.ID ?? 0;
});
// Should be thread-safe since joinOrLeaveTask is locked on in both JoinRoom() and LeaveRoom().
if (Room != null)
throw new InvalidOperationException("Cannot join a multiplayer room while already in one.");
Debug.Assert(room.RoomID.Value != null);
// Join the server-side room.
var joinedRoom = await JoinRoom(room.RoomID.Value.Value);
Debug.Assert(joinedRoom != null);
// Populate users.
Debug.Assert(joinedRoom.Users != null);
await Task.WhenAll(joinedRoom.Users.Select(PopulateUser));
// Update the stored room (must be done on update thread for thread-safety).
await scheduleAsync(() =>
{
Room = joinedRoom;
apiRoom = room;
playlistItemId = room.Playlist.SingleOrDefault()?.ID ?? 0;
});
// Update room settings.
await updateLocalRoomSettings(joinedRoom.Settings);
});
}
try
{
await newTask;
}
finally
{
// The task will be awaited in the future, so reset it so that the user doesn't get into a permanently faulted state if anything fails.
lock (joinOrLeaveTaskLock)
{
if (joinOrLeaveTask == newTask)
joinOrLeaveTask = null;
}
}
}
// Update room settings.
await updateLocalRoomSettings(joinedRoom.Settings);
});
/// <summary>
/// Joins the <see cref="MultiplayerRoom"/> with a given ID.
@ -177,48 +148,24 @@ await scheduleAsync(() =>
/// <returns>The joined <see cref="MultiplayerRoom"/>.</returns>
protected abstract Task<MultiplayerRoom> JoinRoom(long roomId);
public virtual async Task LeaveRoom()
public async Task LeaveRoom() => await joinOrLeaveTaskChain.Add(async () =>
{
Task? lastTask;
Task newTask;
if (Room == null)
return;
lock (joinOrLeaveTaskLock)
await scheduleAsync(() =>
{
lastTask = joinOrLeaveTask;
joinOrLeaveTask = newTask = Task.Run(async () =>
{
if (lastTask != null)
await lastTask;
apiRoom = null;
Room = null;
CurrentMatchPlayingUserIds.Clear();
// Should be thread-safe since joinOrLeaveTask is locked on in both JoinRoom() and LeaveRoom().
if (Room == null)
return;
RoomUpdated?.Invoke();
});
await scheduleAsync(() =>
{
apiRoom = null;
Room = null;
CurrentMatchPlayingUserIds.Clear();
await LeaveRoomInternal();
});
RoomUpdated?.Invoke();
});
});
}
try
{
await newTask;
}
finally
{
// The task will be awaited in the future, so reset it so that the user doesn't get into a permanently faulted state if anything fails.
lock (joinOrLeaveTaskLock)
{
if (joinOrLeaveTask == newTask)
joinOrLeaveTask = null;
}
}
}
protected abstract Task LeaveRoomInternal();
/// <summary>
/// Change the current <see cref="MultiplayerRoom"/> settings.

View File

@ -98,6 +98,8 @@ protected override Task<MultiplayerRoom> JoinRoom(long roomId)
return Task.FromResult(room);
}
protected override Task LeaveRoomInternal() => Task.CompletedTask;
public override Task TransferHost(int userId) => ((IMultiplayerClient)this).HostChanged(userId);
public override async Task ChangeSettings(MultiplayerRoomSettings settings)

View File

@ -0,0 +1,30 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
#nullable enable
using System;
using System.Threading.Tasks;
namespace osu.Game.Utils
{
/// <summary>
/// A chain of <see cref="Task"/>s that run sequentially.
/// </summary>
public class TaskChain
{
private readonly object currentTaskLock = new object();
private Task? currentTask;
public Task Add(Func<Task> taskFunc)
{
lock (currentTaskLock)
{
currentTask = currentTask == null
? taskFunc()
: currentTask.ContinueWith(_ => taskFunc()).Unwrap();
return currentTask;
}
}
}
}