Redirect batch imports to a separate task scheduler to avoid contention with interactive actions

This commit is contained in:
Dean Herbert 2021-02-17 19:09:38 +09:00
parent c1db33e075
commit 0196ee882a
2 changed files with 30 additions and 10 deletions

View File

@ -165,10 +165,10 @@ public TestBeatmapManager(Storage storage, IDatabaseContextFactory contextFactor
{
}
public override async Task<BeatmapSetInfo> Import(BeatmapSetInfo item, ArchiveReader archive = null, CancellationToken cancellationToken = default)
public override async Task<BeatmapSetInfo> Import(BeatmapSetInfo item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default)
{
await AllowImport.Task;
return await (CurrentImportTask = base.Import(item, archive, cancellationToken));
return await (CurrentImportTask = base.Import(item, archive, lowPriority, cancellationToken));
}
}

View File

@ -38,6 +38,11 @@ public abstract class ArchiveModelManager<TModel, TFileModel> : ICanAcceptFiles,
{
private const int import_queue_request_concurrency = 1;
/// <summary>
/// The size of a batch import operation before considering it a lower priority operation.
/// </summary>
private const int low_priority_import_batch_size = 1;
/// <summary>
/// A singleton scheduler shared by all <see cref="ArchiveModelManager{TModel,TFileModel}"/>.
/// </summary>
@ -47,6 +52,13 @@ public abstract class ArchiveModelManager<TModel, TFileModel> : ICanAcceptFiles,
/// </remarks>
private static readonly ThreadedTaskScheduler import_scheduler = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));
/// <summary>
/// A second scheduler for lower priority imports.
/// For simplicity, these will just run in parallel with normal priority imports, but a future refactor would see this implemented via a custom scheduler/queue.
/// See https://gist.github.com/peppy/f0e118a14751fc832ca30dd48ba3876b for an incomplete version of this.
/// </summary>
private static readonly ThreadedTaskScheduler import_scheduler_low_priority = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));
/// <summary>
/// Set an endpoint for notifications to be posted to.
/// </summary>
@ -103,8 +115,11 @@ protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFa
/// <summary>
/// Import one or more <typeparamref name="TModel"/> items from filesystem <paramref name="paths"/>.
/// This will post notifications tracking progress.
/// </summary>
/// <remarks>
/// This will be treated as a low priority import if more than one path is specified; use <see cref="Import(ImportTask[])"/> to always import at standard priority.
/// This will post notifications tracking progress.
/// </remarks>
/// <param name="paths">One or more archive locations on disk.</param>
public Task Import(params string[] paths)
{
@ -133,13 +148,15 @@ protected async Task<IEnumerable<TModel>> Import(ProgressNotification notificati
var imported = new List<TModel>();
bool isLowPriorityImport = tasks.Length > low_priority_import_batch_size;
await Task.WhenAll(tasks.Select(async task =>
{
notification.CancellationToken.ThrowIfCancellationRequested();
try
{
var model = await Import(task, notification.CancellationToken);
var model = await Import(task, isLowPriorityImport, notification.CancellationToken);
lock (imported)
{
@ -193,15 +210,16 @@ await Task.WhenAll(tasks.Select(async task =>
/// Note that this bypasses the UI flow and should only be used for special cases or testing.
/// </summary>
/// <param name="task">The <see cref="ImportTask"/> containing data about the <typeparamref name="TModel"/> to import.</param>
/// <param name="lowPriority">Whether this is a low priority import.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The imported model, if successful.</returns>
internal async Task<TModel> Import(ImportTask task, CancellationToken cancellationToken = default)
internal async Task<TModel> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
TModel import;
using (ArchiveReader reader = task.GetReader())
import = await Import(reader, cancellationToken);
import = await Import(reader, lowPriority, cancellationToken);
// We may or may not want to delete the file depending on where it is stored.
// e.g. reconstructing/repairing database with items from default storage.
@ -229,8 +247,9 @@ internal async Task<TModel> Import(ImportTask task, CancellationToken cancellati
/// Silently import an item from an <see cref="ArchiveReader"/>.
/// </summary>
/// <param name="archive">The archive to be imported.</param>
/// <param name="lowPriority">Whether this is a low priority import.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
public Task<TModel> Import(ArchiveReader archive, CancellationToken cancellationToken = default)
public Task<TModel> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
@ -253,7 +272,7 @@ public Task<TModel> Import(ArchiveReader archive, CancellationToken cancellation
return null;
}
return Import(model, archive, cancellationToken);
return Import(model, archive, lowPriority, cancellationToken);
}
/// <summary>
@ -307,8 +326,9 @@ protected virtual string ComputeHash(TModel item, ArchiveReader reader = null)
/// </summary>
/// <param name="item">The model to be imported.</param>
/// <param name="archive">An optional archive to use for model population.</param>
/// <param name="lowPriority">Whether this is a low priority import.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
public virtual async Task<TModel> Import(TModel item, ArchiveReader archive = null, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
public virtual async Task<TModel> Import(TModel item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
{
cancellationToken.ThrowIfCancellationRequested();
@ -383,7 +403,7 @@ void rollback()
flushEvents(true);
return item;
}, cancellationToken, TaskCreationOptions.HideScheduler, import_scheduler).Unwrap();
}, cancellationToken, TaskCreationOptions.HideScheduler, lowPriority ? import_scheduler_low_priority : import_scheduler).Unwrap();
/// <summary>
/// Exports an item to a legacy (.zip based) package.