Allow variable count of download sessions.

This commit is contained in:
John Preston 2019-12-04 09:51:21 +03:00
parent cb2c6e4b89
commit 3ae2986c25
11 changed files with 32 additions and 25 deletions

View File

@ -40,7 +40,7 @@ public:
[[nodiscard]] virtual rpl::producer<LoadedPart> parts() const = 0;
virtual void attachDownloader(
Storage::StreamedFileDownloader *downloader) = 0;
not_null<Storage::StreamedFileDownloader*> downloader) = 0;
virtual void clearAttachedDownloader() = 0;
virtual ~Loader() = default;

View File

@ -79,7 +79,7 @@ rpl::producer<LoadedPart> LoaderLocal::parts() const {
}
void LoaderLocal::attachDownloader(
Storage::StreamedFileDownloader *downloader) {
not_null<Storage::StreamedFileDownloader*> downloader) {
Unexpected("Downloader attached to a local streaming loader.");
}

View File

@ -33,7 +33,7 @@ public:
[[nodiscard]] rpl::producer<LoadedPart> parts() const override;
void attachDownloader(
Storage::StreamedFileDownloader *downloader) override;
not_null<Storage::StreamedFileDownloader*> downloader) override;
void clearAttachedDownloader() override;
private:

View File

@ -91,7 +91,7 @@ void LoaderMtproto::cancelForOffset(int offset) {
}
void LoaderMtproto::attachDownloader(
Storage::StreamedFileDownloader *downloader) {
not_null<Storage::StreamedFileDownloader*> downloader) {
_downloader = downloader;
}

View File

@ -40,7 +40,7 @@ public:
[[nodiscard]] rpl::producer<LoadedPart> parts() const override;
void attachDownloader(
Storage::StreamedFileDownloader *downloader) override;
not_null<Storage::StreamedFileDownloader*> downloader) override;
void clearAttachedDownloader() override;
private:

View File

