Send up to 1MB of parts to a single session.

This commit is contained in:
John Preston 2024-04-04 13:50:28 +04:00
parent c3f0d2ef31
commit a9c1970f41
2 changed files with 65 additions and 26 deletions

View File

@ -60,6 +60,8 @@ constexpr auto kSendStateRequestWaiting = crl::time(1000);
// How much time to wait for some more requests, when sending msg acks.
constexpr auto kAckSendWaiting = 10 * crl::time(1000);
constexpr auto kCutContainerOnSize = 16 * 1024;
auto SyncTimeRequestDuration = kFastRequestDuration;
using namespace details;
@ -696,7 +698,8 @@ void SessionPrivate::tryToSend() {
initSize = initSizeInInts * sizeof(mtpPrime);
}
bool needAnyResponse = false;
auto needAnyResponse = false;
auto someSkipped = false;
SerializedRequest toSendRequest;
{
QWriteLocker locker1(_sessionData->toSendMutex());
@ -711,15 +714,33 @@ void SessionPrivate::tryToSend() {
locker1.unlock();
}
uint32 toSendCount = toSend.size();
if (pingRequest) ++toSendCount;
if (ackRequest) ++toSendCount;
if (resendRequest) ++toSendCount;
if (stateRequest) ++toSendCount;
if (httpWaitRequest) ++toSendCount;
if (bindDcKeyRequest) ++toSendCount;
auto totalSending = int(toSend.size());
auto sendingFrom = begin(toSend);
auto sendingTill = end(toSend);
auto combinedLength = 0;
for (auto i = sendingFrom; i != sendingTill; ++i) {
combinedLength += i->second->size();
if (combinedLength >= kCutContainerOnSize) {
++i;
if (const auto skipping = int(sendingTill - i)) {
sendingTill = i;
totalSending -= skipping;
Assert(totalSending > 0);
someSkipped = true;
}
break;
}
}
auto sendingRange = ranges::make_subrange(sendingFrom, sendingTill);
const auto sendingCount = totalSending;
if (pingRequest) ++totalSending;
if (ackRequest) ++totalSending;
if (resendRequest) ++totalSending;
if (stateRequest) ++totalSending;
if (httpWaitRequest) ++totalSending;
if (bindDcKeyRequest) ++totalSending;
if (!toSendCount) {
if (!totalSending) {
return; // nothing to send
}
@ -735,11 +756,11 @@ void SessionPrivate::tryToSend() {
? httpWaitRequest
: bindDcKeyRequest
? bindDcKeyRequest
: toSend.begin()->second;
if (toSendCount == 1 && !first->forceSendInContainer) {
: sendingRange.begin()->second;
if (totalSending == 1 && !first->forceSendInContainer) {
toSendRequest = first;
if (sendAll) {
toSend.clear();
toSend.erase(sendingFrom, sendingTill);
locker1.unlock();
}
@ -808,7 +829,7 @@ void SessionPrivate::tryToSend() {
if (stateRequest) containerSize += stateRequest.messageSize();
if (httpWaitRequest) containerSize += httpWaitRequest.messageSize();
if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize();
for (const auto &[requestId, request] : toSend) {
for (const auto &[requestId, request] : sendingRange) {
containerSize += request.messageSize();
if (needsLayer && request->needsLayer) {
containerSize += initSizeInInts;
@ -825,9 +846,9 @@ void SessionPrivate::tryToSend() {
// prepare container + each in invoke after
toSendRequest = SerializedRequest::Prepare(
containerSize,
containerSize + 3 * toSend.size());
containerSize + 3 * sendingCount);
toSendRequest->push_back(mtpc_msg_container);
toSendRequest->push_back(toSendCount);
toSendRequest->push_back(totalSending);
// check for a valid container
auto bigMsgId = base::unixtime::mtproto_msg_id();
@ -839,7 +860,7 @@ void SessionPrivate::tryToSend() {
// prepare sent container
auto sentIdsWrap = SentContainer();
sentIdsWrap.sent = crl::now();
sentIdsWrap.messages.reserve(toSendCount);
sentIdsWrap.messages.reserve(totalSending);
if (bindDcKeyRequest) {
_bindMsgId = placeToContainer(
@ -859,7 +880,7 @@ void SessionPrivate::tryToSend() {
needAnyResponse = true;
}
for (auto &[requestId, request] : toSend) {
for (auto &[requestId, request] : sendingRange) {
const auto msgId = prepareToSend(
request,
bigMsgId,
@ -904,7 +925,7 @@ void SessionPrivate::tryToSend() {
memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime));
}
}
toSend.clear();
toSend.erase(sendingFrom, sendingTill);
if (stateRequest) {
const auto msgId = placeToContainer(
@ -951,6 +972,11 @@ void SessionPrivate::tryToSend() {
}
}
sendSecureRequest(std::move(toSendRequest), needAnyResponse);
if (someSkipped) {
InvokeQueued(this, [=] {
tryToSend();
});
}
}
void SessionPrivate::retryByTimer() {

View File

@ -26,8 +26,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
namespace Storage {
namespace {
// max 512kb uploaded at the same time in each session
constexpr auto kMaxUploadPerSession = 512 * 1024;
// max 1mb uploaded at the same time in each session
constexpr auto kMaxUploadPerSession = 1024 * 1024;
constexpr auto kDocumentMaxPartsCountDefault = 4000;
@ -47,7 +47,7 @@ constexpr auto kDocumentUploadPartSize3 = 256 * 1024;
constexpr auto kDocumentUploadPartSize4 = 512 * 1024;
// One part each half second, if not uploaded faster.
constexpr auto kUploadRequestInterval = crl::time(200);
constexpr auto kUploadRequestInterval = crl::time(250);
// How much time without upload causes additional session kill.
constexpr auto kKillSessionTimeout = 15 * crl::time(1000);
@ -59,6 +59,10 @@ constexpr auto kMaxSessionsCount = 8;
constexpr auto kFastRequestThreshold = 1 * crl::time(1000);
constexpr auto kSlowRequestThreshold = 8 * crl::time(1000);
// Request is 'fast' if it was done in less than 1s and
// (it-s size + queued before size) >= 512kb.
constexpr auto kAcceptAsFastIfTotalAtLeast = 512 * 1024;
[[nodiscard]] const char *ThumbnailFormat(const QString &mime) {
return Core::IsMimeSticker(mime) ? "WEBP" : "JPG";
}
@ -97,6 +101,7 @@ struct Uploader::Request {
FullMsgId itemId;
crl::time sent = 0;
QByteArray bytes;
int queued = 0;
ushort part = 0;
uchar dcIndex = 0;
bool docPart = false;
@ -467,7 +472,9 @@ auto Uploader::sendPart(not_null<Entry*> entry, uchar dcIndex)
template <typename Prepared>
void Uploader::sendPreparedRequest(Prepared &&prepared, Request &&request) {
_sentPerDcIndex[request.dcIndex] += int(request.bytes.size());
auto &sentInSession = _sentPerDcIndex[request.dcIndex];
const auto queued = sentInSession;
sentInSession += int(request.bytes.size());
const auto requestId = _api->request(
std::move(prepared)
@ -478,6 +485,7 @@ void Uploader::sendPreparedRequest(Prepared &&prepared, Request &&request) {
}).toDC(MTP::uploadDcId(request.dcIndex)).send();
request.sent = crl::now();
request.queued = queued;
_requests.emplace(requestId, std::move(request));
}
@ -515,7 +523,7 @@ auto Uploader::sendDocPart(not_null<Entry*> entry, uchar dcIndex)
const auto itemId = entry->itemId;
const auto alreadySent = _sentPerDcIndex[dcIndex];
const auto willProbablyBeSent = entry->docPartSize;
if (alreadySent + willProbablyBeSent >= kMaxUploadPerSession) {
if (alreadySent + willProbablyBeSent > kMaxUploadPerSession) {
return SendResult::DcIndexFull;
}
@ -614,9 +622,13 @@ void Uploader::maybeSend() {
}
// If this entry failed, we try the next one.
}
usedDcIndices.emplace(dcIndex);
if (_sentPerDcIndex[dcIndex] >= kAcceptAsFastIfTotalAtLeast) {
usedDcIndices.emplace(dcIndex);
}
}
if (!usedDcIndices.empty()) {
if (usedDcIndices.empty()) {
_nextTimer.cancel();
} else {
_nextTimer.callOnce(kUploadRequestInterval);
}
}
@ -718,7 +730,8 @@ void Uploader::partLoaded(const MTPBool &result, mtpRequestId requestId) {
} else {
DEBUG_LOG(("Uploader: Slow-ish request, clear fast records."));
}
} else if (request.sent > _latestDcIndexAdded) {
} else if (request.sent > _latestDcIndexAdded
&& (request.queued + bytes >= kAcceptAsFastIfTotalAtLeast)) {
if (_dcIndicesWithFastRequests.emplace(request.dcIndex).second) {
DEBUG_LOG(("Uploader: Mark %1 of %2 as fast."
).arg(request.dcIndex