
281 lines
7.6 KiB
Raw Normal View History

This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
#pragma once
#include "media/streaming/media_streaming_loader.h"
#include "base/bytes.h"
#include "base/weak_ptr.h"
#include "base/thread_safe_wrap.h"
namespace Storage {
class StreamedFileDownloader;
} // namespace Storage
namespace Storage {
namespace Cache {
struct Key;
2019-06-26 10:01:04 +00:00
class Database;
} // namespace Cache
} // namespace Storage
namespace Media {
namespace Streaming {
class Loader;
struct LoadedPart;
2019-03-05 13:56:27 +00:00
enum class Error;
class Reader final : public base::has_weak_ptr {
enum class FillState : uchar {
// Main thread.
explicit Reader(
std::unique_ptr<Loader> loader,
Storage::Cache::Database *cache = nullptr);
void setLoaderPriority(int priority);
// Any thread.
2022-05-10 14:22:28 +00:00
[[nodiscard]] int64 size() const;
[[nodiscard]] bool isRemoteLoader() const;
// Single thread.
[[nodiscard]] FillState fill(
2022-05-10 14:22:28 +00:00
int64 offset,
bytes::span buffer,
not_null<crl::semaphore*> notify);
2019-04-12 07:34:39 +00:00
[[nodiscard]] std::optional<Error> streamingError() const;
void headerDone();
2019-05-31 11:45:35 +00:00
[[nodiscard]] int headerSize() const;
[[nodiscard]] bool fullInCache() const;
// Thread safe.
void startSleep(not_null<crl::semaphore*> wake);
void wakeFromSleep();
void stopSleep();
void stopStreamingAsync();
void tryRemoveLoaderAsync();
// Main thread.
void startStreaming();
void stopStreaming(bool stillActive = false);
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(
not_null<Storage::StreamedFileDownloader*> downloader,
2022-05-10 14:22:28 +00:00
int64 offset);
void doneForDownloader(int64 offset);
void cancelForDownloader(
not_null<Storage::StreamedFileDownloader*> downloader);
2019-03-05 13:56:27 +00:00
static constexpr auto kLoadFromRemoteMax = 8;
struct CacheHelper;
2022-05-10 14:22:28 +00:00
// FileSize: Right now any file size fits 32 bit.
using PartsMap = base::flat_map<uint32, QByteArray>;
2019-04-11 11:56:26 +00:00
template <int Size>
class StackIntVector {
2022-05-10 14:22:28 +00:00
bool add(uint32 value);
auto values() const;
2022-05-10 14:22:28 +00:00
std::array<uint32, Size> _storage = { uint32(-1) };
struct SerializedSlice {
int number = -1;
QByteArray data;
struct FillResult {
static constexpr auto kReadFromCacheMax = 2;
StackIntVector<kReadFromCacheMax> sliceNumbersFromCache;
StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
SerializedSlice toCache;
FillState state = FillState::WaitingRemote;
struct Slice {
enum class Flag : uchar {
2019-03-10 08:01:12 +00:00
LoadingFromCache = 0x01,
LoadedFromCache = 0x02,
ChangedSinceCache = 0x04,
FullInCache = 0x08,
friend constexpr inline bool is_flag_type(Flag) { return true; }
using Flags = base::flags<Flag>;
struct PrepareFillResult {
StackIntVector<kLoadFromRemoteMax> offsetsFromLoader;
2019-04-11 11:56:26 +00:00
PartsMap::const_iterator start;
PartsMap::const_iterator finish;
bool ready = true;
2019-04-11 11:56:26 +00:00
void processCacheData(PartsMap &&data);
2022-05-10 14:22:28 +00:00
void addPart(uint32 offset, QByteArray bytes);
PrepareFillResult prepareFill(uint32 from, uint32 till);
// Get up to kLoadFromRemoteMax not loaded parts in from-till range.
StackIntVector<kLoadFromRemoteMax> offsetsFromLoader(
2022-05-10 14:22:28 +00:00
uint32 from,
uint32 till) const;
2019-04-11 11:56:26 +00:00
PartsMap parts;
Flags flags;
class Slices {
2022-05-10 14:22:28 +00:00
Slices(uint32 size, bool useCache);
void headerDone(bool fromCache);
2019-05-31 11:45:35 +00:00
[[nodiscard]] int headerSize() const;
[[nodiscard]] bool fullInCache() const;
2019-03-06 14:05:11 +00:00
[[nodiscard]] bool headerWontBeFilled() const;
[[nodiscard]] bool headerModeUnknown() const;
[[nodiscard]] bool isFullInHeader() const;
[[nodiscard]] bool isGoodHeader() const;
[[nodiscard]] bool waitingForHeaderCache() const;
[[nodiscard]] int requestSliceSizesCount() const;
2019-04-11 11:56:26 +00:00
void processCacheResult(int sliceNumber, PartsMap &&result);
void processCachedSizes(const std::vector<int> &sizes);
2022-05-10 14:22:28 +00:00
void processPart(uint32 offset, QByteArray &&bytes);
2022-05-10 14:22:28 +00:00
[[nodiscard]] FillResult fill(uint32 offset, bytes::span buffer);
2019-03-06 14:05:11 +00:00
[[nodiscard]] SerializedSlice unloadToCache();
2022-05-10 14:22:28 +00:00
[[nodiscard]] QByteArray partForDownloader(uint32 offset) const;
[[nodiscard]] bool readCacheForDownloaderRequired(uint32 offset);
enum class HeaderMode {
void applyHeaderCacheData();
[[nodiscard]] int maxSliceSize(int sliceNumber) const;
[[nodiscard]] SerializedSlice serializeAndUnloadSlice(
int sliceNumber);
[[nodiscard]] SerializedSlice serializeAndUnloadUnused();
[[nodiscard]] QByteArray serializeComplexSlice(
const Slice &slice) const;
[[nodiscard]] QByteArray serializeAndUnloadFirstSliceNoHeader();
2019-02-26 13:57:47 +00:00
void markSliceUsed(int sliceIndex);
[[nodiscard]] bool computeIsGoodHeader() const;
[[nodiscard]] FillResult fillFromHeader(
2022-05-10 14:22:28 +00:00
uint32 offset,
bytes::span buffer);
void unloadSlice(Slice &slice) const;
void checkSliceFullLoaded(int sliceNumber);
[[nodiscard]] bool checkFullInCache() const;
std::vector<Slice> _data;
Slice _header;
2019-02-26 13:57:47 +00:00
std::deque<int> _usedSlices;
2022-05-10 14:22:28 +00:00
uint32 _size = 0;
HeaderMode _headerMode = HeaderMode::Unknown;
bool _fullInCache = false;
// 0 is for headerData, slice index = sliceNumber - 1.
2019-04-12 07:34:39 +00:00
// returns false if asked for a known-empty downloader slice cache.
void readFromCache(int sliceNumber);
[[nodiscard]] bool readFromCacheForDownloader(int sliceNumber);
bool processCacheResults();
void putToCache(SerializedSlice &&data);
2022-05-10 14:22:28 +00:00
void cancelLoadInRange(uint32 from, uint32 till);
void loadAtOffset(uint32 offset);
void checkLoadWillBeFirst(uint32 offset);
bool processLoadedParts();
bool checkForSomethingMoreReceived();
2022-05-10 14:22:28 +00:00
FillState fillFromSlices(uint32 offset, bytes::span buffer);
void finalizeCache();
void processDownloaderRequests();
2019-04-11 11:56:26 +00:00
void checkCacheResultsForDownloader();
2022-05-10 14:22:28 +00:00
void pruneDownloaderCache(uint32 minimalOffset);
void pruneDoneDownloaderRequests();
void sendDownloaderRequests();
2022-05-10 14:22:28 +00:00
[[nodiscard]] bool downloaderWaitForCachedSlice(uint32 offset);
void enqueueDownloaderOffsets();
void checkForDownloaderChange(int checkItemsCount);
void checkForDownloaderReadyOffsets();
void refreshLoaderPriority();
static std::shared_ptr<CacheHelper> InitCacheHelper(
Storage::Cache::Key baseKey);
const std::unique_ptr<Loader> _loader;
Storage::Cache::Database * const _cache = nullptr;
// shared_ptr is used to be able to have weak_ptr.
const std::shared_ptr<CacheHelper> _cacheHelper;
base::thread_safe_queue<LoadedPart, std::vector> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr;
std::atomic<crl::semaphore*> _sleeping = nullptr;
std::atomic<bool> _stopStreamingAsync = false;
PriorityQueue _loadingOffsets;
Slices _slices;
2019-04-12 07:34:39 +00:00
// Even if streaming had failed, the Reader can work for the downloader.
std::optional<Error> _streamingError;
// In case streaming is active both main and streaming threads have work.
// In case only downloader is active, all work is done on main thread.
// Main thread.
Storage::StreamedFileDownloader *_attachedDownloader = nullptr;
rpl::event_stream<LoadedPart> _partsForDownloader;
int _realPriority = 1;
bool _streamingActive = false;
// Streaming thread.
2022-05-10 14:22:28 +00:00
std::deque<uint32> _offsetsForDownloader;
base::flat_set<uint32> _downloaderOffsetsRequested;
base::flat_map<uint32, std::optional<PartsMap>> _downloaderReadCache;
// Communication from main thread to streaming thread.
// Streaming thread to main thread communicates using crl::on_main.
2022-05-10 14:22:28 +00:00
base::thread_safe_queue<uint32> _downloaderOffsetRequests;
base::thread_safe_queue<uint32> _downloaderOffsetAcks;
rpl::lifetime _lifetime;
} // namespace Streaming
} // namespace Media