@ -906,7 +906,7 @@ rpl::producer<LoadedPart> Reader::partsForDownloader() const {
}
void Reader::loadForDownloader(
Storage::StreamedFileDownloader *downloader,
not_null<Storage::StreamedFileDownloader*> downloader,
int offset) {
if (_attachedDownloader != downloader) {
if (_attachedDownloader) {
@ -931,7 +931,7 @@ void Reader::doneForDownloader(int offset) {
}
void Reader::cancelForDownloader(
Storage::StreamedFileDownloader *downloader) {
not_null<Storage::StreamedFileDownloader*> downloader) {
if (_attachedDownloader == downloader) {
_downloaderOffsetRequests.take();
_attachedDownloader = nullptr;

View File

@ -61,10 +61,11 @@ public:
void stopStreaming(bool stillActive = false);
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(
Storage::StreamedFileDownloader *downloader,
not_null<Storage::StreamedFileDownloader*> downloader,
int offset);
void doneForDownloader(int offset);
void cancelForDownloader(Storage::StreamedFileDownloader *downloader);
void cancelForDownloader(
not_null<Storage::StreamedFileDownloader*> downloader);
~Reader();

View File

@ -34,13 +34,13 @@ constexpr ShiftedDcId updaterDcId(DcId dcId) {
return ShiftDcId(dcId, kUpdaterDcShift);
}
constexpr auto kDownloadSessionsCount = 2;
constexpr auto kUploadSessionsCount = 2;
namespace details {
constexpr ShiftedDcId downloadDcId(DcId dcId, int index) {
static_assert(kDownloadSessionsCount < kMaxMediaDcCount, "Too large MTPDownloadSessionsCount!");
Expects(index < kMaxMediaDcCount);
return ShiftDcId(dcId, kBaseDownloadDcShift + index);
};
@ -48,13 +48,12 @@ constexpr ShiftedDcId downloadDcId(DcId dcId, int index) {
// send(req, callbacks, MTP::downloadDcId(dc, index)) - for download shifted dc id
inline ShiftedDcId downloadDcId(DcId dcId, int index) {
Expects(index >= 0 && index < kDownloadSessionsCount);
return details::downloadDcId(dcId, index);
}
inline constexpr bool isDownloadDcId(ShiftedDcId shiftedDcId) {
return (shiftedDcId >= details::downloadDcId(0, 0))
&& (shiftedDcId < details::downloadDcId(0, kDownloadSessionsCount - 1) + kDcShift);
&& (shiftedDcId < details::downloadDcId(0, kMaxMediaDcCount - 1) + kDcShift);
}
inline bool isCdnDc(MTPDdcOption::Flags flags) {

View File

@ -1033,8 +1033,6 @@ void SessionPrivate::onSentSome(uint64 size) {
}
if (isUploadDcId(_shiftedDcId)) {
remain *= kUploadSessionsCount;
} else if (isDownloadDcId(_shiftedDcId)) {
remain *= kDownloadSessionsCount;
}
_waitForReceivedTimer.callOnce(remain);
}

View File

@ -43,6 +43,9 @@ constexpr auto kMaxWebFileQueries = 8;
// fixed part size download for hash checking.
constexpr auto kPartSize = 128 * 1024;
constexpr auto kStartSessionsCount = 1;
constexpr auto kMaxSessionsCount = 8;
} // namespace
void DownloadManager::Queue::enqueue(not_null<Downloader*> loader) {
@ -121,6 +124,7 @@ void DownloadManager::checkSendNext() {
const auto bestIndex = [&] {
const auto i = _requestedBytesAmount.find(dcId);
if (i == end(_requestedBytesAmount)) {
_requestedBytesAmount[dcId].resize(kStartSessionsCount);
return 0;
}
const auto j = ranges::min_element(i->second);
@ -136,6 +140,9 @@ void DownloadManager::checkSendNext() {
loader->loadPart(bestIndex);
}
}
if (_requestedBytesAmount[0].empty()) {
_requestedBytesAmount[0] = std::vector<int>(1, 0);
}
if (_requestedBytesAmount[0][0] < kMaxWebFileQueries) {
if (const auto loader = _webLoaders.nextLoader()) {
loader->loadPart(0);
@ -147,13 +154,14 @@ void DownloadManager::requestedAmountIncrement(
MTP::DcId dcId,
int index,
int amount) {
Expects(index >= 0 && index < MTP::kDownloadSessionsCount);
using namespace rpl::mappers;
auto it = _requestedBytesAmount.find(dcId);
if (it == _requestedBytesAmount.end()) {
it = _requestedBytesAmount.emplace(dcId, RequestedInDc { { 0 } }).first;
it = _requestedBytesAmount.emplace(
dcId,
std::vector<int>(dcId ? kStartSessionsCount : 1, 0)
).first;
}
it->second[index] += amount;
if (!dcId) {
@ -163,6 +171,7 @@ void DownloadManager::requestedAmountIncrement(
killDownloadSessionsStop(dcId);
} else if (ranges::find_if(it->second, _1 > 0) == end(it->second)) {
killDownloadSessionsStart(dcId);
checkSendNext();
}
}
@ -197,8 +206,11 @@ void DownloadManager::killDownloadSessions() {
auto left = kKillSessionTimeout;
for (auto i = _killDownloadSessionTimes.begin(); i != _killDownloadSessionTimes.end(); ) {
if (i->second <= now) {
for (int j = 0; j < MTP::kDownloadSessionsCount; ++j) {
MTP::stopSession(MTP::downloadDcId(i->first, j));
const auto j = _requestedBytesAmount.find(i->first);
if (j != end(_requestedBytesAmount)) {
for (auto index = 0; index != int(j->second.size()); ++index) {
MTP::stopSession(MTP::downloadDcId(i->first, index));
}
}
i = _killDownloadSessionTimes.erase(i);
} else {
@ -458,8 +470,6 @@ void FileLoader::cancel(bool fail) {
}
_data = QByteArray();
const auto downloader = _downloader;
const auto sessionGuard = &session();
const auto weak = QPointer<FileLoader>(this);
if (fail) {
emit failed(this, started);

View File

@ -41,7 +41,7 @@ public:
[[nodiscard]] virtual MTP::DcId dcId() const = 0;
[[nodiscard]] virtual bool readyToRequest() const = 0;
[[nodiscard]] virtual void loadPart(int dcIndex) = 0;
virtual void loadPart(int dcIndex) = 0;
};
@ -91,8 +91,7 @@ private:
base::Observable<void> _taskFinishedObservable;
using RequestedInDc = std::array<int64, MTP::kDownloadSessionsCount>;
base::flat_map<MTP::DcId, RequestedInDc> _requestedBytesAmount;
base::flat_map<MTP::DcId, std::vector<int>> _requestedBytesAmount;
base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes;
base::Timer _killDownloadSessionsTimer;