diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 9d4e14a2ab..a2e1736db9 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -24,17 +24,139 @@ constexpr auto kMaxOnlyInHeader = 80 * kPartSize; constexpr auto kPartsOutsideFirstSliceGood = 8; constexpr auto kSlicesInMemory = 2; -// 1 MB of header parts can be outside the first slice for us to still -// put the whole first slice of the file in the header cache entry. -//constexpr auto kMaxOutsideHeaderPartsForOptimizedMode = 8; - // 1 MB of parts are requested from cloud ahead of reading demand. constexpr auto kPreloadPartsAhead = 8; +using PartsMap = base::flat_map; + +struct ParsedCacheEntry { + PartsMap parts; + std::optional included; +}; + bool IsContiguousSerialization(int serializedSize, int maxSliceSize) { return !(serializedSize % kPartSize) || (serializedSize == maxSliceSize); } +bool IsFullInHeader(int size) { + return (size <= kMaxOnlyInHeader); +} + +bool ComputeIsGoodHeader(int size, const PartsMap &header) { + if (IsFullInHeader(size)) { + return false; + } + const auto outsideFirstSliceIt = ranges::lower_bound( + header, + kInSlice, + ranges::less(), + &PartsMap::value_type::first); + const auto outsideFirstSlice = end(header) - outsideFirstSliceIt; + return (outsideFirstSlice <= kPartsOutsideFirstSliceGood); +} + +int SlicesCount(int size) { + return (size + kInSlice - 1) / kInSlice; +} + +int MaxSliceSize(int sliceNumber, int size) { + return !sliceNumber + ? size + : (sliceNumber == SlicesCount(size)) + ? (size - (sliceNumber - 1) * kInSlice) + : kInSlice; +} + +bytes::const_span ParseComplexCachedMap( + PartsMap &result, + bytes::const_span data, + int maxSize) { + const auto takeInt = [&]() -> std::optional { + if (data.size() < sizeof(int32)) { + return std::nullopt; + } + const auto bytes = data.data(); + const auto result = *reinterpret_cast(bytes); + data = data.subspan(sizeof(int32)); + return result; + }; + const auto takeBytes = [&](int count) { + if (count <= 0 || data.size() < count) { + return bytes::const_span(); + } + const auto result = data.subspan(0, count); + data = data.subspan(count); + return result; + }; + const auto maybeCount = takeInt(); + if (!maybeCount) { + return {}; + } + const auto count = *maybeCount; + if (count < 0) { + return {}; + } else if (!count) { + return data; + } + for (auto i = 0; i != count; ++i) { + const auto offset = takeInt().value_or(0); + const auto size = takeInt().value_or(0); + const auto bytes = takeBytes(size); + if (offset < 0 + || offset >= maxSize + || size <= 0 + || size > maxSize + || offset + size > maxSize + || bytes.size() != size) { + return {}; + } + result.try_emplace( + offset, + reinterpret_cast(bytes.data()), + bytes.size()); + } + return data; +} + +bytes::const_span ParseCachedMap( + PartsMap &result, + bytes::const_span data, + int maxSize) { + const auto size = int(data.size()); + if (IsContiguousSerialization(size, maxSize)) { + if (size > maxSize) { + return {}; + } + for (auto offset = 0; offset < size; offset += kPartSize) { + const auto part = data.subspan( + offset, + std::min(kPartSize, size - offset)); + result.try_emplace( + offset, + reinterpret_cast(part.data()), + part.size()); + } + return {}; + } + return ParseComplexCachedMap(result, data, maxSize); +} + +ParsedCacheEntry ParseCacheEntry( + bytes::const_span data, + int sliceNumber, + int size) { + auto result = ParsedCacheEntry(); + const auto remaining = ParseCachedMap( + result.parts, + data, + MaxSliceSize(sliceNumber, size)); + if (!sliceNumber && ComputeIsGoodHeader(size, result.parts)) { + result.included = PartsMap(); + ParseCachedMap(*result.included, remaining, MaxSliceSize(1, size)); + } + return result; +} + template // Range::value_type is Pair int FindNotLoadedStart(Range &&parts, int offset) { auto result = offset; @@ -101,7 +223,7 @@ struct Reader::CacheHelper { const Storage::Cache::Key baseKey; QMutex mutex; - base::flat_map results; + base::flat_map results; std::atomic waiting = nullptr; }; @@ -113,9 +235,7 @@ Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const { return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber }; } -bytes::const_span Reader::Slice::processCacheData( - bytes::const_span data, - int maxSize) { +void Reader::Slice::processCacheData(PartsMap &&data) { Expects((flags & Flag::LoadingFromCache) != 0); Expects(!(flags & Flag::LoadedFromCache)); @@ -123,74 +243,13 @@ bytes::const_span Reader::Slice::processCacheData( flags |= Flag::LoadedFromCache; flags &= ~Flag::LoadingFromCache; }); - - const auto size = int(data.size()); - if (IsContiguousSerialization(size, maxSize)) { - if (size > maxSize) { - return {}; + if (parts.empty()) { + parts = std::move(data); + } else { + for (auto &[offset, bytes] : data) { + parts.emplace(offset, std::move(bytes)); } - for (auto offset = 0; offset < size; offset += kPartSize) { - const auto part = data.subspan( - offset, - std::min(kPartSize, size - offset)); - parts.try_emplace( - offset, - reinterpret_cast(part.data()), - part.size()); - } - return {}; } - return processComplexCacheData(bytes::make_span(data), maxSize); -} - -bytes::const_span Reader::Slice::processComplexCacheData( - bytes::const_span data, - int maxSize) { - const auto takeInt = [&]() -> std::optional { - if (data.size() < sizeof(int32)) { - return std::nullopt; - } - const auto bytes = data.data(); - const auto result = *reinterpret_cast(bytes); - data = data.subspan(sizeof(int32)); - return result; - }; - const auto takeBytes = [&](int count) { - if (count <= 0 || data.size() < count) { - return bytes::const_span(); - } - const auto result = data.subspan(0, count); - data = data.subspan(count); - return result; - }; - const auto maybeCount = takeInt(); - if (!maybeCount) { - return {}; - } - const auto count = *maybeCount; - if (count < 0) { - return {}; - } else if (!count) { - return data; - } - for (auto i = 0; i != count; ++i) { - const auto offset = takeInt().value_or(0); - const auto size = takeInt().value_or(0); - const auto bytes = takeBytes(size); - if (offset < 0 - || offset >= maxSize - || size <= 0 - || size > maxSize - || offset + size > maxSize - || bytes.size() != size) { - return {}; - } - parts.try_emplace( - offset, - reinterpret_cast(bytes.data()), - bytes.size()); - } - return data; } void Reader::Slice::addPart(int offset, QByteArray bytes) { @@ -215,7 +274,7 @@ auto Reader::Slice::prepareFill(int from, int till) -> PrepareFillResult { parts, from, ranges::less(), - &base::flat_map::value_type::first); + &PartsMap::value_type::first); if (after == begin(parts)) { result.offsetsFromLoader = offsetsFromLoader( fromOffset, @@ -229,7 +288,7 @@ auto Reader::Slice::prepareFill(int from, int till) -> PrepareFillResult { end(parts), till, ranges::less(), - &base::flat_map::value_type::first); + &PartsMap::value_type::first); const auto haveTill = FindNotLoadedStart( ranges::make_iterator_range(start, finish), fromOffset); @@ -256,7 +315,7 @@ auto Reader::Slice::offsetsFromLoader(int from, int till) const parts, from, ranges::less(), - &base::flat_map::value_type::first); + &PartsMap::value_type::first); auto check = (after == begin(parts)) ? after : (after - 1); const auto end = parts.end(); for (auto offset = from; offset != till; offset += kPartSize) { @@ -282,7 +341,7 @@ Reader::Slices::Slices(int size, bool useCache) _headerMode = HeaderMode::NoCache; } if (!isFullInHeader()) { - _data.resize((_size + kInSlice - 1) / kInSlice); + _data.resize(SlicesCount(_size)); } } @@ -291,7 +350,7 @@ bool Reader::Slices::headerModeUnknown() const { } bool Reader::Slices::isFullInHeader() const { - return (_size <= kMaxOnlyInHeader); + return IsFullInHeader(_size); } bool Reader::Slices::isGoodHeader() const { @@ -299,16 +358,7 @@ bool Reader::Slices::isGoodHeader() const { } bool Reader::Slices::computeIsGoodHeader() const { - if (isFullInHeader()) { - return false; - } - const auto outsideFirstSliceIt = ranges::lower_bound( - _header.parts, - kInSlice, - ranges::less(), - &base::flat_map::value_type::first); - const auto outsideFirstSlice = end(_header.parts) - outsideFirstSliceIt; - return (outsideFirstSlice <= kPartsOutsideFirstSliceGood); + return ComputeIsGoodHeader(_size, _header.parts); } void Reader::Slices::headerDone(bool fromCache) { @@ -364,9 +414,7 @@ void Reader::Slices::applyHeaderCacheData() { } } -void Reader::Slices::processCacheResult( - int sliceNumber, - bytes::const_span result) { +void Reader::Slices::processCacheResult(int sliceNumber, PartsMap &&result) { Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); @@ -384,16 +432,13 @@ void Reader::Slices::processCacheResult( // We could've already unloaded this slice using LRU _usedSlices. return; } - const auto remaining = slice.processCacheData( - result, - maxSliceSize(sliceNumber)); + slice.processCacheData(std::move(result)); if (!sliceNumber) { applyHeaderCacheData(); if (isGoodHeader()) { // When we first read header we don't request the first slice. // But we get it, so let's apply it anyway. _data[0].flags |= Slice::Flag::LoadingFromCache; - processCacheResult(1, remaining); } } } @@ -542,11 +587,7 @@ void Reader::Slices::markSliceUsed(int sliceIndex) { } int Reader::Slices::maxSliceSize(int sliceNumber) const { - return !sliceNumber - ? _size - : (sliceNumber == _data.size()) - ? (_size - (sliceNumber - 1) * kInSlice) - : kInSlice; + return MaxSliceSize(sliceNumber, _size); } Reader::SerializedSlice Reader::Slices::serializeAndUnloadUnused() { @@ -779,6 +820,10 @@ void Reader::processDownloaderRequests() { } +void Reader::checkCacheResultsForDownloader() { + +} + bool Reader::isRemoteLoader() const { return _loader->baseCacheKey().has_value(); } @@ -799,17 +844,32 @@ void Reader::readFromCache(int sliceNumber) { if (sliceNumber == 1 && _slices.isGoodHeader()) { return readFromCache(0); } + const auto size = _loader->size(); const auto key = _cacheHelper->key(sliceNumber); - const auto weak = std::weak_ptr(_cacheHelper); + const auto cache = std::weak_ptr(_cacheHelper); + const auto weak = base::make_weak(this); _owner->cacheBigFile().get(key, [=](QByteArray &&result) { - if (const auto strong = weak.lock()) { - QMutexLocker lock(&strong->mutex); - strong->results.emplace(sliceNumber, std::move(result)); - if (const auto waiting = strong->waiting.load()) { - strong->waiting.store(nullptr, std::memory_order_release); - waiting->release(); + crl::async([=, result = std::move(result)]{ + auto entry = ParseCacheEntry( + bytes::make_span(result), + sliceNumber, + size); + if (const auto strong = cache.lock()) { + QMutexLocker lock(&strong->mutex); + strong->results.emplace(sliceNumber, std::move(entry.parts)); + if (!sliceNumber && entry.included) { + strong->results.emplace(1, std::move(*entry.included)); + } + if (const auto waiting = strong->waiting.load()) { + strong->waiting.store(nullptr, std::memory_order_release); + waiting->release(); + } else { + crl::on_main(weak, [=] { + checkCacheResultsForDownloader(); + }); + } } - } + }); }); } @@ -935,11 +995,17 @@ bool Reader::processCacheResults() { } QMutexLocker lock(&_cacheHelper->mutex); - const auto loaded = base::take(_cacheHelper->results); + auto loaded = base::take(_cacheHelper->results); lock.unlock(); - for (const auto &[sliceNumber, result] : loaded) { - _slices.processCacheResult(sliceNumber, bytes::make_span(result)); + for (auto &[sliceNumber, result] : loaded) { + _slices.processCacheResult(sliceNumber, std::move(result)); + } + if (!loaded.empty() + && (loaded.front().first == 0) + && _slices.isGoodHeader()) { + Assert(loaded.size() > 1); + Assert((loaded.begin() + 1)->first == 1); } if (_downloaderAttached.load(std::memory_order_acquire)) { for (const auto &[sliceNumber, result] : loaded) { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index c452f29cc6..23233a62bc 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -65,6 +65,8 @@ private: struct CacheHelper; + using PartsMap = base::flat_map; + template class StackIntVector { public: @@ -100,17 +102,12 @@ private: struct PrepareFillResult { StackIntVector offsetsFromLoader; - base::flat_map::const_iterator start; - base::flat_map::const_iterator finish; + PartsMap::const_iterator start; + PartsMap::const_iterator finish; bool ready = true; }; - bytes::const_span processCacheData( - bytes::const_span data, - int maxSize); - bytes::const_span processComplexCacheData( - bytes::const_span data, - int maxSize); + void processCacheData(PartsMap &&data); void addPart(int offset, QByteArray bytes); PrepareFillResult prepareFill(int from, int till); @@ -119,7 +116,7 @@ private: int from, int till) const; - base::flat_map parts; + PartsMap parts; Flags flags; }; @@ -134,7 +131,7 @@ private: [[nodiscard]] bool isFullInHeader() const; [[nodiscard]] bool isGoodHeader() const; - void processCacheResult(int sliceNumber, bytes::const_span result); + void processCacheResult(int sliceNumber, PartsMap &&result); void processPart(int offset, QByteArray &&bytes); [[nodiscard]] FillResult fill(int offset, bytes::span buffer); @@ -191,6 +188,7 @@ private: void finalizeCache(); void processDownloaderRequests(); + void checkCacheResultsForDownloader(); static std::shared_ptr InitCacheHelper( std::optional baseKey);