// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. #nullable enable using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using osu.Framework.Allocation; using osu.Framework.Bindables; using osu.Framework.Graphics; using osu.Game.Beatmaps; using osu.Game.Online.API; using osu.Game.Replays.Legacy; using osu.Game.Rulesets; using osu.Game.Rulesets.Mods; using osu.Game.Rulesets.Replays; using osu.Game.Rulesets.Replays.Types; using osu.Game.Scoring; using osu.Game.Screens.Play; namespace osu.Game.Online.Spectator { public abstract class SpectatorClient : Component, ISpectatorClient { /// /// The maximum milliseconds between frame bundle sends. /// public const double TIME_BETWEEN_SENDS = 200; /// /// Whether the is currently connected. /// This is NOT thread safe and usage should be scheduled. /// public abstract IBindable IsConnected { get; } private readonly List watchingUsers = new List(); private readonly object userLock = new object(); public IBindableList PlayingUsers => playingUsers; private readonly BindableList playingUsers = new BindableList(); private readonly Dictionary playingUserStates = new Dictionary(); private IBeatmap? currentBeatmap; private Score? currentScore; [Resolved] private IBindable currentRuleset { get; set; } = null!; [Resolved] private IBindable> currentMods { get; set; } = null!; private readonly SpectatorState currentState = new SpectatorState(); /// /// Whether the local user is playing. /// protected bool IsPlaying { get; private set; } /// /// Called whenever new frames arrive from the server. /// public event Action? OnNewFrames; /// /// Called whenever a user starts a play session, or immediately if the user is being watched and currently in a play session. /// public event Action? OnUserBeganPlaying; /// /// Called whenever a user finishes a play session. /// public event Action? OnUserFinishedPlaying; [BackgroundDependencyLoader] private void load() { IsConnected.BindValueChanged(connected => { if (connected.NewValue) { // get all the users that were previously being watched int[] users; lock (userLock) { users = watchingUsers.ToArray(); watchingUsers.Clear(); } // resubscribe to watched users. foreach (var userId in users) WatchUser(userId); // re-send state in case it wasn't received if (IsPlaying) BeginPlayingInternal(currentState); } else { lock (userLock) { playingUsers.Clear(); playingUserStates.Clear(); } } }, true); } Task ISpectatorClient.UserBeganPlaying(int userId, SpectatorState state) { lock (userLock) { if (!playingUsers.Contains(userId)) playingUsers.Add(userId); // UserBeganPlaying() is called by the server regardless of whether the local user is watching the remote user, and is called a further time when the remote user is watched. // This may be a temporary thing (see: https://github.com/ppy/osu-server-spectator/blob/2273778e02cfdb4a9c6a934f2a46a8459cb5d29c/osu.Server.Spectator/Hubs/SpectatorHub.cs#L28-L29). // We don't want the user states to update unless the player is being watched, otherwise calling BindUserBeganPlaying() can lead to double invocations. if (watchingUsers.Contains(userId)) playingUserStates[userId] = state; } OnUserBeganPlaying?.Invoke(userId, state); return Task.CompletedTask; } Task ISpectatorClient.UserFinishedPlaying(int userId, SpectatorState state) { lock (userLock) { playingUsers.Remove(userId); playingUserStates.Remove(userId); } OnUserFinishedPlaying?.Invoke(userId, state); return Task.CompletedTask; } Task ISpectatorClient.UserSentFrames(int userId, FrameDataBundle data) { OnNewFrames?.Invoke(userId, data); return Task.CompletedTask; } public void BeginPlaying(GameplayBeatmap beatmap, Score score) { if (IsPlaying) throw new InvalidOperationException($"Cannot invoke {nameof(BeginPlaying)} when already playing"); IsPlaying = true; // transfer state at point of beginning play currentState.BeatmapID = beatmap.BeatmapInfo.OnlineBeatmapID; currentState.RulesetID = currentRuleset.Value.ID; currentState.Mods = currentMods.Value.Select(m => new APIMod(m)); currentBeatmap = beatmap.PlayableBeatmap; currentScore = score; BeginPlayingInternal(currentState); } public void SendFrames(FrameDataBundle data) => lastSend = SendFramesInternal(data); public void EndPlaying() { if (!IsPlaying) return; IsPlaying = false; currentBeatmap = null; EndPlayingInternal(currentState); } public void WatchUser(int userId) { lock (userLock) { if (watchingUsers.Contains(userId)) return; watchingUsers.Add(userId); } WatchUserInternal(userId); } public void StopWatchingUser(int userId) { lock (userLock) { watchingUsers.Remove(userId); } StopWatchingUserInternal(userId); } protected abstract Task BeginPlayingInternal(SpectatorState state); protected abstract Task SendFramesInternal(FrameDataBundle data); protected abstract Task EndPlayingInternal(SpectatorState state); protected abstract Task WatchUserInternal(int userId); protected abstract Task StopWatchingUserInternal(int userId); private readonly Queue pendingFrames = new Queue(); private double lastSendTime; private Task? lastSend; private const int max_pending_frames = 30; protected override void Update() { base.Update(); if (pendingFrames.Count > 0 && Time.Current - lastSendTime > TIME_BETWEEN_SENDS) purgePendingFrames(); } public void HandleFrame(ReplayFrame frame) { if (frame is IConvertibleReplayFrame convertible) pendingFrames.Enqueue(convertible.ToLegacy(currentBeatmap)); if (pendingFrames.Count > max_pending_frames) purgePendingFrames(); } private void purgePendingFrames() { if (lastSend?.IsCompleted == false) return; var frames = pendingFrames.ToArray(); pendingFrames.Clear(); Debug.Assert(currentScore != null); SendFrames(new FrameDataBundle(currentScore.ScoreInfo, frames)); lastSendTime = Time.Current; } /// /// Attempts to retrieve the for a currently-playing user. /// /// The user. /// The current for the user, if they're playing. null if the user is not playing. /// true if successful (the user is playing), false otherwise. public bool TryGetPlayingUserState(int userId, out SpectatorState state) { lock (userLock) return playingUserStates.TryGetValue(userId, out state); } /// /// Bind an action to with the option of running the bound action once immediately. /// /// The action to perform when a user begins playing. /// Whether the action provided in should be run once immediately for all users currently playing. public void BindUserBeganPlaying(Action callback, bool runOnceImmediately = false) { // The lock is taken before the event is subscribed to to prevent doubling of events. lock (userLock) { OnUserBeganPlaying += callback; if (!runOnceImmediately) return; foreach (var (userId, state) in playingUserStates) callback(userId, state); } } } }