diff --git a/Telegram/SourceFiles/mtproto/facade.h b/Telegram/SourceFiles/mtproto/facade.h index 6f31960da5..39e2f732c7 100644 --- a/Telegram/SourceFiles/mtproto/facade.h +++ b/Telegram/SourceFiles/mtproto/facade.h @@ -40,8 +40,6 @@ constexpr ShiftedDcId groupCallStreamDcId(DcId dcId) { return ShiftDcId(dcId, kGroupCallStreamDcShift); } -constexpr auto kUploadSessionsCount = 2; - namespace details { constexpr ShiftedDcId downloadDcId(DcId dcId, int index) { @@ -92,7 +90,6 @@ inline DcId getTemporaryIdFromRealDcId(ShiftedDcId shiftedDcId) { namespace details { constexpr ShiftedDcId uploadDcId(DcId dcId, int index) { - static_assert(kUploadSessionsCount < kMaxMediaDcCount, "Too large MTPUploadSessionsCount!"); return ShiftDcId(dcId, kBaseUploadDcShift + index); }; @@ -101,14 +98,14 @@ constexpr ShiftedDcId uploadDcId(DcId dcId, int index) { // send(req, callbacks, MTP::uploadDcId(index)) - for upload shifted dc id // uploading always to the main dc so BareDcId(result) == 0 inline ShiftedDcId uploadDcId(int index) { - Expects(index >= 0 && index < kUploadSessionsCount); + Expects(index >= 0 && index < kMaxMediaDcCount); return details::uploadDcId(0, index); }; constexpr bool isUploadDcId(ShiftedDcId shiftedDcId) { return (shiftedDcId >= details::uploadDcId(0, 0)) - && (shiftedDcId < details::uploadDcId(0, kUploadSessionsCount - 1) + kDcShift); + && (shiftedDcId < details::uploadDcId(0, kMaxMediaDcCount - 1) + kDcShift); } inline ShiftedDcId destroyKeyNextDcId(ShiftedDcId shiftedDcId) { diff --git a/Telegram/SourceFiles/mtproto/session_private.cpp b/Telegram/SourceFiles/mtproto/session_private.cpp index 729a95ed60..05e968b7f0 100644 --- a/Telegram/SourceFiles/mtproto/session_private.cpp +++ b/Telegram/SourceFiles/mtproto/session_private.cpp @@ -1108,9 +1108,6 @@ void SessionPrivate::onSentSome(uint64 size) { DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain)); } } - if (isUploadDcId(_shiftedDcId)) { - remain *= kUploadSessionsCount; - } _waitForReceivedTimer.callOnce(remain); } if (!_firstSentAt) { diff --git a/Telegram/SourceFiles/storage/file_upload.cpp b/Telegram/SourceFiles/storage/file_upload.cpp index b19af82df0..b718ed95ba 100644 --- a/Telegram/SourceFiles/storage/file_upload.cpp +++ b/Telegram/SourceFiles/storage/file_upload.cpp @@ -27,7 +27,7 @@ namespace Storage { namespace { // max 512kb uploaded at the same time in each session -constexpr auto kMaxUploadFileParallelSize = MTP::kUploadSessionsCount * 512 * 1024; +constexpr auto kMaxUploadPerSession = 512 * 1024; constexpr auto kDocumentMaxPartsCountDefault = 4000; @@ -47,7 +47,7 @@ constexpr auto kDocumentUploadPartSize3 = 256 * 1024; constexpr auto kDocumentUploadPartSize4 = 512 * 1024; // One part each half second, if not uploaded faster. -constexpr auto kUploadRequestInterval = crl::time(500); +constexpr auto kUploadRequestInterval = crl::time(200); // How much time without upload causes additional session kill. constexpr auto kKillSessionTimeout = 15 * crl::time(1000); @@ -58,35 +58,52 @@ constexpr auto kKillSessionTimeout = 15 * crl::time(1000); } // namespace -struct Uploader::File { - explicit File(const std::shared_ptr &file); +struct Uploader::Entry { + Entry(FullMsgId itemId, const std::shared_ptr &file); void setDocSize(int64 size); - bool setPartSize(uint32 partSize); + bool setPartSize(int partSize); + // const, but non-const for the move-assignment in the + FullMsgId itemId; std::shared_ptr file; - const std::vector &parts; - const uint64 partsOfId = 0; - int partsSent = 0; + not_null*> parts; + uint64 partsOfId = 0; - mutable int64 fileSentSize = 0; + int partsSent = 0; + int partsWaiting = 0; + int64 partsSentSize = 0; HashMd5 md5Hash; std::unique_ptr docFile; int64 docSize = 0; - int64 docPartSize = 0; - int docSentParts = 0; + int64 docSentSize = 0; + int docPartSize = 0; + int docPartsSent = 0; int docPartsCount = 0; + int docPartsWaiting = 0; }; -Uploader::File::File(const std::shared_ptr &file) -: file(file) +struct Uploader::Request { + FullMsgId itemId; + crl::time sent = 0; + QByteArray bytes; + int dcIndex = 0; + bool docPart = false; + bool nonPremiumDelayed = false; +}; + +Uploader::Entry::Entry( + FullMsgId itemId, + const std::shared_ptr &file) +: itemId(itemId) +, file(file) , parts((file->type == SendMediaType::Photo || file->type == SendMediaType::Secure) - ? file->fileparts - : file->thumbparts) + ? &file->fileparts + : &file->thumbparts) , partsOfId((file->type == SendMediaType::Photo || file->type == SendMediaType::Secure) ? file->id @@ -95,12 +112,10 @@ Uploader::File::File(const std::shared_ptr &file) || file->type == SendMediaType::ThemeFile || file->type == SendMediaType::Audio) { setDocSize(file->filesize); - } else { - docSize = docPartSize = docPartsCount = 0; } } -void Uploader::File::setDocSize(int64 size) { +void Uploader::Entry::setDocSize(int64 size) { docSize = size; constexpr auto limit0 = 1024 * 1024; constexpr auto limit1 = 32 * limit0; @@ -115,16 +130,15 @@ void Uploader::File::setDocSize(int64 size) { } } -bool Uploader::File::setPartSize(uint32 partSize) { +bool Uploader::Entry::setPartSize(int partSize) { docPartSize = partSize; - docPartsCount = (docSize / docPartSize) - + ((docSize % docPartSize) ? 1 : 0); + docPartsCount = (docSize + docPartSize - 1) / docPartSize; return (docPartsCount <= kDocumentMaxPartsCountDefault); } Uploader::Uploader(not_null api) : _api(api) -, _nextTimer([=] { sendNext(); }) +, _nextTimer([=] { maybeSendNext(); }) , _stopSessionsTimer([=] { stopSessions(); }) { const auto session = &_api->session(); photoReady( @@ -181,22 +195,21 @@ Uploader::Uploader(not_null api) _api->instance().nonPremiumDelayedRequests( ) | rpl::start_with_next([=](mtpRequestId id) { - if (_dcIndices.contains(id)) { - _nonPremiumDelayed.emplace(id); + const auto i = _requests.find(id); + if (i != end(_requests)) { + i->second.nonPremiumDelayed = true; } }, _lifetime); } -void Uploader::processPhotoProgress(const FullMsgId &newId) { - const auto session = &_api->session(); - if (const auto item = session->data().message(newId)) { +void Uploader::processPhotoProgress(FullMsgId itemId) { + if (const auto item = session().data().message(itemId)) { sendProgressUpdate(item, Api::SendProgressType::UploadPhoto); } } -void Uploader::processDocumentProgress(const FullMsgId &newId) { - const auto session = &_api->session(); - if (const auto item = session->data().message(newId)) { +void Uploader::processDocumentProgress(FullMsgId itemId) { + if (const auto item = session().data().message(itemId)) { const auto media = item->media(); const auto document = media ? media->document() : nullptr; const auto sendAction = (document && document->isVoiceMessage()) @@ -210,16 +223,14 @@ void Uploader::processDocumentProgress(const FullMsgId &newId) { } } -void Uploader::processPhotoFailed(const FullMsgId &newId) { - const auto session = &_api->session(); - if (const auto item = session->data().message(newId)) { +void Uploader::processPhotoFailed(FullMsgId itemId) { + if (const auto item = session().data().message(itemId)) { sendProgressUpdate(item, Api::SendProgressType::UploadPhoto, -1); } } -void Uploader::processDocumentFailed(const FullMsgId &newId) { - const auto session = &_api->session(); - if (const auto item = session->data().message(newId)) { +void Uploader::processDocumentFailed(FullMsgId itemId) { + if (const auto item = session().data().message(itemId)) { const auto media = item->media(); const auto document = media ? media->document() : nullptr; const auto sendAction = (document && document->isVoiceMessage()) @@ -254,8 +265,12 @@ Main::Session &Uploader::session() const { return _api->session(); } +FullMsgId Uploader::currentUploadId() const { + return _queue.empty() ? FullMsgId() : _queue.front().itemId; +} + void Uploader::upload( - const FullMsgId &msgId, + FullMsgId itemId, const std::shared_ptr &file) { if (file->type == SendMediaType::Photo) { const auto photo = session().data().processPhoto( @@ -301,211 +316,147 @@ void Uploader::upload( document->checkWallPaperProperties(); } } - queue.emplace(msgId, File(file)); - sendNext(); + _queue.push_back({ itemId, file }); + maybeSendNext(); } -void Uploader::currentFailed() { - auto j = queue.find(_uploadingId); - if (j != queue.end()) { - const auto [msgId, file] = std::move(*j); - queue.erase(j); - notifyFailed(msgId, file); +void Uploader::failed(FullMsgId itemId) { + const auto i = ranges::find(_queue, itemId, &Entry::itemId); + if (i != end(_queue)) { + const auto entry = std::move(*i); + _queue.erase(i); + notifyFailed(entry); } - - cancelRequests(); - _dcIndices.clear(); - _uploadingId = FullMsgId(); - _sentTotal = 0; - for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { - _sentPerDc[i] = 0; - } - - sendNext(); + cancelRequests(itemId); + maybeFinishFront(); + crl::on_main(this, [=] { + maybeSendNext(); + }); } -void Uploader::notifyFailed(FullMsgId id, const File &file) { - const auto type = file.file->type; +void Uploader::notifyFailed(const Entry &entry) { + const auto type = entry.file->type; if (type == SendMediaType::Photo) { - _photoFailed.fire_copy(id); + _photoFailed.fire_copy(entry.itemId); } else if (type == SendMediaType::File || type == SendMediaType::ThemeFile || type == SendMediaType::Audio) { - const auto document = session().data().document(file.file->id); + const auto document = session().data().document(entry.file->id); if (document->uploading()) { document->status = FileUploadFailed; } - _documentFailed.fire_copy(id); + _documentFailed.fire_copy(entry.itemId); } else if (type == SendMediaType::Secure) { - _secureFailed.fire_copy(id); + _secureFailed.fire_copy(entry.itemId); } else { - Unexpected("Type in Uploader::currentFailed."); + Unexpected("Type in Uploader::failed."); } } void Uploader::stopSessions() { - for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { - _api->instance().stopSession(MTP::uploadDcId(i)); + if (ranges::any_of(_sentPerDcIndex, rpl::mappers::_1 != 0)) { + _stopSessionsTimer.callOnce(kKillSessionTimeout); + } else { + for (auto i = 0; i != int(_sentPerDcIndex.size()); ++i) { + _api->instance().stopSession(MTP::uploadDcId(i)); + } + _sentPerDcIndex.clear(); } } -void Uploader::sendNext() { - if (_sentTotal >= kMaxUploadFileParallelSize || _pausedId.msg) { - return; +QByteArray Uploader::readDocPart(not_null entry) { + const auto checked = [&](QByteArray result) { + if ((entry->file->type == SendMediaType::File + || entry->file->type == SendMediaType::ThemeFile + || entry->file->type == SendMediaType::Audio) + && entry->docSize <= kUseBigFilesFrom) { + entry->md5Hash.feed(result.data(), result.size()); + } + if (result.isEmpty() + || (result.size() > entry->docPartSize) + || ((result.size() < entry->docPartSize + && entry->docPartsSent + 1 != entry->docPartsCount))) { + return QByteArray(); + } + return result; + }; + auto &content = entry->file->content; + if (!content.isEmpty()) { + const auto offset = entry->docPartsSent * entry->docPartSize; + return checked(content.mid(offset, entry->docPartSize)); + } else if (!entry->docFile) { + const auto filepath = entry->file->filepath; + entry->docFile = std::make_unique(filepath); + if (!entry->docFile->open(QIODevice::ReadOnly)) { + return QByteArray(); + } } + return checked(entry->docFile->read(entry->docPartSize)); +} +void Uploader::maybeSendNext() { const auto stopping = _stopSessionsTimer.isActive(); - if (queue.empty()) { + if (_queue.empty()) { if (!stopping) { _stopSessionsTimer.callOnce(kKillSessionTimeout); } + _pausedId = FullMsgId(); + return; + } else if (_pausedId) { return; } if (stopping) { _stopSessionsTimer.cancel(); } - auto i = _uploadingId.msg ? queue.find(_uploadingId) : queue.begin(); - if (!_uploadingId.msg) { - _uploadingId = i->first; - } else if (i == queue.end()) { - i = queue.begin(); - _uploadingId = i->first; - } - auto &uploadingData = i->second; auto todc = 0; - for (auto dc = 1; dc != MTP::kUploadSessionsCount; ++dc) { - if (_sentPerDc[dc] < _sentPerDc[todc]) { - todc = dc; + if (_sentPerDcIndex.size() < 2) { + todc = int(_sentPerDcIndex.size()); + _sentPerDcIndex.resize(todc + 1); + } else { + const auto min = ranges::min_element(_sentPerDcIndex); + todc = int(min - begin(_sentPerDcIndex)); + } + const auto alreadySent = _sentPerDcIndex[todc]; + + const auto entry = [&]() -> Entry* { + for (auto i = begin(_queue); i != end(_queue); ++i) { + if (i->partsSent < i->parts->size() + || i->docPartsSent < i->docPartsCount) { + return &*i; + } } + return nullptr; + }(); + if (!entry) { + return; } - if (uploadingData.partsSent >= uploadingData.parts.size()) { - if (uploadingData.docSentParts >= uploadingData.docPartsCount) { - if (_sentSizes.empty()) { - const auto options = uploadingData.file - ? uploadingData.file->to.options - : Api::SendOptions(); - const auto edit = uploadingData.file && - uploadingData.file->to.replaceMediaOf; - const auto attachedStickers = uploadingData.file - ? uploadingData.file->attachedStickers - : std::vector(); - if (uploadingData.file->type == SendMediaType::Photo) { - auto photoFilename = uploadingData.file->filename; - if (!photoFilename.endsWith(u".jpg"_q, Qt::CaseInsensitive)) { - // Server has some extensions checking for inputMediaUploadedPhoto, - // so force the extension to be .jpg anyway. It doesn't matter, - // because the filename from inputFile is not used anywhere. - photoFilename += u".jpg"_q; - } - const auto md5 = uploadingData.file->filemd5; - const auto file = MTP_inputFile( - MTP_long(uploadingData.file->id), - MTP_int(uploadingData.parts.size()), - MTP_string(photoFilename), - MTP_bytes(md5)); - _photoReady.fire({ - .fullId = _uploadingId, - .info = { - .file = file, - .attachedStickers = attachedStickers, - }, - .options = options, - .edit = edit, - }); - } else if (uploadingData.file->type == SendMediaType::File - || uploadingData.file->type == SendMediaType::ThemeFile - || uploadingData.file->type == SendMediaType::Audio) { - QByteArray docMd5(32, Qt::Uninitialized); - hashMd5Hex(uploadingData.md5Hash.result(), docMd5.data()); - - const auto file = (uploadingData.docSize > kUseBigFilesFrom) - ? MTP_inputFileBig( - MTP_long(uploadingData.file->id), - MTP_int(uploadingData.docPartsCount), - MTP_string(uploadingData.file->filename)) - : MTP_inputFile( - MTP_long(uploadingData.file->id), - MTP_int(uploadingData.docPartsCount), - MTP_string(uploadingData.file->filename), - MTP_bytes(docMd5)); - const auto thumb = [&]() -> std::optional { - if (uploadingData.parts.empty()) { - return std::nullopt; - } - const auto thumbFilename = uploadingData.file->thumbname; - const auto thumbMd5 = uploadingData.file->thumbmd5; - return MTP_inputFile( - MTP_long(uploadingData.file->thumbId), - MTP_int(uploadingData.parts.size()), - MTP_string(thumbFilename), - MTP_bytes(thumbMd5)); - }(); - _documentReady.fire({ - .fullId = _uploadingId, - .info = { - .file = file, - .thumb = thumb, - .attachedStickers = attachedStickers, - }, - .options = options, - .edit = edit, - }); - } else if (uploadingData.file->type == SendMediaType::Secure) { - _secureReady.fire({ - _uploadingId, - uploadingData.file->id, - int(uploadingData.parts.size()), - }); - } - queue.erase(_uploadingId); - _uploadingId = FullMsgId(); - sendNext(); - } + const auto itemId = entry->itemId; + if (entry->partsSent >= entry->parts->size()) { + const auto willProbablyBeSent = entry->docPartSize; + if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) { return; } - auto &content = uploadingData.file->content; - QByteArray toSend; - if (content.isEmpty()) { - if (!uploadingData.docFile) { - const auto filepath = uploadingData.file->filepath; - uploadingData.docFile = std::make_unique(filepath); - if (!uploadingData.docFile->open(QIODevice::ReadOnly)) { - currentFailed(); - return; - } - } - toSend = uploadingData.docFile->read(uploadingData.docPartSize); - if (uploadingData.docSize <= kUseBigFilesFrom) { - uploadingData.md5Hash.feed(toSend.constData(), toSend.size()); - } - } else { - const auto offset = uploadingData.docSentParts - * uploadingData.docPartSize; - toSend = content.mid(offset, uploadingData.docPartSize); - if ((uploadingData.file->type == SendMediaType::File - || uploadingData.file->type == SendMediaType::ThemeFile - || uploadingData.file->type == SendMediaType::Audio) - && uploadingData.docSentParts <= kUseBigFilesFrom) { - uploadingData.md5Hash.feed(toSend.constData(), toSend.size()); - } - } - if ((toSend.size() > uploadingData.docPartSize) - || ((toSend.size() < uploadingData.docPartSize - && uploadingData.docSentParts + 1 != uploadingData.docPartsCount))) { - currentFailed(); + Assert(entry->docPartsSent < entry->docPartsCount); + + const auto partBytes = readDocPart(entry); + if (partBytes.isEmpty()) { + failed(itemId); return; } + const auto index = entry->docPartsSent++; + ++entry->docPartsWaiting; + mtpRequestId requestId; - if (uploadingData.docSize > kUseBigFilesFrom) { + if (entry->docSize > kUseBigFilesFrom) { requestId = _api->request(MTPupload_SaveBigFilePart( - MTP_long(uploadingData.file->id), - MTP_int(uploadingData.docSentParts), - MTP_int(uploadingData.docPartsCount), - MTP_bytes(toSend) + MTP_long(entry->file->id), + MTP_int(index), + MTP_int(entry->docPartsCount), + MTP_bytes(partBytes) )).done([=](const MTPBool &result, mtpRequestId requestId) { partLoaded(result, requestId); }).fail([=](const MTP::Error &error, mtpRequestId requestId) { @@ -513,29 +464,34 @@ void Uploader::sendNext() { }).toDC(MTP::uploadDcId(todc)).send(); } else { requestId = _api->request(MTPupload_SaveFilePart( - MTP_long(uploadingData.file->id), - MTP_int(uploadingData.docSentParts), - MTP_bytes(toSend) + MTP_long(entry->file->id), + MTP_int(index), + MTP_bytes(partBytes) )).done([=](const MTPBool &result, mtpRequestId requestId) { partLoaded(result, requestId); }).fail([=](const MTP::Error &error, mtpRequestId requestId) { partFailed(error, requestId); }).toDC(MTP::uploadDcId(todc)).send(); } - _sentSizes.emplace(requestId, uploadingData.docPartSize); - _docSentRequests.emplace(requestId); - _dcIndices.emplace(requestId, todc); - _sentTotal += uploadingData.docPartSize; - _sentPerDc[todc] += uploadingData.docPartSize; - - uploadingData.docSentParts++; + _requests.emplace(requestId, Request{ + .itemId = itemId, + .sent = crl::now(), + .bytes = partBytes, + .dcIndex = todc, + .docPart = true, + }); + _sentPerDcIndex[todc] += int(partBytes.size()); } else { - const auto index = uploadingData.partsSent++; - const auto partBytes = uploadingData.parts[index]; - const auto partSize = int(partBytes.size()); + const auto willBeSent = entry->parts->at(entry->partsSent).size(); + if (alreadySent + willBeSent >= kMaxUploadPerSession) { + return; + } + ++entry->partsWaiting; + const auto index = entry->partsSent++; + const auto partBytes = entry->parts->at(index); const auto requestId = _api->request(MTPupload_SaveFilePart( - MTP_long(uploadingData.partsOfId), + MTP_long(entry->partsOfId), MTP_int(index), MTP_bytes(partBytes) )).done([=](const MTPBool &result, mtpRequestId requestId) { @@ -543,139 +499,232 @@ void Uploader::sendNext() { }).fail([=](const MTP::Error &error, mtpRequestId requestId) { partFailed(error, requestId); }).toDC(MTP::uploadDcId(todc)).send(); - _sentSizes.emplace(requestId, partSize); - _dcIndices.emplace(requestId, todc); - _sentTotal += partSize; - _sentPerDc[todc] += partSize; + + _requests.emplace(requestId, Request{ + .itemId = itemId, + .sent = crl::now(), + .bytes = partBytes, + .dcIndex = todc, + }); + _sentPerDcIndex[todc] += int(partBytes.size()); } _nextTimer.callOnce(kUploadRequestInterval); } -void Uploader::cancel(const FullMsgId &msgId) { - if (_uploadingId == msgId) { - currentFailed(); - } else { - queue.erase(msgId); - } +void Uploader::cancel(FullMsgId itemId) { + failed(itemId); } void Uploader::cancelAll() { - const auto single = queue.empty() ? _uploadingId : queue.begin()->first; - if (!single) { - return; - } - _pausedId = single; - if (_uploadingId) { - currentFailed(); - } - while (!queue.empty()) { - const auto [msgId, file] = std::move(*queue.begin()); - queue.erase(queue.begin()); - notifyFailed(msgId, file); + while (!_queue.empty()) { + failed(_queue.front().itemId); } clear(); unpause(); } -void Uploader::pause(const FullMsgId &msgId) { - _pausedId = msgId; +void Uploader::pause(FullMsgId itemId) { + _pausedId = itemId; } void Uploader::unpause() { _pausedId = FullMsgId(); - sendNext(); + maybeSendNext(); } -void Uploader::confirm(const FullMsgId &msgId) { -} - -void Uploader::cancelRequests() { - _docSentRequests.clear(); - for (const auto &requestData : _sentSizes) { - _api->request(requestData.first).cancel(); +void Uploader::cancelRequests(FullMsgId itemId) { + for (auto i = begin(_requests); i != end(_requests);) { + if (i->second.itemId == itemId) { + const auto bytes = int(i->second.bytes.size()); + _sentPerDcIndex[i->second.dcIndex] -= bytes; + _api->request(i->first).cancel(); + i = _requests.erase(i); + } else { + ++i; + } } - _sentSizes.clear(); +} + +void Uploader::cancelAllRequests() { + for (const auto &[requestId, request] : base::take(_requests)) { + _api->request(requestId).cancel(); + } + ranges::fill(_sentPerDcIndex, 0); } void Uploader::clear() { - queue.clear(); - cancelRequests(); - _dcIndices.clear(); - _sentTotal = 0; - for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { - _api->instance().stopSession(MTP::uploadDcId(i)); - _sentPerDc[i] = 0; - } + _queue.clear(); + cancelAllRequests(); + stopSessions(); _stopSessionsTimer.cancel(); } -void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { - _docSentRequests.remove(requestId); - auto i = _sentSizes.find(requestId); - const auto wasNonPremiumDelayed = _nonPremiumDelayed.remove(requestId); - if (i != _sentSizes.cend()) { - if (mtpIsFalse(result)) { // failed to upload current file - currentFailed(); - return; - } else { - auto dcIt = _dcIndices.find(requestId); - if (dcIt == _dcIndices.cend()) { // must not happen - currentFailed(); - return; - } - auto dc = dcIt->second; - _dcIndices.erase(dcIt); +Uploader::Request Uploader::finishRequest(mtpRequestId requestId) { + const auto taken = _requests.take(requestId); + Assert(taken.has_value()); - int64 sentPartSize = i->second; - auto k = queue.find(_uploadingId); - Assert(k != queue.cend()); - auto &[fullId, file] = *k; - _sentSizes.erase(i); - _sentTotal -= sentPartSize; - _sentPerDc[dc] -= sentPartSize; - if (file.file->type == SendMediaType::Photo) { - file.fileSentSize += sentPartSize; - const auto photo = session().data().photo(file.file->id); - if (photo->uploading() && file.file) { - photo->uploadingData->size = file.file->partssize; - photo->uploadingData->offset = file.fileSentSize; - } - _photoProgress.fire_copy(fullId); - } else if (file.file->type == SendMediaType::File - || file.file->type == SendMediaType::ThemeFile - || file.file->type == SendMediaType::Audio) { - const auto document = session().data().document(file.file->id); - if (document->uploading()) { - const auto doneParts = file.docSentParts - - int(_docSentRequests.size()); - document->uploadingData->offset = std::min( - document->uploadingData->size, - doneParts * file.docPartSize); - } - _documentProgress.fire_copy(fullId); - } else if (file.file->type == SendMediaType::Secure) { - file.fileSentSize += sentPartSize; - _secureProgress.fire_copy({ - fullId, - file.fileSentSize, - file.file->partssize }); - } - if (wasNonPremiumDelayed) { - _nonPremiumDelays.fire_copy(fullId); - } - } + _sentPerDcIndex[taken->dcIndex] -= int(taken->bytes.size()); + return *taken; +} + +void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { + const auto request = finishRequest(requestId); + + const auto bytes = int(request.bytes.size()); + const auto itemId = request.itemId; + + if (mtpIsFalse(result)) { // failed to upload current file + failed(itemId); + return; } - sendNext(); + const auto i = ranges::find(_queue, itemId, &Entry::itemId); + Assert(i != end(_queue)); + auto &entry = *i; + + if (request.docPart) { + --entry.docPartsWaiting; + entry.docSentSize += bytes; + } else { + --entry.partsWaiting; + entry.partsSentSize += bytes; + } + + if (entry.file->type == SendMediaType::Photo) { + const auto photo = session().data().photo(entry.file->id); + if (photo->uploading()) { + photo->uploadingData->size = entry.file->partssize; + photo->uploadingData->offset = entry.partsSentSize; + } + _photoProgress.fire_copy(itemId); + } else if (entry.file->type == SendMediaType::File + || entry.file->type == SendMediaType::ThemeFile + || entry.file->type == SendMediaType::Audio) { + const auto document = session().data().document(entry.file->id); + if (document->uploading()) { + document->uploadingData->offset = std::min( + document->uploadingData->size, + entry.docSentSize); + } + _documentProgress.fire_copy(itemId); + } else if (entry.file->type == SendMediaType::Secure) { + _secureProgress.fire_copy({ + .fullId = itemId, + .offset = entry.partsSentSize, + .size = entry.file->partssize, + }); + } + if (request.nonPremiumDelayed) { + _nonPremiumDelays.fire_copy(itemId); + } + + if (!_queue.empty() && itemId == _queue.front().itemId) { + maybeFinishFront(); + } + maybeSendNext(); +} + +void Uploader::maybeFinishFront() { + while (!_queue.empty()) { + const auto &entry = _queue.front(); + if (entry.partsSent >= entry.parts->size() + && entry.docPartsSent >= entry.docPartsCount + && !entry.partsWaiting + && !entry.docPartsWaiting) { + finishFront(); + } else { + break; + } + } +} + +void Uploader::finishFront() { + Expects(!_queue.empty()); + + auto entry = std::move(_queue.front()); + _queue.erase(_queue.begin()); + + const auto options = entry.file + ? entry.file->to.options + : Api::SendOptions(); + const auto edit = entry.file && + entry.file->to.replaceMediaOf; + const auto attachedStickers = entry.file + ? entry.file->attachedStickers + : std::vector(); + if (entry.file->type == SendMediaType::Photo) { + auto photoFilename = entry.file->filename; + if (!photoFilename.endsWith(u".jpg"_q, Qt::CaseInsensitive)) { + // Server has some extensions checking for inputMediaUploadedPhoto, + // so force the extension to be .jpg anyway. It doesn't matter, + // because the filename from inputFile is not used anywhere. + photoFilename += u".jpg"_q; + } + const auto md5 = entry.file->filemd5; + const auto file = MTP_inputFile( + MTP_long(entry.file->id), + MTP_int(entry.parts->size()), + MTP_string(photoFilename), + MTP_bytes(md5)); + _photoReady.fire({ + .fullId = entry.itemId, + .info = { + .file = file, + .attachedStickers = attachedStickers, + }, + .options = options, + .edit = edit, + }); + } else if (entry.file->type == SendMediaType::File + || entry.file->type == SendMediaType::ThemeFile + || entry.file->type == SendMediaType::Audio) { + QByteArray docMd5(32, Qt::Uninitialized); + hashMd5Hex(entry.md5Hash.result(), docMd5.data()); + + const auto file = (entry.docSize > kUseBigFilesFrom) + ? MTP_inputFileBig( + MTP_long(entry.file->id), + MTP_int(entry.docPartsCount), + MTP_string(entry.file->filename)) + : MTP_inputFile( + MTP_long(entry.file->id), + MTP_int(entry.docPartsCount), + MTP_string(entry.file->filename), + MTP_bytes(docMd5)); + const auto thumb = [&]() -> std::optional { + if (entry.parts->empty()) { + return std::nullopt; + } + const auto thumbFilename = entry.file->thumbname; + const auto thumbMd5 = entry.file->thumbmd5; + return MTP_inputFile( + MTP_long(entry.file->thumbId), + MTP_int(entry.parts->size()), + MTP_string(thumbFilename), + MTP_bytes(thumbMd5)); + }(); + _documentReady.fire({ + .fullId = entry.itemId, + .info = { + .file = file, + .thumb = thumb, + .attachedStickers = attachedStickers, + }, + .options = options, + .edit = edit, + }); + } else if (entry.file->type == SendMediaType::Secure) { + _secureReady.fire({ + entry.itemId, + entry.file->id, + int(entry.parts->size()), + }); + } } void Uploader::partFailed(const MTP::Error &error, mtpRequestId requestId) { - // failed to upload current file - _nonPremiumDelayed.remove(requestId); - if (_sentSizes.find(requestId) != _sentSizes.cend()) { - currentFailed(); - } - sendNext(); + const auto request = finishRequest(requestId); + failed(request.itemId); } } // namespace Storage diff --git a/Telegram/SourceFiles/storage/file_upload.h b/Telegram/SourceFiles/storage/file_upload.h index a733775ad1..f4fced3402 100644 --- a/Telegram/SourceFiles/storage/file_upload.h +++ b/Telegram/SourceFiles/storage/file_upload.h @@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "api/api_common.h" #include "base/timer.h" +#include "base/weak_ptr.h" #include "mtproto/facade.h" class ApiWrap; @@ -46,53 +47,48 @@ struct UploadSecureDone { int partsCount = 0; }; -class Uploader final : public QObject { +class Uploader final : public base::has_weak_ptr { public: explicit Uploader(not_null api); ~Uploader(); [[nodiscard]] Main::Session &session() const; - - [[nodiscard]] FullMsgId currentUploadId() const { - return _uploadingId; - } + [[nodiscard]] FullMsgId currentUploadId() const; void upload( - const FullMsgId &msgId, + FullMsgId itemId, const std::shared_ptr &file); - void cancel(const FullMsgId &msgId); - void pause(const FullMsgId &msgId); - void confirm(const FullMsgId &msgId); - + void pause(FullMsgId itemId); + void cancel(FullMsgId itemId); void cancelAll(); - void clear(); - rpl::producer photoReady() const { + [[nodiscard]] rpl::producer photoReady() const { return _photoReady.events(); } - rpl::producer documentReady() const { + [[nodiscard]] rpl::producer documentReady() const { return _documentReady.events(); } - rpl::producer secureReady() const { + [[nodiscard]] rpl::producer secureReady() const { return _secureReady.events(); } - rpl::producer photoProgress() const { + [[nodiscard]] rpl::producer photoProgress() const { return _photoProgress.events(); } - rpl::producer documentProgress() const { + [[nodiscard]] rpl::producer documentProgress() const { return _documentProgress.events(); } - rpl::producer secureProgress() const { + [[nodiscard]] auto secureProgress() const + -> rpl::producer { return _secureProgress.events(); } - rpl::producer photoFailed() const { + [[nodiscard]] rpl::producer photoFailed() const { return _photoFailed.events(); } - rpl::producer documentFailed() const { + [[nodiscard]] rpl::producer documentFailed() const { return _documentFailed.events(); } - rpl::producer secureFailed() const { + [[nodiscard]] rpl::producer secureFailed() const { return _secureFailed.events(); } @@ -101,23 +97,31 @@ public: } void unpause(); - void sendNext(); void stopSessions(); private: - struct File; + struct Entry; + struct Request; + + [[nodiscard]] QByteArray readDocPart(not_null entry); + void maybeSendNext(); + void maybeFinishFront(); + void finishFront(); void partLoaded(const MTPBool &result, mtpRequestId requestId); void partFailed(const MTP::Error &error, mtpRequestId requestId); + Request finishRequest(mtpRequestId requestId); - void processPhotoProgress(const FullMsgId &msgId); - void processPhotoFailed(const FullMsgId &msgId); - void processDocumentProgress(const FullMsgId &msgId); - void processDocumentFailed(const FullMsgId &msgId); + void processPhotoProgress(FullMsgId itemId); + void processPhotoFailed(FullMsgId itemId); + void processDocumentProgress(FullMsgId itemId); + void processDocumentFailed(FullMsgId itemId); - void notifyFailed(FullMsgId id, const File &file); - void currentFailed(); - void cancelRequests(); + void notifyFailed(const Entry &entry); + void failed(FullMsgId itemId); + void cancelRequests(FullMsgId itemId); + void cancelAllRequests(); + void clear(); void sendProgressUpdate( not_null item, @@ -125,16 +129,13 @@ private: int progress = 0); const not_null _api; - base::flat_map _sentSizes; - base::flat_set _docSentRequests; - base::flat_map _dcIndices; - base::flat_set _nonPremiumDelayed; - uint32 _sentTotal = 0; // FileSize: Right now any file size fits 32 bit. - uint32 _sentPerDc[MTP::kUploadSessionsCount] = { 0 }; - FullMsgId _uploadingId; + std::vector _queue; + + base::flat_map _requests; + std::vector _sentPerDcIndex; + FullMsgId _pausedId; - std::map queue; base::Timer _nextTimer, _stopSessionsTimer; rpl::event_stream _photoReady;