From efaa3ba4537bbaa7021741e85fa618f368f424cf Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 2 Dec 2019 13:32:09 +0300 Subject: [PATCH] Handle sent containers separately. --- Telegram/SourceFiles/mtproto/connection.cpp | 290 +++++++++--------- Telegram/SourceFiles/mtproto/connection.h | 9 +- .../details/mtproto_serialized_request.cpp | 6 - .../details/mtproto_serialized_request.h | 3 - 4 files changed, 154 insertions(+), 154 deletions(-) diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 1e5f17c6aa..7a800c5245 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -56,8 +56,7 @@ constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000); // when resending request or checking its state. constexpr auto kSendStateRequestWaiting = crl::time(1000); -// Container lives 10 minutes in haveSent map. -constexpr auto kContainerLives = TimeId(600); +constexpr auto kSentContainerLives = 600 * crl::time(1000); using namespace details; @@ -216,8 +215,8 @@ int16 Connection::getProtocolDcId() const { } void Connection::checkSentRequests() { - // Remove very old (10 minutes) containers and resend requests. - auto removingIds = std::vector(); + clearOldContainers(); + auto restarting = false; auto requesting = false; { @@ -229,16 +228,6 @@ void Connection::checkSentRequests() { for (const auto &[msgId, request] : haveSent) { if (request.isStateRequest()) { continue; - } else if (request.isSentContainer()) { - if (now > request->lastSentTime + kContainerLives) { - removingIds.push_back(msgId); - DEBUG_LOG(("MTP Info: Removing old container %1, " - "sent: %2, now: %3, current unixtime: %4" - ).arg(msgId - ).arg(request->lastSentTime - ).arg(now - ).arg(base::unixtime::now())); - } } else if (request->lastSentTime + checkAfter < now) { // Need to check state. request->lastSentTime = now; @@ -250,13 +239,6 @@ void Connection::checkSentRequests() { } } } - if (!removingIds.empty()) { - QWriteLocker locker(_sessionData->haveSentMutex()); - auto &haveSent = _sessionData->haveSentMap(); - for (const auto msgId : removingIds) { - haveSent.remove(msgId); - } - } if (restarting) { DEBUG_LOG(("MTP Info: " "Request state while key is not bound, restarting.")); @@ -266,6 +248,23 @@ void Connection::checkSentRequests() { } } +void Connection::clearOldContainers() { + const auto now = crl::now(); + for (auto i = _sentContainers.begin(); i != _sentContainers.end();) { + if (now > i->second.sent + kSentContainerLives) { + DEBUG_LOG(("MTP Info: Removing old container %1, " + "sent: %2, now: %3, current unixtime: %4" + ).arg(i->first + ).arg(i->second.sent + ).arg(now + ).arg(base::unixtime::now())); + i = _sentContainers.erase(i); + } else { + ++i; + } + } +} + void Connection::destroyAllConnections() { clearUnboundKeyCreator(); _waitForBetterTimer.cancel(); @@ -452,34 +451,29 @@ mtpMsgId Connection::replaceMsgId(SerializedRequest &request, mtpMsgId newId) { haveSent.erase(k); haveSent.emplace(newId, request); } - - for (const auto &[requestId, sent] : haveSent) { - if (sent.isSentContainer()) { - const auto ids = (mtpMsgId *)(sent->data() + 8); - for (uint32 i = 0, l = (sent->size() - 8) >> 1; i < l; ++i) { - if (ids[i] == oldMsgId) { - ids[i] = newId; - } + for (auto &[msgId, container] : _sentContainers) { + for (auto &innerMsgId : container.messages) { + if (innerMsgId == oldMsgId) { + innerMsgId = newId; } } } - request.setMsgId(newId); request.setSeqNo(nextRequestSeqNumber(request.needAck())); return newId; } mtpMsgId Connection::placeToContainer( + SentContainer &sentIdsWrap, SerializedRequest &toSendRequest, mtpMsgId &bigMsgId, bool forceNewMsgId, - mtpMsgId *&haveSentArr, SerializedRequest &req) { const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId); if (msgId >= bigMsgId) { bigMsgId = base::unixtime::mtproto_msg_id(); } - *(haveSentArr++) = msgId; + sentIdsWrap.messages.push_back(msgId); uint32 from = toSendRequest->size(), len = req.messageSize(); toSendRequest->resize(from + len); @@ -732,7 +726,7 @@ void Connection::tryToSend() { } } else { // send in container bool willNeedInit = false; - uint32 containerSize = 1 + 1, idsWrapSize = (toSendCount << 1); // cons + vector size, idsWrapSize - size of "request-like" wrap for msgId vector + uint32 containerSize = 1 + 1; // cons + vector size if (pingRequest) containerSize += pingRequest.messageSize(); if (ackRequest) containerSize += ackRequest.messageSize(); if (resendRequest) containerSize += resendRequest.messageSize(); @@ -767,27 +761,26 @@ void Connection::tryToSend() { QWriteLocker locker2(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - // prepare "request-like" wrap for msgId vector - auto haveSentIdsWrap = SerializedRequest::Prepare(idsWrapSize); - haveSentIdsWrap->isContainerIdsWrap = true; - haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize); - auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8); + // prepare sent container + auto sentIdsWrap = SentContainer(); + sentIdsWrap.sent = crl::now(); + sentIdsWrap.messages.reserve(toSendCount); if (bindDcKeyRequest) { _bindMsgId = placeToContainer( + sentIdsWrap, toSendRequest, bigMsgId, false, - haveSentArr, bindDcKeyRequest); needAnyResponse = true; } if (pingRequest) { _pingMsgId = placeToContainer( + sentIdsWrap, toSendRequest, bigMsgId, forceNewMsgId, - haveSentArr, pingRequest); needAnyResponse = true; } @@ -799,10 +792,10 @@ void Connection::tryToSend() { request, bigMsgId, forceNewMsgId); + sentIdsWrap.messages.push_back(msgId); if (msgId >= bigMsgId) { bigMsgId = base::unixtime::mtproto_msg_id(); } - *(haveSentArr++) = msgId; bool added = false; if (request->requestId) { if (request.needAck()) { @@ -838,22 +831,46 @@ void Connection::tryToSend() { } } if (stateRequest) { - const auto msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest); + const auto msgId = placeToContainer( + sentIdsWrap, + toSendRequest, + bigMsgId, + forceNewMsgId, + stateRequest); Assert(!haveSent.contains(msgId)); haveSent.emplace(msgId, stateRequest); } - if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest); - if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest); - if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest); + if (resendRequest) { + placeToContainer( + sentIdsWrap, + toSendRequest, + bigMsgId, + forceNewMsgId, + resendRequest); + } + if (ackRequest) { + placeToContainer( + sentIdsWrap, + toSendRequest, + bigMsgId, + forceNewMsgId, + ackRequest); + } + if (httpWaitRequest) { + placeToContainer( + sentIdsWrap, + toSendRequest, + bigMsgId, + forceNewMsgId, + httpWaitRequest); + } + toSend.clear(); const auto containerMsgId = prepareToSend( toSendRequest, bigMsgId, forceNewMsgId); - *(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId; - Assert(!haveSent.contains(containerMsgId)); - haveSent.emplace(containerMsgId, haveSentIdsWrap); - toSend.clear(); + _sentContainers.emplace(containerMsgId, std::move(sentIdsWrap)); } } sendSecureRequest(std::move(toSendRequest), needAnyResponse); @@ -1452,29 +1469,15 @@ Connection::HandleResult Connection::handleOneReceived( || (errorCode == 64); // bad container if (errorCode == 64) { // bad container! if (Logs::DebugEnabled()) { - SerializedRequest request; - { - QWriteLocker locker(_sessionData->haveSentMutex()); - auto &haveSent = _sessionData->haveSentMap(); - - const auto i = haveSent.find(resendId); - if (i == haveSent.end()) { - LOG(("Message Error: Container not found!")); - } else { - request = i->second; - } - } - if (request) { - if (request.isSentContainer()) { - QStringList lst; - const auto ids = (const mtpMsgId*)(request->constData() + 8); - for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { - lst.push_back(QString::number(ids[i])); - } - LOG(("Message Info: bad container received! messages: %1").arg(lst.join(','))); - } else { - LOG(("Message Error: bad container received, but request is not a container!")); + const auto i = _sentContainers.find(resendId); + if (i == _sentContainers.end()) { + LOG(("Message Error: Container not found!")); + } else { + auto idsList = QStringList(); + for (const auto innerMsgId : i->second.messages) { + idsList.push_back(QString::number(innerMsgId)); } + LOG(("Message Info: bad container received! messages: %1").arg(idsList.join(','))); } } } @@ -1613,7 +1616,7 @@ Connection::HandleResult Connection::handleOneReceived( } else { MTPMsgResendReq request; if (!request.read(rFrom, rEnd)) { - LOG(("Message Error: could not parse sent msgs_state_req")); + LOG(("Message Error: could not parse sent msgs_resend_req")); return HandleResult::ParseError; } handleMsgsStates(request.c_msg_resend_req().vmsg_ids().v, states, toAck); @@ -1962,64 +1965,58 @@ void Connection::requestsAcked(const QVector &ids, bool byResponse) { QWriteLocker locker2(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - for (uint32 i = 0; i < idsCount; ++i) { - mtpMsgId msgId = ids[i].v; - const auto req = haveSent.find(msgId); - if (req != haveSent.end()) { - if (req->second.isSentContainer()) { - DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v)); - uint32 inContCount = (req->second->size() - 8) / 2; - const mtpMsgId *inContId = (const mtpMsgId *)(req->second->constData() + 8); - toAckMore.reserve(toAckMore.size() + inContCount); - for (uint32 j = 0; j < inContCount; ++j) { - toAckMore.push_back(MTP_long(*(inContId++))); - } - haveSent.erase(req); - } else { - const auto requestId = req->second->requestId; - bool moveToAcked = byResponse; - if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !_instance->hasCallbacks(requestId); - } - if (moveToAcked) { - _ackedIds.emplace(msgId, requestId); - haveSent.erase(req); - } else { - DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId)); - } - } - } else { - DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId)); - const auto reqIt = _resendingIds.find(msgId); - if (reqIt != _resendingIds.end()) { - const auto reqId = reqIt->second; - bool moveToAcked = byResponse; - if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !_instance->hasCallbacks(reqId); - } - if (moveToAcked) { - QWriteLocker locker4(_sessionData->toSendMutex()); - auto &toSend = _sessionData->toSendMap(); - const auto req = toSend.find(reqId); - if (req != toSend.cend()) { - _ackedIds.emplace(msgId, req->second->requestId); - if (req->second->requestId != reqId) { - DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req->second->requestId)); - } else { - DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId)); - } - toSend.erase(req); - } else { - DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId)); - } - _resendingIds.erase(reqIt); - } else { - DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); - } - } else { - DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId)); + for (const auto &wrappedMsgId : ids) { + const auto msgId = wrappedMsgId.v; + if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) { + DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(msgId)); + const auto &list = i->second.messages; + toAckMore.reserve(toAckMore.size() + list.size()); + for (const auto msgId : list) { + toAckMore.push_back(MTP_long(msgId)); } + _sentContainers.erase(i); + continue; } + if (const auto i = haveSent.find(msgId); i != end(haveSent)) { + const auto requestId = i->second->requestId; + + if (!byResponse && _instance->hasCallbacks(requestId)) { + DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId)); + continue; + } + haveSent.erase(i); + + _ackedIds.emplace(msgId, requestId); + continue; + } + DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId)); + if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) { + const auto requestId = i->second; + + if (!byResponse && _instance->hasCallbacks(requestId)) { + DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId)); + continue; + } + _resendingIds.erase(i); + + QWriteLocker locker4(_sessionData->toSendMutex()); + auto &toSend = _sessionData->toSendMap(); + const auto j = toSend.find(requestId); + if (j == end(toSend)) { + DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId).arg(requestId)); + continue; + } + if (j->second->requestId != requestId) { + DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(requestId).arg(j->second->requestId)); + } else { + DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(requestId)); + } + toSend.erase(j); + + _ackedIds.emplace(msgId, j->second->requestId); + continue; + } + DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId)); } } @@ -2100,6 +2097,16 @@ void Connection::resend( } }); + if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) { + DEBUG_LOG(("Message Info: resending container, msgId %1").arg(msgId)); + const auto ids = std::move(i->second.messages); + _sentContainers.erase(i); + + for (const auto innerMsgId : ids) { + resend(innerMsgId, -1, true); + } + return; + } auto lock = QWriteLocker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); auto i = haveSent.find(msgId); @@ -2110,14 +2117,7 @@ void Connection::resend( haveSent.erase(i); lock.unlock(); - // For container just resend all messages we can. - if (request.isSentContainer()) { - DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId)); - const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8); - for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { - resend(ids[i], -1, true); - } - } else if (!request.isStateRequest()) { + if (!request.isStateRequest()) { request->lastSentTime = crl::now(); request->forceSendInContainer = forceContainer; _resendingIds.emplace(msgId, request->requestId); @@ -2135,7 +2135,7 @@ void Connection::resendAll() { const auto &haveSent = _sessionData->haveSentMap(); toResend.reserve(haveSent.size()); for (const auto &[msgId, request] : haveSent) { - if (!request.isSentContainer() && !request.isStateRequest()) { + if (!request.isStateRequest()) { toResend.push_back(msgId); } } @@ -2549,6 +2549,16 @@ mtpRequestId Connection::wasSent(mtpMsgId msgId) const { if (msgId == _pingMsgId || msgId == _bindMsgId) { return mtpRequestId(0xFFFFFFFF); } + if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) { + return i->second; + } + if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) { + return i->second; + } + if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) { + return mtpRequestId(0xFFFFFFFF); + } + { QReadLocker locker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); @@ -2559,12 +2569,6 @@ mtpRequestId Connection::wasSent(mtpMsgId msgId) const { : mtpRequestId(0xFFFFFFFF); } } - if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) { - return i->second; - } - if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) { - return i->second; - } return 0; } diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index bd2384b529..036a90cfd4 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -61,7 +61,10 @@ private: ConnectionPointer data; int priority = 0; }; - + struct SentContainer { + crl::time sent = 0; + std::vector messages; + }; enum class HandleResult { Success, Ignored, @@ -100,12 +103,13 @@ private: [[nodiscard]] int16 getProtocolDcId() const; void checkSentRequests(); + void clearOldContainers(); mtpMsgId placeToContainer( + SentContainer &sentIdsWrap, details::SerializedRequest &toSendRequest, mtpMsgId &bigMsgId, bool forceNewMsgId, - mtpMsgId *&haveSentArr, details::SerializedRequest &req); mtpMsgId prepareToSend( details::SerializedRequest &request, @@ -216,6 +220,7 @@ private: details::ReceivedIdsManager _receivedMessageIds; base::flat_map _resendingIds; base::flat_map _ackedIds; + base::flat_map _sentContainers; std::unique_ptr _keyCreator; mtpMsgId _bindMsgId = 0; diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp index 032957d402..437b79c9ec 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.cpp @@ -124,12 +124,6 @@ uint32 SerializedRequest::messageSize() const { return kMessageIdInts + kSeqNoInts + kMessageLengthInts + ints; } -bool SerializedRequest::isSentContainer() const { - Expects(_data != nullptr); - - return _data->isContainerIdsWrap; -} - bool SerializedRequest::isStateRequest() const { Expects(_data != nullptr); Expects(_data->size() > kMessageBodyPosition); diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h index 08e13380e1..5da84f4b30 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_serialized_request.h @@ -67,8 +67,6 @@ public: void addPadding(bool extended, bool old); [[nodiscard]] uint32 messageSize() const; - // "request-like" wrap for msgIds vector - [[nodiscard]] bool isSentContainer() const; [[nodiscard]] bool isStateRequest() const; [[nodiscard]] bool needAck() const; @@ -94,7 +92,6 @@ public: mtpRequestId requestId = 0; bool needsLayer = false; bool forceSendInContainer = false; - bool isContainerIdsWrap = false; };