Revert "Use plain vector for prepared upload parts."

This reverts commit aaaeea2979.
This commit is contained in:
John Preston 2024-04-04 18:36:00 +04:00
parent ede6e18bc9
commit f6f8eefaa0
4 changed files with 96 additions and 88 deletions

View File

@ -65,10 +65,7 @@ struct Uploader::File {
bool setPartSize(uint32 partSize); bool setPartSize(uint32 partSize);
std::shared_ptr<FilePrepareResult> file; std::shared_ptr<FilePrepareResult> file;
const std::vector<QByteArray> &parts; int32 partsCount = 0;
const uint64 partsOfId = 0;
int partsSent = 0;
mutable int64 fileSentSize = 0; mutable int64 fileSentSize = 0;
HashMd5 md5Hash; HashMd5 md5Hash;
@ -82,15 +79,11 @@ struct Uploader::File {
}; };
Uploader::File::File(const std::shared_ptr<FilePrepareResult> &file) Uploader::File::File(const std::shared_ptr<FilePrepareResult> &file)
: file(file) : file(file) {
, parts((file->type == SendMediaType::Photo partsCount = (file->type == SendMediaType::Photo
|| file->type == SendMediaType::Secure) || file->type == SendMediaType::Secure)
? file->fileparts ? file->fileparts.size()
: file->thumbparts) : file->thumbparts.size();
, partsOfId((file->type == SendMediaType::Photo
|| file->type == SendMediaType::Secure)
? file->id
: file->thumbId) {
if (file->type == SendMediaType::File if (file->type == SendMediaType::File
|| file->type == SendMediaType::ThemeFile || file->type == SendMediaType::ThemeFile
|| file->type == SendMediaType::Audio) { || file->type == SendMediaType::Audio) {
@ -181,7 +174,7 @@ Uploader::Uploader(not_null<ApiWrap*> api)
_api->instance().nonPremiumDelayedRequests( _api->instance().nonPremiumDelayedRequests(
) | rpl::start_with_next([=](mtpRequestId id) { ) | rpl::start_with_next([=](mtpRequestId id) {
if (_dcIndices.contains(id)) { if (dcMap.contains(id)) {
_nonPremiumDelayed.emplace(id); _nonPremiumDelayed.emplace(id);
} }
}, _lifetime); }, _lifetime);
@ -306,7 +299,7 @@ void Uploader::upload(
} }
void Uploader::currentFailed() { void Uploader::currentFailed() {
auto j = queue.find(_uploadingId); auto j = queue.find(uploadingId);
if (j != queue.end()) { if (j != queue.end()) {
const auto [msgId, file] = std::move(*j); const auto [msgId, file] = std::move(*j);
queue.erase(j); queue.erase(j);
@ -314,11 +307,11 @@ void Uploader::currentFailed() {
} }
cancelRequests(); cancelRequests();
_dcIndices.clear(); dcMap.clear();
_uploadingId = FullMsgId(); uploadingId = FullMsgId();
_sentTotal = 0; sentSize = 0;
for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { for (int i = 0; i < MTP::kUploadSessionsCount; ++i) {
_sentPerDc[i] = 0; sentSizes[i] = 0;
} }
sendNext(); sendNext();
@ -350,7 +343,7 @@ void Uploader::stopSessions() {
} }
void Uploader::sendNext() { void Uploader::sendNext() {
if (_sentTotal >= kMaxUploadFileParallelSize || _pausedId.msg) { if (sentSize >= kMaxUploadFileParallelSize || _pausedId.msg) {
return; return;
} }
@ -365,25 +358,33 @@ void Uploader::sendNext() {
if (stopping) { if (stopping) {
_stopSessionsTimer.cancel(); _stopSessionsTimer.cancel();
} }
auto i = _uploadingId.msg ? queue.find(_uploadingId) : queue.begin(); auto i = uploadingId.msg ? queue.find(uploadingId) : queue.begin();
if (!_uploadingId.msg) { if (!uploadingId.msg) {
_uploadingId = i->first; uploadingId = i->first;
} else if (i == queue.end()) { } else if (i == queue.end()) {
i = queue.begin(); i = queue.begin();
_uploadingId = i->first; uploadingId = i->first;
} }
auto &uploadingData = i->second; auto &uploadingData = i->second;
auto todc = 0; auto todc = 0;
for (auto dc = 1; dc != MTP::kUploadSessionsCount; ++dc) { for (auto dc = 1; dc != MTP::kUploadSessionsCount; ++dc) {
if (_sentPerDc[dc] < _sentPerDc[todc]) { if (sentSizes[dc] < sentSizes[todc]) {
todc = dc; todc = dc;
} }
} }
if (uploadingData.partsSent >= uploadingData.parts.size()) { auto &parts = (uploadingData.file->type == SendMediaType::Photo
|| uploadingData.file->type == SendMediaType::Secure)
? uploadingData.file->fileparts
: uploadingData.file->thumbparts;
const auto partsOfId = (uploadingData.file->type == SendMediaType::Photo
|| uploadingData.file->type == SendMediaType::Secure)
? uploadingData.file->id
: uploadingData.file->thumbId;
if (parts.isEmpty()) {
if (uploadingData.docSentParts >= uploadingData.docPartsCount) { if (uploadingData.docSentParts >= uploadingData.docPartsCount) {
if (_sentSizes.empty()) { if (requestsSent.empty() && docRequestsSent.empty()) {
const auto options = uploadingData.file const auto options = uploadingData.file
? uploadingData.file->to.options ? uploadingData.file->to.options
: Api::SendOptions(); : Api::SendOptions();
@ -403,11 +404,11 @@ void Uploader::sendNext() {
const auto md5 = uploadingData.file->filemd5; const auto md5 = uploadingData.file->filemd5;
const auto file = MTP_inputFile( const auto file = MTP_inputFile(
MTP_long(uploadingData.file->id), MTP_long(uploadingData.file->id),
MTP_int(uploadingData.parts.size()), MTP_int(uploadingData.partsCount),
MTP_string(photoFilename), MTP_string(photoFilename),
MTP_bytes(md5)); MTP_bytes(md5));
_photoReady.fire({ _photoReady.fire({
.fullId = _uploadingId, .fullId = uploadingId,
.info = { .info = {
.file = file, .file = file,
.attachedStickers = attachedStickers, .attachedStickers = attachedStickers,
@ -432,19 +433,19 @@ void Uploader::sendNext() {
MTP_string(uploadingData.file->filename), MTP_string(uploadingData.file->filename),
MTP_bytes(docMd5)); MTP_bytes(docMd5));
const auto thumb = [&]() -> std::optional<MTPInputFile> { const auto thumb = [&]() -> std::optional<MTPInputFile> {
if (uploadingData.parts.empty()) { if (!uploadingData.partsCount) {
return std::nullopt; return std::nullopt;
} }
const auto thumbFilename = uploadingData.file->thumbname; const auto thumbFilename = uploadingData.file->thumbname;
const auto thumbMd5 = uploadingData.file->thumbmd5; const auto thumbMd5 = uploadingData.file->thumbmd5;
return MTP_inputFile( return MTP_inputFile(
MTP_long(uploadingData.file->thumbId), MTP_long(uploadingData.file->thumbId),
MTP_int(uploadingData.parts.size()), MTP_int(uploadingData.partsCount),
MTP_string(thumbFilename), MTP_string(thumbFilename),
MTP_bytes(thumbMd5)); MTP_bytes(thumbMd5));
}(); }();
_documentReady.fire({ _documentReady.fire({
.fullId = _uploadingId, .fullId = uploadingId,
.info = { .info = {
.file = file, .file = file,
.thumb = thumb, .thumb = thumb,
@ -455,13 +456,12 @@ void Uploader::sendNext() {
}); });
} else if (uploadingData.file->type == SendMediaType::Secure) { } else if (uploadingData.file->type == SendMediaType::Secure) {
_secureReady.fire({ _secureReady.fire({
_uploadingId, uploadingId,
uploadingData.file->id, uploadingData.file->id,
int(uploadingData.parts.size()), uploadingData.partsCount });
});
} }
queue.erase(_uploadingId); queue.erase(uploadingId);
_uploadingId = FullMsgId(); uploadingId = FullMsgId();
sendNext(); sendNext();
} }
return; return;
@ -522,37 +522,36 @@ void Uploader::sendNext() {
partFailed(error, requestId); partFailed(error, requestId);
}).toDC(MTP::uploadDcId(todc)).send(); }).toDC(MTP::uploadDcId(todc)).send();
} }
_sentSizes.emplace(requestId, uploadingData.docPartSize); docRequestsSent.emplace(requestId, uploadingData.docSentParts);
_docSentRequests.emplace(requestId); dcMap.emplace(requestId, todc);
_dcIndices.emplace(requestId, todc); sentSize += uploadingData.docPartSize;
_sentTotal += uploadingData.docPartSize; sentSizes[todc] += uploadingData.docPartSize;
_sentPerDc[todc] += uploadingData.docPartSize;
uploadingData.docSentParts++; uploadingData.docSentParts++;
} else { } else {
const auto index = uploadingData.partsSent++; auto part = parts.begin();
const auto partBytes = uploadingData.parts[index];
const auto partSize = int(partBytes.size());
const auto requestId = _api->request(MTPupload_SaveFilePart( const auto requestId = _api->request(MTPupload_SaveFilePart(
MTP_long(uploadingData.partsOfId), MTP_long(partsOfId),
MTP_int(index), MTP_int(part.key()),
MTP_bytes(partBytes) MTP_bytes(part.value())
)).done([=](const MTPBool &result, mtpRequestId requestId) { )).done([=](const MTPBool &result, mtpRequestId requestId) {
partLoaded(result, requestId); partLoaded(result, requestId);
}).fail([=](const MTP::Error &error, mtpRequestId requestId) { }).fail([=](const MTP::Error &error, mtpRequestId requestId) {
partFailed(error, requestId); partFailed(error, requestId);
}).toDC(MTP::uploadDcId(todc)).send(); }).toDC(MTP::uploadDcId(todc)).send();
_sentSizes.emplace(requestId, partSize); requestsSent.emplace(requestId, part.value());
_dcIndices.emplace(requestId, todc); dcMap.emplace(requestId, todc);
_sentTotal += partSize; sentSize += part.value().size();
_sentPerDc[todc] += partSize; sentSizes[todc] += part.value().size();
parts.erase(part);
} }
_nextTimer.callOnce(kUploadRequestInterval); _nextTimer.callOnce(kUploadRequestInterval);
} }
void Uploader::cancel(const FullMsgId &msgId) { void Uploader::cancel(const FullMsgId &msgId) {
if (_uploadingId == msgId) { if (uploadingId == msgId) {
currentFailed(); currentFailed();
} else { } else {
queue.erase(msgId); queue.erase(msgId);
@ -560,12 +559,12 @@ void Uploader::cancel(const FullMsgId &msgId) {
} }
void Uploader::cancelAll() { void Uploader::cancelAll() {
const auto single = queue.empty() ? _uploadingId : queue.begin()->first; const auto single = queue.empty() ? uploadingId : queue.begin()->first;
if (!single) { if (!single) {
return; return;
} }
_pausedId = single; _pausedId = single;
if (_uploadingId) { if (uploadingId) {
currentFailed(); currentFailed();
} }
while (!queue.empty()) { while (!queue.empty()) {
@ -590,49 +589,61 @@ void Uploader::confirm(const FullMsgId &msgId) {
} }
void Uploader::cancelRequests() { void Uploader::cancelRequests() {
_docSentRequests.clear(); for (const auto &requestData : requestsSent) {
for (const auto &requestData : _sentSizes) {
_api->request(requestData.first).cancel(); _api->request(requestData.first).cancel();
} }
_sentSizes.clear(); requestsSent.clear();
for (const auto &requestData : docRequestsSent) {
_api->request(requestData.first).cancel();
}
docRequestsSent.clear();
} }
void Uploader::clear() { void Uploader::clear() {
queue.clear(); queue.clear();
cancelRequests(); cancelRequests();
_dcIndices.clear(); dcMap.clear();
_sentTotal = 0; sentSize = 0;
for (int i = 0; i < MTP::kUploadSessionsCount; ++i) { for (int i = 0; i < MTP::kUploadSessionsCount; ++i) {
_api->instance().stopSession(MTP::uploadDcId(i)); _api->instance().stopSession(MTP::uploadDcId(i));
_sentPerDc[i] = 0; sentSizes[i] = 0;
} }
_stopSessionsTimer.cancel(); _stopSessionsTimer.cancel();
} }
void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) { void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) {
_docSentRequests.remove(requestId); auto j = docRequestsSent.end();
auto i = _sentSizes.find(requestId); auto i = requestsSent.find(requestId);
if (i == requestsSent.cend()) {
j = docRequestsSent.find(requestId);
}
const auto wasNonPremiumDelayed = _nonPremiumDelayed.remove(requestId); const auto wasNonPremiumDelayed = _nonPremiumDelayed.remove(requestId);
if (i != _sentSizes.cend()) { if (i != requestsSent.cend() || j != docRequestsSent.cend()) {
if (mtpIsFalse(result)) { // failed to upload current file if (mtpIsFalse(result)) { // failed to upload current file
currentFailed(); currentFailed();
return; return;
} else { } else {
auto dcIt = _dcIndices.find(requestId); auto dcIt = dcMap.find(requestId);
if (dcIt == _dcIndices.cend()) { // must not happen if (dcIt == dcMap.cend()) { // must not happen
currentFailed(); currentFailed();
return; return;
} }
auto dc = dcIt->second; auto dc = dcIt->second;
_dcIndices.erase(dcIt); dcMap.erase(dcIt);
int64 sentPartSize = i->second; int64 sentPartSize = 0;
auto k = queue.find(_uploadingId); auto k = queue.find(uploadingId);
Assert(k != queue.cend()); Assert(k != queue.cend());
auto &[fullId, file] = *k; auto &[fullId, file] = *k;
_sentSizes.erase(i); if (i != requestsSent.cend()) {
_sentTotal -= sentPartSize; sentPartSize = i->second.size();
_sentPerDc[dc] -= sentPartSize; requestsSent.erase(i);
} else {
sentPartSize = file.docPartSize;
docRequestsSent.erase(j);
}
sentSize -= sentPartSize;
sentSizes[dc] -= sentPartSize;
if (file.file->type == SendMediaType::Photo) { if (file.file->type == SendMediaType::Photo) {
file.fileSentSize += sentPartSize; file.fileSentSize += sentPartSize;
const auto photo = session().data().photo(file.file->id); const auto photo = session().data().photo(file.file->id);
@ -647,7 +658,7 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) {
const auto document = session().data().document(file.file->id); const auto document = session().data().document(file.file->id);
if (document->uploading()) { if (document->uploading()) {
const auto doneParts = file.docSentParts const auto doneParts = file.docSentParts
- int(_docSentRequests.size()); - int(docRequestsSent.size());
document->uploadingData->offset = std::min( document->uploadingData->offset = std::min(
document->uploadingData->size, document->uploadingData->size,
doneParts * file.docPartSize); doneParts * file.docPartSize);
@ -672,7 +683,8 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) {
void Uploader::partFailed(const MTP::Error &error, mtpRequestId requestId) { void Uploader::partFailed(const MTP::Error &error, mtpRequestId requestId) {
// failed to upload current file // failed to upload current file
_nonPremiumDelayed.remove(requestId); _nonPremiumDelayed.remove(requestId);
if (_sentSizes.find(requestId) != _sentSizes.cend()) { if ((requestsSent.find(requestId) != requestsSent.cend())
|| (docRequestsSent.find(requestId) != docRequestsSent.cend())) {
currentFailed(); currentFailed();
} }
sendNext(); sendNext();

View File

@ -54,7 +54,7 @@ public:
[[nodiscard]] Main::Session &session() const; [[nodiscard]] Main::Session &session() const;
[[nodiscard]] FullMsgId currentUploadId() const { [[nodiscard]] FullMsgId currentUploadId() const {
return _uploadingId; return uploadingId;
} }
void upload( void upload(
@ -125,14 +125,14 @@ private:
int progress = 0); int progress = 0);
const not_null<ApiWrap*> _api; const not_null<ApiWrap*> _api;
base::flat_map<mtpRequestId, int> _sentSizes; base::flat_map<mtpRequestId, QByteArray> requestsSent;
base::flat_set<mtpRequestId> _docSentRequests; base::flat_map<mtpRequestId, int32> docRequestsSent;
base::flat_map<mtpRequestId, int> _dcIndices; base::flat_map<mtpRequestId, int32> dcMap;
base::flat_set<mtpRequestId> _nonPremiumDelayed; base::flat_set<mtpRequestId> _nonPremiumDelayed;
uint32 _sentTotal = 0; // FileSize: Right now any file size fits 32 bit. uint32 sentSize = 0; // FileSize: Right now any file size fits 32 bit.
uint32 _sentPerDc[MTP::kUploadSessionsCount] = { 0 }; uint32 sentSizes[MTP::kUploadSessionsCount] = { 0 };
FullMsgId _uploadingId; FullMsgId uploadingId;
FullMsgId _pausedId; FullMsgId _pausedId;
std::map<FullMsgId, File> queue; std::map<FullMsgId, File> queue;
base::Timer _nextTimer, _stopSessionsTimer; base::Timer _nextTimer, _stopSessionsTimer;

View File

@ -434,10 +434,8 @@ void FilePrepareResult::setFileData(const QByteArray &filedata) {
partssize = 0; partssize = 0;
} else { } else {
partssize = filedata.size(); partssize = filedata.size();
fileparts.reserve(
(partssize + kPhotoUploadPartSize - 1) / kPhotoUploadPartSize);
for (int32 i = 0, part = 0; i < partssize; i += kPhotoUploadPartSize, ++part) { for (int32 i = 0, part = 0; i < partssize; i += kPhotoUploadPartSize, ++part) {
fileparts.push_back(filedata.mid(i, kPhotoUploadPartSize)); fileparts.insert(part, filedata.mid(i, kPhotoUploadPartSize));
} }
filemd5.resize(32); filemd5.resize(32);
hashMd5Hex(filedata.constData(), filedata.size(), filemd5.data()); hashMd5Hex(filedata.constData(), filedata.size(), filemd5.data());
@ -448,10 +446,8 @@ void FilePrepareResult::setThumbData(const QByteArray &thumbdata) {
if (!thumbdata.isEmpty()) { if (!thumbdata.isEmpty()) {
thumbbytes = thumbdata; thumbbytes = thumbdata;
int32 size = thumbdata.size(); int32 size = thumbdata.size();
thumbparts.reserve(
(size + kPhotoUploadPartSize - 1) / kPhotoUploadPartSize);
for (int32 i = 0, part = 0; i < size; i += kPhotoUploadPartSize, ++part) { for (int32 i = 0, part = 0; i < size; i += kPhotoUploadPartSize, ++part) {
thumbparts.push_back(thumbdata.mid(i, kPhotoUploadPartSize)); thumbparts.insert(part, thumbdata.mid(i, kPhotoUploadPartSize));
} }
thumbmd5.resize(32); thumbmd5.resize(32);
hashMd5Hex(thumbdata.constData(), thumbdata.size(), thumbmd5.data()); hashMd5Hex(thumbdata.constData(), thumbdata.size(), thumbmd5.data());

View File

@ -147,7 +147,7 @@ struct FileLoadTo {
MsgId replaceMediaOf; MsgId replaceMediaOf;
}; };
using UploadFileParts = std::vector<QByteArray>; using UploadFileParts = QMap<int, QByteArray>;
struct FilePrepareDescriptor { struct FilePrepareDescriptor {
TaskId taskId = kEmptyTaskId; TaskId taskId = kEmptyTaskId;
base::required<uint64> id; base::required<uint64> id;