mirror of
https://github.com/ppy/osu
synced 2025-01-14 01:51:04 +00:00
Refactor user request to fix threadsafety issues
This commit is contained in:
parent
08a127eedc
commit
1b1f4c9c09
@ -1,7 +1,6 @@
|
|||||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||||
// See the LICENCE file in the repository root for full licence text.
|
// See the LICENCE file in the repository root for full licence text.
|
||||||
|
|
||||||
using System;
|
|
||||||
using System.Collections.Generic;
|
using System.Collections.Generic;
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
@ -15,103 +14,75 @@ namespace osu.Game.Database
|
|||||||
{
|
{
|
||||||
public class UserLookupCache : MemoryCachingComponent<int, User>
|
public class UserLookupCache : MemoryCachingComponent<int, User>
|
||||||
{
|
{
|
||||||
private readonly HashSet<int> nextTaskIDs = new HashSet<int>();
|
|
||||||
|
|
||||||
[Resolved]
|
[Resolved]
|
||||||
private IAPIProvider api { get; set; }
|
private IAPIProvider api { get; set; }
|
||||||
|
|
||||||
private readonly object taskAssignmentLock = new object();
|
|
||||||
|
|
||||||
private Task<List<User>> pendingRequest;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Whether <see cref="pendingRequest"/> has already grabbed its IDs.
|
|
||||||
/// </summary>
|
|
||||||
private bool pendingRequestConsumedIDs;
|
|
||||||
|
|
||||||
public Task<User> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token);
|
public Task<User> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token);
|
||||||
|
|
||||||
protected override async Task<User> ComputeValueAsync(int lookup, CancellationToken token = default)
|
protected override async Task<User> ComputeValueAsync(int lookup, CancellationToken token = default)
|
||||||
{
|
=> await queryUser(lookup);
|
||||||
var users = await getQueryTaskForUser(lookup);
|
|
||||||
return users.FirstOrDefault(u => u.Id == lookup);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>
|
private readonly List<(int id, TaskCompletionSource<User>)> pendingUserTasks = new List<(int, TaskCompletionSource<User>)>();
|
||||||
/// Return the task responsible for fetching the provided user.
|
private Task pendingRequestTask;
|
||||||
/// This may be part of a larger batch lookup to reduce web requests.
|
private readonly object taskAssignmentLock = new object();
|
||||||
/// </summary>
|
|
||||||
/// <param name="userId">The user to lookup.</param>
|
private Task<User> queryUser(int userId)
|
||||||
/// <returns>The task responsible for the lookup.</returns>
|
|
||||||
private Task<List<User>> getQueryTaskForUser(int userId)
|
|
||||||
{
|
{
|
||||||
lock (taskAssignmentLock)
|
lock (taskAssignmentLock)
|
||||||
{
|
{
|
||||||
nextTaskIDs.Add(userId);
|
var tcs = new TaskCompletionSource<User>();
|
||||||
|
|
||||||
// if there's a pending request which hasn't been started yet (and is not yet full), we can wait on it.
|
// Add to the queue.
|
||||||
if (pendingRequest != null && !pendingRequestConsumedIDs && nextTaskIDs.Count < 50)
|
pendingUserTasks.Add((userId, tcs));
|
||||||
return pendingRequest;
|
|
||||||
|
|
||||||
return queueNextTask(nextLookup);
|
// Create a request task if there's not already one.
|
||||||
}
|
if (pendingRequestTask == null)
|
||||||
|
createNewTask();
|
||||||
|
|
||||||
List<User> nextLookup()
|
return tcs.Task;
|
||||||
{
|
|
||||||
int[] lookupItems;
|
|
||||||
|
|
||||||
lock (taskAssignmentLock)
|
|
||||||
{
|
|
||||||
pendingRequestConsumedIDs = true;
|
|
||||||
lookupItems = nextTaskIDs.ToArray();
|
|
||||||
nextTaskIDs.Clear();
|
|
||||||
|
|
||||||
if (lookupItems.Length == 0)
|
|
||||||
{
|
|
||||||
queueNextTask(null);
|
|
||||||
return new List<User>();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var request = new GetUsersRequest(lookupItems);
|
|
||||||
|
|
||||||
// rather than queueing, we maintain our own single-threaded request stream.
|
|
||||||
api.Perform(request);
|
|
||||||
|
|
||||||
return request.Result?.Users;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
private void performLookup()
|
||||||
/// Queues new work at the end of the current work tasks.
|
|
||||||
/// Ensures the provided work is eventually run.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="work">The work to run. Can be null to signify the end of available work.</param>
|
|
||||||
/// <returns>The task tracking this work.</returns>
|
|
||||||
private Task<List<User>> queueNextTask(Func<List<User>> work)
|
|
||||||
{
|
{
|
||||||
|
var userTasks = new List<(int id, TaskCompletionSource<User> task)>();
|
||||||
|
|
||||||
|
// Grab at most 50 users from the queue.
|
||||||
lock (taskAssignmentLock)
|
lock (taskAssignmentLock)
|
||||||
{
|
{
|
||||||
if (work == null)
|
while (pendingUserTasks.Count > 0 && userTasks.Count < 50)
|
||||||
{
|
{
|
||||||
pendingRequest = null;
|
(int id, TaskCompletionSource<User> task) next = pendingUserTasks[^1];
|
||||||
pendingRequestConsumedIDs = false;
|
|
||||||
}
|
|
||||||
else if (pendingRequest == null)
|
|
||||||
{
|
|
||||||
// special case for the first request ever.
|
|
||||||
pendingRequest = Task.Run(work);
|
|
||||||
pendingRequestConsumedIDs = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// append the new request on to the last to be executed.
|
|
||||||
pendingRequest = pendingRequest.ContinueWith(_ => work());
|
|
||||||
pendingRequestConsumedIDs = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pendingRequest;
|
pendingUserTasks.RemoveAt(pendingUserTasks.Count - 1);
|
||||||
|
|
||||||
|
// Perform a secondary check for existence, in case the user was queried in a previous batch.
|
||||||
|
if (CheckExists(next.id, out var existing))
|
||||||
|
next.task.SetResult(existing);
|
||||||
|
else
|
||||||
|
userTasks.Add(next);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Query the users.
|
||||||
|
var request = new GetUsersRequest(userTasks.Select(t => t.id).ToArray());
|
||||||
|
|
||||||
|
// rather than queueing, we maintain our own single-threaded request stream.
|
||||||
|
api.Perform(request);
|
||||||
|
|
||||||
|
// Create a new request task if there's still more users to query.
|
||||||
|
lock (taskAssignmentLock)
|
||||||
|
{
|
||||||
|
pendingRequestTask = null;
|
||||||
|
if (pendingUserTasks.Count > 0)
|
||||||
|
createNewTask();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify of completion.
|
||||||
|
foreach (var (id, task) in userTasks)
|
||||||
|
task.SetResult(request.Result?.Users?.FirstOrDefault(u => u.Id == id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user