diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp index a84c6b39cf..aaa79ffaad 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp @@ -50,6 +50,10 @@ bool PriorityQueue::remove(int value) { return true; } +bool PriorityQueue::empty() const { + return _data.empty(); +} + std::optional<int> PriorityQueue::front() const { return _data.empty() ? std::nullopt diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index 51c486e7b2..cfe19238e6 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -52,9 +52,10 @@ public: bool add(int value); bool remove(int value); void increasePriority(); - std::optional<int> front() const; - std::optional<int> take(); - base::flat_set<int> takeInRange(int from, int till); + [[nodiscard]] bool empty() const; + [[nodiscard]] std::optional<int> front() const; + [[nodiscard]] std::optional<int> take(); + [[nodiscard]] base::flat_set<int> takeInRange(int from, int till); void clear(); private: diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index 6a713909b3..13ac04549e 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -37,6 +37,7 @@ LoaderMtproto::~LoaderMtproto() { for (const auto [index, amount] : _amountByDcIndex) { changeRequestedAmount(index, -amount); } + _owner->remove(this); } std::optional<Storage::Cache::Key> LoaderMtproto::baseCacheKey() const { @@ -60,7 +61,7 @@ void LoaderMtproto::load(int offset) { if (_requests.contains(offset)) { return; } else if (_requested.add(offset)) { - sendNext(); + _owner->enqueue(this); // #TODO download priority } }); } @@ -72,6 +73,7 @@ void LoaderMtproto::stop() { _api.requestCanceller(), &base::flat_map<int, mtpRequestId>::value_type::second); _requested.clear(); + _owner->remove(this); }); } @@ -84,7 +86,7 @@ void LoaderMtproto::cancel(int offset) { void LoaderMtproto::cancelForOffset(int offset) { if (const auto requestId = _requests.take(offset)) { _api.request(*requestId).cancel(); - sendNext(); + _owner->enqueue(this); } else { _requested.remove(offset); } @@ -110,17 +112,21 @@ void LoaderMtproto::changeRequestedAmount(int index, int amount) { _amountByDcIndex[index] += amount; } -void LoaderMtproto::sendNext() { - if (_requests.size() >= kMaxConcurrentRequests) { - return; - } +MTP::DcId LoaderMtproto::dcId() const { + return _dcId; +} + +bool LoaderMtproto::readyToRequest() const { + return !_requested.empty(); +} + +void LoaderMtproto::loadPart(int dcIndex) { const auto offset = _requested.take().value_or(-1); if (offset < 0) { return; } - const auto index = _owner->chooseDcIndexForRequest(_dcId); - changeRequestedAmount(index, kPartSize); + changeRequestedAmount(dcIndex, kPartSize); const auto usedFileReference = _location.fileReference(); const auto id = _api.request(MTPupload_GetFile( @@ -129,23 +135,21 @@ void LoaderMtproto::sendNext() { MTP_int(offset), MTP_int(kPartSize) )).done([=](const MTPupload_File &result) { - changeRequestedAmount(index, -kPartSize); + changeRequestedAmount(dcIndex, -kPartSize); requestDone(offset, result); }).fail([=](const RPCError &error) { - changeRequestedAmount(index, -kPartSize); + changeRequestedAmount(dcIndex, -kPartSize); requestFailed(offset, error, usedFileReference); }).toDC( - MTP::downloadDcId(_dcId, index) + MTP::downloadDcId(_dcId, dcIndex) ).send(); _requests.emplace(offset, id); - - sendNext(); } void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) { result.match([&](const MTPDupload_file &data) { _requests.erase(offset); - sendNext(); + _owner->enqueue(this); _parts.fire({ offset, data.vbytes().v }); }, [&](const MTPDupload_fileCdnRedirect &data) { changeCdnParams( @@ -189,7 +193,7 @@ void LoaderMtproto::requestFailed( return; } else { _requested.add(offset); - sendNext(); + _owner->enqueue(this); } }; _owner->api().refreshFileReference( diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index a7f16f4f8f..a9a2294034 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -10,15 +10,15 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_loader.h" #include "mtproto/sender.h" #include "data/data_file_origin.h" - -namespace Storage { -class DownloadManager; -} // namespace Storage +#include "storage/file_download.h" namespace Media { namespace Streaming { -class LoaderMtproto : public Loader, public base::has_weak_ptr { +class LoaderMtproto + : public Loader + , public base::has_weak_ptr + , public Storage::Downloader { public: LoaderMtproto( not_null<Storage::DownloadManager*> owner, @@ -44,7 +44,9 @@ public: void clearAttachedDownloader() override; private: - void sendNext(); + MTP::DcId dcId() const override; + bool readyToRequest() const override; + void loadPart(int dcIndex) override; void requestDone(int offset, const MTPupload_File &result); void requestFailed( diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index 0be8c9cfd1..ec759ef4d4 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -45,6 +45,7 @@ constexpr auto kPartSize = 128 * 1024; constexpr auto kStartSessionsCount = 1; constexpr auto kMaxSessionsCount = 8; +constexpr auto kResetDownloadPrioritiesTimeout = crl::time(200); } // namespace @@ -78,6 +79,10 @@ void DownloadManager::Queue::resetGeneration() { std::swap(_loaders, _previousGeneration); } +bool DownloadManager::Queue::empty() const { + return _loaders.empty() && _previousGeneration.empty(); +} + Downloader *DownloadManager::Queue::nextLoader() const { auto &&all = ranges::view::concat(_loaders, _previousGeneration); const auto i = ranges::find(all, true, &FileLoader::readyToRequest); @@ -86,6 +91,7 @@ Downloader *DownloadManager::Queue::nextLoader() const { DownloadManager::DownloadManager(not_null<ApiWrap*> api) : _api(api) +, _resetGenerationTimer([=] { resetGeneration(); }) , _killDownloadSessionsTimer([=] { killDownloadSessions(); }) { } @@ -96,11 +102,8 @@ DownloadManager::~DownloadManager() { void DownloadManager::enqueue(not_null<Downloader*> loader) { const auto dcId = loader->dcId(); (dcId ? _mtprotoLoaders[dcId] : _webLoaders).enqueue(loader); - if (!_resettingGeneration) { - _resettingGeneration = true; - crl::on_main(this, [=] { - resetGeneration(); - }); + if (!_resetGenerationTimer.isActive()) { + _resetGenerationTimer.callOnce(kResetDownloadPrioritiesTimeout); } checkSendNext(); } @@ -112,7 +115,7 @@ void DownloadManager::remove(not_null<Downloader*> loader) { } void DownloadManager::resetGeneration() { - _resettingGeneration = false; + _resetGenerationTimer.cancel(); for (auto &[dcId, queue] : _mtprotoLoaders) { queue.resetGeneration(); } @@ -121,6 +124,9 @@ void DownloadManager::resetGeneration() { void DownloadManager::checkSendNext() { for (auto &[dcId, queue] : _mtprotoLoaders) { + if (queue.empty()) { + continue; + } const auto bestIndex = [&] { const auto i = _requestedBytesAmount.find(dcId); if (i == end(_requestedBytesAmount)) { diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index 73e7e77637..520501de96 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -71,6 +71,7 @@ private: void enqueue(not_null<Downloader*> loader); void remove(not_null<Downloader*> loader); void resetGeneration(); + [[nodiscard]] bool empty() const; [[nodiscard]] Downloader *nextLoader() const; private: @@ -92,13 +93,13 @@ private: base::Observable<void> _taskFinishedObservable; base::flat_map<MTP::DcId, std::vector<int>> _requestedBytesAmount; + base::Timer _resetGenerationTimer; base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes; base::Timer _killDownloadSessionsTimer; base::flat_map<MTP::DcId, Queue> _mtprotoLoaders; Queue _webLoaders; - bool _resettingGeneration = false; }; diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp index aaab67bfae..33b15bb54e 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp @@ -16,6 +16,7 @@ namespace { using namespace Media::Streaming; constexpr auto kPartSize = Loader::kPartSize; +constexpr auto kRequestPartsCount = 8; } // namespace @@ -60,6 +61,8 @@ StreamedFileDownloader::StreamedFileDownloader( savePart(std::move(part)); } }, _lifetime); + + requestParts(); } StreamedFileDownloader::~StreamedFileDownloader() { @@ -78,6 +81,31 @@ void StreamedFileDownloader::stop() { cancelRequests(); } +void StreamedFileDownloader::requestParts() { + while (!_finished + && _nextPartIndex < _partsCount + && _partsRequested < kRequestPartsCount) { + requestPart(); + } +} + +void StreamedFileDownloader::requestPart() { + Expects(!_finished); + + const auto index = std::find( + begin(_partIsSaved) + _nextPartIndex, + end(_partIsSaved), + false + ) - begin(_partIsSaved); + if (index == _partsCount) { + _nextPartIndex = _partsCount; + return; + } + _nextPartIndex = index + 1; + _reader->loadForDownloader(this, index * kPartSize); + ++_partsRequested; +} + QByteArray StreamedFileDownloader::readLoadedPart(int offset) { Expects(offset >= 0 && offset < _size); Expects(!(offset % kPartSize)); @@ -104,30 +132,11 @@ void StreamedFileDownloader::cancelRequests() { } bool StreamedFileDownloader::readyToRequest() const { - if (_finished || _nextPartIndex >= _partsCount) { - return false; - } - _nextPartIndex = std::find( - begin(_partIsSaved) + _nextPartIndex, - end(_partIsSaved), - false - ) - begin(_partIsSaved); - return (_nextPartIndex < _partsCount); + return false; } void StreamedFileDownloader::loadPart(int dcIndex) { - const auto index = std::find( - begin(_partIsSaved) + _nextPartIndex, - end(_partIsSaved), - false - ) - begin(_partIsSaved); - if (index == _partsCount) { - _nextPartIndex = _partsCount; - return; - } - _nextPartIndex = index + 1; - _reader->loadForDownloader(this, index * kPartSize); - ++_partsRequested; + Unexpected("StreamedFileDownloader can't load parts."); } void StreamedFileDownloader::savePart(const LoadedPart &part) { @@ -159,6 +168,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) { } } _reader->doneForDownloader(offset); + requestParts(); notifyAboutProgress(); } diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.h b/Telegram/SourceFiles/storage/streamed_file_downloader.h index 803bc3cde1..90aea8c099 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.h +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.h @@ -51,6 +51,8 @@ private: void cancelRequests() override; bool readyToRequest() const override; void loadPart(int dcIndex) override; + void requestParts(); + void requestPart(); void savePart(const Media::Streaming::LoadedPart &part);