diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index ff312d44db..9c206ff270 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -39,6 +39,7 @@ constexpr auto kPingSendAfterForce = 45 * crl::time(1000); constexpr auto kTemporaryExpiresIn = TimeId(10); constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30); constexpr auto kTestModeDcIdShift = 10000; +constexpr auto kCheckSentRequestsEach = 1 * crl::time(1000); // If we can't connect for this time we will ask _instance to update config. constexpr auto kRequestConfigTimeout = 8 * crl::time(1000); @@ -46,9 +47,19 @@ constexpr auto kRequestConfigTimeout = 8 * crl::time(1000); // Don't try to handle messages larger than this size. constexpr auto kMaxMessageLength = 16 * 1024 * 1024; +// How much time passed from send till we resend request or check its state. +constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000); + +// How much time to wait for some more requests, +// when resending request or checking its state. +constexpr auto kSendStateRequestWaiting = crl::time(1000); + +// Container lives 10 minutes in haveSent map. +constexpr auto kContainerLives = TimeId(600); + using namespace details; -QString LogIdsVector(const QVector &ids) { +[[nodiscard]] QString LogIdsVector(const QVector &ids) { if (!ids.size()) return "[]"; auto idsStr = QString("[%1").arg(ids.cbegin()->v); for (const auto &id : ids) { @@ -57,6 +68,15 @@ QString LogIdsVector(const QVector &ids) { return idsStr + "]"; } +[[nodiscard]] QString LogIds(const QVector &ids) { + if (!ids.size()) return "[]"; + auto idsStr = QString("[%1").arg(*ids.cbegin()); + for (const auto id : ids) { + idsStr += QString(", %2").arg(id); + } + return idsStr + "]"; +} + void wrapInvokeAfter(SecureRequest &to, const SecureRequest &from, const RequestMap &haveSent, int32 skipBeforeRequest = 0) { const auto afterId = *(mtpMsgId*)(from->after->data() + 4); const auto i = afterId ? haveSent.constFind(afterId) : haveSent.cend(); @@ -200,6 +220,47 @@ int16 ConnectionPrivate::getProtocolDcId() const { : testedDcId; } +void ConnectionPrivate::checkSentRequests() { + QVector removingIds; // remove very old (10 minutes) containers and resend requests + auto requesting = false; + { + QReadLocker locker(_sessionData->haveSentMutex()); + auto &haveSent = _sessionData->haveSentMap(); + const auto haveSentCount = haveSent.size(); + auto ms = crl::now(); + for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) { + auto &req = i.value(); + if (req->msDate > 0) { + if (req->msDate + kCheckSentRequestTimeout < ms) { + // Need to check state. + req->msDate = ms; + if (_stateRequestData.emplace(i.key()).second) { + requesting = true; + } + } + } else if (base::unixtime::now() + > int32(i.key() >> 32) + kContainerLives) { + removingIds.reserve(haveSentCount); + removingIds.push_back(i.key()); + } + } + } + if (requesting) { + _sessionData->queueSendAnything(kSendStateRequestWaiting); + } + if (!removingIds.isEmpty()) { + QWriteLocker locker(_sessionData->haveSentMutex()); + auto &haveSent = _sessionData->haveSentMap(); + for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { + auto j = haveSent.find(removingIds[i]); + if (j != haveSent.cend()) { + Assert(!j.value()->requestId); + haveSent.erase(j); + } + } + } +} + void ConnectionPrivate::destroyAllConnections() { clearUnboundKeyCreator(); _waitForBetterTimer.cancel(); @@ -228,12 +289,16 @@ ConnectionPrivate::ConnectionPrivate( , _waitForReceived(kMinReceiveTimeout) , _waitForConnected(kMinConnectedTimeout) , _pingSender(thread, [=] { sendPingByTimer(); }) +, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); }) , _sessionData(std::move(data)) { Expects(_shiftedDcId != 0); moveToThread(thread); - connect(thread, &QThread::started, this, [=] { connectToServer(); }); + connect(thread, &QThread::started, this, [=] { + _checkSentRequestsTimer.callEach(kCheckSentRequestsEach); + connectToServer(); + }); connect(thread, &QThread::finished, this, [=] { finishAndDestroy(); }); connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection); @@ -315,19 +380,42 @@ void ConnectionPrivate::resetSession() { _needSessionReset = false; DEBUG_LOG(("MTP Info: creating new session in resetSession.")); - _sessionData->changeSessionId(); - - // #TODO move to sessionData, clear on changeSessionIdLocked. - _ackRequestData.clear(); - _resendRequestData.clear(); - { - QWriteLocker locker5(_sessionData->stateRequestMutex()); - _sessionData->stateRequestMap().clear(); - } + changeSessionId(); _sessionData->queueResetDone(); } +void ConnectionPrivate::changeSessionId() { + auto sessionId = _sessionId; + do { + sessionId = openssl::RandomValue(); + } while (_sessionId == sessionId); + + DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId)); + + _sessionId = sessionId; + _messagesCounter = 0; + _sessionMarkedAsStarted = false; + _ackRequestData.clear(); + _resendRequestData.clear(); + _stateRequestData.clear(); + _receivedMessageIds.clear(); +} + +uint32 ConnectionPrivate::nextRequestSeqNumber(bool needAck) { + const auto result = _messagesCounter; + _messagesCounter += (needAck ? 1 : 0); + return result * 2 + (needAck ? 1 : 0); +} + +bool ConnectionPrivate::markSessionAsStarted() { + if (_sessionMarkedAsStarted) { + return false; + } + _sessionMarkedAsStarted = true; + return true; +} + mtpMsgId ConnectionPrivate::prepareToSend( SecureRequest &request, mtpMsgId currentLastId, @@ -349,7 +437,7 @@ mtpMsgId ConnectionPrivate::prepareToSend( : msgId; } request.setMsgId(currentLastId); - request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck())); + request.setSeqNo(nextRequestSeqNumber(request.needAck())); if (request->requestId) { MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId)); } @@ -412,7 +500,7 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) } request.setMsgId(newId); - request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck())); + request.setSeqNo(nextRequestSeqNumber(request.needAck())); return newId; } @@ -455,8 +543,7 @@ void ConnectionPrivate::tryToSend() { && _pingSendAt <= crl::now()) { _pingIdToSend = openssl::RandomValue(); } - const auto forceNewMsgId = sendAll - && _sessionData->markSessionAsStarted(); + const auto forceNewMsgId = sendAll && markSessionAsStarted(); if (forceNewMsgId && _keyCreator) { _keyCreator->restartBinder(); } @@ -504,22 +591,14 @@ void ConnectionPrivate::tryToSend() { MTP_msg_resend_req(MTP_vector( base::take(_resendRequestData))))); } - - auto stateReq = QVector(); - { - QWriteLocker locker(_sessionData->stateRequestMutex()); - auto &ids = _sessionData->stateRequestMap(); - if (!ids.isEmpty()) { - stateReq.reserve(ids.size()); - for (auto i = ids.cbegin(), e = ids.cend(); i != e; ++i) { - stateReq.push_back(MTP_long(i.key())); - } + if (!_stateRequestData.empty()) { + auto ids = QVector(); + ids.reserve(_stateRequestData.size()); + for (const auto id : base::take(_stateRequestData)) { + ids.push_back(MTP_long(id)); } - ids.clear(); - } - if (!stateReq.isEmpty()) { stateRequest = SecureRequest::Serialize(MTPMsgsStateReq( - MTP_msgs_state_req(MTP_vector(stateReq)))); + MTP_msgs_state_req(MTP_vector(ids)))); // Add to haveSent / wereAcked maps, but don't add to requestMap. stateRequest->requestId = GetNextRequestId(); } @@ -530,14 +609,13 @@ void ConnectionPrivate::tryToSend() { if (_keyCreator && _keyCreator->bindReadyToRequest()) { bindDcKeyRequest = _keyCreator->prepareBindRequest( _temporaryKey, - _sessionData->getSessionId()); + _sessionId); // This is a special request with msgId used inside the message // body, so it is prepared already with a msgId and we place // seqNo for it manually here. bindDcKeyRequest.setSeqNo( - _sessionData->nextRequestSeqNumber( - bindDcKeyRequest.needAck())); + nextRequestSeqNumber(bindDcKeyRequest.needAck())); //} else if (!_keyChecker) { // if (const auto &keyForCheck = _sessionData->getKeyForCheck()) { // _keyChecker = std::make_unique( @@ -546,14 +624,13 @@ void ConnectionPrivate::tryToSend() { // keyForCheck); // bindDcKeyRequest = _keyChecker->prepareRequest( // _temporaryKey, - // _sessionData->getSessionId()); + // _sessionId); // // This is a special request with msgId used inside the message // // body, so it is prepared already with a msgId and we place // // seqNo for it manually here. // bindDcKeyRequest.setSeqNo( - // _sessionData->nextRequestSeqNumber( - // bindDcKeyRequest.needAck())); + // nextRequestSeqNumber(bindDcKeyRequest.needAck())); // } } } @@ -1229,10 +1306,9 @@ void ConnectionPrivate::handleReceived() { TCP_LOG(("TCP Info: decrypted message %1,%2,%3 is %4 len").arg(msgId).arg(seqNo).arg(Logs::b(needAck)).arg(fullDataLength)); - uint64 serverSession = _sessionData->getSessionId(); - if (session != serverSession) { + if (session != _sessionId) { LOG(("MTP Error: bad server session received")); - TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(serverSession)); + TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(_sessionId)); return restart(); } @@ -1247,23 +1323,22 @@ void ConnectionPrivate::handleReceived() { } bool badTime = false; - uint64 mySalt = _sessionData->getSalt(); if (serverTime > clientTime + 60 || serverTime + 300 < clientTime) { DEBUG_LOG(("MTP Info: bad server time from msg_id: %1, my time: %2").arg(serverTime).arg(clientTime)); badTime = true; } bool wasConnected = (getState() == ConnectedState); - if (serverSalt != mySalt) { + if (serverSalt != _sessionSalt) { if (!badTime) { - DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt)); - _sessionData->setSalt(serverSalt); + DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(_sessionSalt)); + _sessionSalt = serverSalt; if (setState(ConnectedState, ConnectingState)) { _sessionData->resendAll(); } } else { - DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt)); + DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt)); } } else { serverSalt = 0; // dont pass to handle method, so not to lock in setSalt() @@ -1277,18 +1352,10 @@ void ConnectionPrivate::handleReceived() { auto sfrom = decryptedInts + 4U; // msg_id + seq_no + length + message MTP_LOG(_shiftedDcId, ("Recv: ") + details::DumpToText(sfrom, end) + QString(" (keyId:%1)").arg(_temporaryKey->keyId())); - bool needToHandle = false; - { - QWriteLocker lock(_sessionData->receivedIdsMutex()); - needToHandle = _sessionData->receivedIdsSet().registerMsgId(msgId, needAck); - } - if (needToHandle) { + if (_receivedMessageIds.registerMsgId(msgId, needAck)) { res = handleOneReceived(from, end, msgId, serverTime, serverSalt, badTime); } - { - QWriteLocker lock(_sessionData->receivedIdsMutex()); - _sessionData->receivedIdsSet().shrink(); - } + _receivedMessageIds.shrink(); // send acks if (const auto toAckSize = _ackRequestData.size()) { @@ -1389,13 +1456,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return HandleResult::ParseError; } - bool needToHandle = false; - { - QWriteLocker lock(_sessionData->receivedIdsMutex()); - needToHandle = _sessionData->receivedIdsSet().registerMsgId(inMsgId.v, needAck); - } auto res = HandleResult::Success; // if no need to handle, then succeed - if (needToHandle) { + if (_receivedMessageIds.registerMsgId(inMsgId.v, needAck)) { res = handleOneReceived(from, otherEnd, inMsgId.v, serverTime, serverSalt, badTime); badTime = false; } @@ -1489,7 +1551,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } if (needResend) { // bad msg_id or bad container - if (serverSalt) _sessionData->setSalt(serverSalt); + if (serverSalt) { + _sessionSalt = serverSalt; + } base::unixtime::update(serverTime, true); DEBUG_LOG(("Message Info: unixtime updated, now %1, resending in container...").arg(serverTime)); @@ -1497,7 +1561,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr resend(resendId, 0, true); } else { // must create new session, because msg_id and msg_seqno are inconsistent if (badTime) { - if (serverSalt) _sessionData->setSalt(serverSalt); + if (serverSalt) { + _sessionSalt = serverSalt; + } base::unixtime::update(serverTime, true); badTime = false; } @@ -1543,8 +1609,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return (badTime ? HandleResult::Ignored : HandleResult::Success); } - uint64 serverSalt = data.vnew_server_salt().v; - _sessionData->setSalt(serverSalt); + _sessionSalt = data.vnew_server_salt().v; base::unixtime::update(serverTime); if (setState(ConnectedState, ConnectingState)) { @@ -1573,10 +1638,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr QByteArray info(idsCount, Qt::Uninitialized); { - QReadLocker lock(_sessionData->receivedIdsMutex()); - auto &receivedIds = _sessionData->receivedIdsSet(); - auto minRecv = receivedIds.min(); - auto maxRecv = receivedIds.max(); + const auto minRecv = _receivedMessageIds.min(); + const auto maxRecv = _receivedMessageIds.max(); QReadLocker locker(_sessionData->wereAckedMutex()); const auto &wereAcked = _sessionData->wereAckedMap(); @@ -1590,15 +1653,15 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } else if (reqMsgId > maxRecv) { state |= 0x03; } else { - auto msgIdState = receivedIds.lookup(reqMsgId); - if (msgIdState == ReceivedMsgIds::State::NotFound) { + auto msgIdState = _receivedMessageIds.lookup(reqMsgId); + if (msgIdState == ReceivedIdsManager::State::NotFound) { state |= 0x02; } else { state |= 0x04; if (wereAcked.constFind(reqMsgId) != wereAckedEnd) { state |= 0x80; // we know, that server knows, that we received request } - if (msgIdState == ReceivedMsgIds::State::NeedsAck) { // need ack, so we sent ack + if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack state |= 0x08; } else { state |= 0x10; @@ -1632,7 +1695,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return (badTime ? HandleResult::Ignored : HandleResult::Success); } if (badTime) { - if (serverSalt) _sessionData->setSalt(serverSalt); // requestsFixTimeSalt with no lookup + if (serverSalt) { + _sessionSalt = serverSalt; // requestsFixTimeSalt with no lookup + } base::unixtime::update(serverTime, true); DEBUG_LOG(("Message Info: unixtime updated from mtpc_msgs_state_info, now %1").arg(serverTime)); @@ -1710,13 +1775,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } requestsAcked(ids); - bool received = false; - MTPlong resMsgId = data.vanswer_msg_id(); - { - QReadLocker lock(_sessionData->receivedIdsMutex()); - received = (_sessionData->receivedIdsSet().lookup(resMsgId.v) != ReceivedMsgIds::State::NotFound); - } - if (received) { + const auto resMsgId = data.vanswer_msg_id(); + if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) { _ackRequestData.push_back(resMsgId); } else { DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v)); @@ -1737,13 +1797,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr DEBUG_LOG(("Message Info: msg new detailed info, answerId %2, status %3, bytes %4").arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v)); - bool received = false; - MTPlong resMsgId = data.vanswer_msg_id(); - { - QReadLocker lock(_sessionData->receivedIdsMutex()); - received = (_sessionData->receivedIdsSet().lookup(resMsgId.v) != ReceivedMsgIds::State::NotFound); - } - if (received) { + const auto resMsgId = data.vanswer_msg_id(); + if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) { _ackRequestData.push_back(resMsgId); } else { DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v)); @@ -1847,7 +1902,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } DEBUG_LOG(("Message Info: new server session created, unique_id %1, first_msg_id %2, server_salt %3").arg(data.vunique_id().v).arg(data.vfirst_msg_id().v).arg(data.vserver_salt().v)); - _sessionData->setSalt(data.vserver_salt().v); + _sessionSalt = data.vserver_salt().v; mtpMsgId firstMsgId = data.vfirst_msg_id().v; QVector toResend; @@ -1993,7 +2048,9 @@ bool ConnectionPrivate::requestsFixTimeSalt(const QVector &ids, int32 s for (uint32 i = 0; i < idsCount; ++i) { if (wasSent(ids[i].v)) {// found such msg_id in recent acked requests or in recent sent requests - if (serverSalt) _sessionData->setSalt(serverSalt); + if (serverSalt) { + _sessionSalt = serverSalt; + } base::unixtime::update(serverTime, true); return true; } @@ -2264,6 +2321,16 @@ void ConnectionPrivate::updateAuthKey() { applyAuthKey(_sessionData->getTemporaryKey()); } +void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) { + if (_keyId == newKeyId) { + return; + } + _keyId = newKeyId; + + DEBUG_LOG(("MTP Info: auth key id set to id %1").arg(newKeyId)); + changeSessionId(); +} + void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) { _temporaryKey = std::move(temporaryKey); const auto newKeyId = _temporaryKey ? _temporaryKey->keyId() : 0; @@ -2271,15 +2338,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) { if (_keyId == newKeyId) { return; } - _keyId = 0; - if (_sessionData->setCurrentKeyId(_keyId)) { - _ackRequestData.clear(); // #TODO move to sessionData. - _resendRequestData.clear(); - { - QWriteLocker locker5(_sessionData->stateRequestMutex()); - _sessionData->stateRequestMap().clear(); - } - } + setCurrentKeyId(0); DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..." ).arg(_shiftedDcId)); if (_connection) { @@ -2290,15 +2349,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) { if (!_connection) { return; } - if (newKeyId && _sessionData->setCurrentKeyId(newKeyId)) { - _ackRequestData.clear(); // #TODO move to sessionData. - _resendRequestData.clear(); - { - QWriteLocker locker5(_sessionData->stateRequestMutex()); - _sessionData->stateRequestMap().clear(); - } - } - _keyId = newKeyId; + setCurrentKeyId(newKeyId); Assert(!_connection->sentEncryptedWithKeyId()); DEBUG_LOG(("AuthKey Info: Connection update key from Session, dc %1 result: %2").arg(_shiftedDcId).arg(Logs::mb(&_keyId, sizeof(_keyId)).str())); @@ -2355,7 +2406,7 @@ void ConnectionPrivate::tryAcquireKeyCreation() { ).arg(result->temporaryServerSalt ).arg(result->persistentServerSalt)); - _sessionData->setSalt(result->temporaryServerSalt); + _sessionSalt = result->temporaryServerSalt; if (result->persistentKey) { _sessionData->clearForNewKey(_instance); } @@ -2395,7 +2446,7 @@ void ConnectionPrivate::authKeyChecked() { handleReceived(); }); - if (_sessionData->getSalt() && setState(ConnectedState)) { + if (_sessionSalt && setState(ConnectedState)) { _sessionData->resendAll(); } // else receive salt in bad_server_salt first, then try to send all the requests @@ -2469,11 +2520,8 @@ bool ConnectionPrivate::sendSecureRequest( return false; } - auto session = _sessionData->getSessionId(); - auto salt = _sessionData->getSalt(); - - memcpy(request->data() + 0, &salt, 2 * sizeof(mtpPrime)); - memcpy(request->data() + 2, &session, 2 * sizeof(mtpPrime)); + memcpy(request->data() + 0, &_sessionSalt, 2 * sizeof(mtpPrime)); + memcpy(request->data() + 2, &_sessionId, 2 * sizeof(mtpPrime)); auto from = request->constData() + 4; MTP_LOG(_shiftedDcId, ("Send: ") + details::DumpToText(from, from + messageSize) + QString(" (keyId:%1)").arg(_temporaryKey->keyId())); diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index b8c0d536db..77e7debc08 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -7,6 +7,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once +#include "mtproto/details/mtproto_received_ids_manager.h" #include "mtproto/mtproto_auth_key.h" #include "mtproto/dc_options.h" #include "mtproto/connection_abstract.h" @@ -133,7 +134,9 @@ private: void confirmBestConnection(); void removeTestConnection(not_null connection); - int16 getProtocolDcId() const; + [[nodiscard]] int16 getProtocolDcId() const; + + void checkSentRequests(); mtpMsgId placeToContainer( SecureRequest &toSendRequest, @@ -187,6 +190,12 @@ private: void releaseKeyCreationOnFail(); void applyAuthKey(AuthKeyPtr &&temporaryKey); + void setCurrentKeyId(uint64 newKeyId); + void changeSessionId(); + void setSessionSalt(uint64 salt); + [[nodiscard]] bool markSessionAsStarted(); + [[nodiscard]] uint32 nextRequestSeqNumber(bool needAck); + const not_null _instance; DcType _dcType = DcType::Regular; @@ -215,21 +224,28 @@ private: crl::time _waitForConnected = 0; crl::time _firstSentAt = -1; - QVector _ackRequestData; - QVector _resendRequestData; - mtpPingId _pingId = 0; mtpPingId _pingIdToSend = 0; crl::time _pingSendAt = 0; mtpMsgId _pingMsgId = 0; base::Timer _pingSender; + base::Timer _checkSentRequestsTimer; bool _finished = false; - AuthKeyPtr _temporaryKey; - uint64 _keyId = 0; std::shared_ptr _sessionData; std::unique_ptr _connectionOptions; + AuthKeyPtr _temporaryKey; + uint64 _keyId = 0; + uint64 _sessionId = 0; + uint64 _sessionSalt = 0; + uint32 _messagesCounter = 0; + bool _sessionMarkedAsStarted = false; + + QVector _ackRequestData; + QVector _resendRequestData; + base::flat_set _stateRequestData; + details::ReceivedIdsManager _receivedMessageIds; std::unique_ptr _keyCreator; diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.cpp new file mode 100644 index 0000000000..28e6ad5185 --- /dev/null +++ b/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.cpp @@ -0,0 +1,54 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "mtproto/details/mtproto_received_ids_manager.h" + +namespace MTP::details { + +bool ReceivedIdsManager::registerMsgId(mtpMsgId msgId, bool needAck) { + const auto i = _idsNeedAck.find(msgId); + if (i == _idsNeedAck.end()) { + if (_idsNeedAck.size() < kIdsBufferSize || msgId > min()) { + _idsNeedAck.emplace(msgId, needAck); + return true; + } + MTP_LOG(-1, ("No need to handle - %1 < min = %2").arg(msgId).arg(min())); + } else { + MTP_LOG(-1, ("No need to handle - %1 already is in map").arg(msgId)); + } + return false; +} + +mtpMsgId ReceivedIdsManager::min() const { + return _idsNeedAck.empty() ? 0 : _idsNeedAck.begin()->first; +} + +mtpMsgId ReceivedIdsManager::max() const { + auto end = _idsNeedAck.end(); + return _idsNeedAck.empty() ? 0 : (--end)->first; +} + +ReceivedIdsManager::State ReceivedIdsManager::lookup(mtpMsgId msgId) const { + auto i = _idsNeedAck.find(msgId); + if (i == _idsNeedAck.end()) { + return State::NotFound; + } + return i->second ? State::NeedsAck : State::NoAckNeeded; +} + +void ReceivedIdsManager::shrink() { + auto size = _idsNeedAck.size(); + while (size-- > kIdsBufferSize) { + _idsNeedAck.erase(_idsNeedAck.begin()); + } +} + +void ReceivedIdsManager::clear() { + _idsNeedAck.clear(); +} + +} // namespace MTP::details diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.h b/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.h new file mode 100644 index 0000000000..2a8d6a57a1 --- /dev/null +++ b/Telegram/SourceFiles/mtproto/details/mtproto_received_ids_manager.h @@ -0,0 +1,38 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "base/flat_map.h" + +namespace MTP::details { + +// Received msgIds and wereAcked msgIds count stored. +inline constexpr auto kIdsBufferSize = 400; + +class ReceivedIdsManager final { +public: + enum class State { + NotFound, + NeedsAck, + NoAckNeeded, + }; + + bool registerMsgId(mtpMsgId msgId, bool needAck); + [[nodiscard]] mtpMsgId min() const; + [[nodiscard]] mtpMsgId max() const; + [[nodiscard]] State lookup(mtpMsgId msgId) const; + + void shrink(); + void clear(); + +private: + base::flat_map _idsNeedAck; + +}; + +} // namespace MTP::details diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index dc400e8b6c..3dacb252a0 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -100,6 +100,7 @@ public: void onStateChange(ShiftedDcId shiftedDcId, int32 state); void onSessionReset(ShiftedDcId shiftedDcId); + // Thread-safe. void clearCallbacksDelayed(std::vector &&ids); void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 0504cb0459..64677c4abb 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -17,28 +17,6 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace MTP { namespace internal { -namespace { - -// How much time passed from send till we resend request or check its state. -constexpr auto kCheckResendTimeout = crl::time(10000); - -// How much time to wait for some more requests, -// when resending request or checking its state. -constexpr auto kCheckResendWaiting = crl::time(1000); - -// Container lives 10 minutes in haveSent map. -constexpr auto kContainerLives = 600; - -QString LogIds(const QVector &ids) { - if (!ids.size()) return "[]"; - auto idsStr = QString("[%1").arg(*ids.cbegin()); - for (const auto id : ids) { - idsStr += QString(", %2").arg(id); - } - return idsStr + "]"; -} - -} // namespace ConnectionOptions::ConnectionOptions( const QString &systemLangCode, @@ -72,53 +50,6 @@ void SessionData::withSession(Callback &&callback) { } } -bool SessionData::setCurrentKeyId(uint64 keyId) { - QWriteLocker locker(&_lock); - if (_keyId == keyId) { - return false; - } - _keyId = keyId; - - DEBUG_LOG(("MTP Info: auth key set in SessionData, id %1").arg(keyId)); - - changeSessionIdLocked(); - return true; -} - -void SessionData::changeSessionId() { - QWriteLocker locker(&_lock); - changeSessionIdLocked(); -} - -void SessionData::changeSessionIdLocked() { - auto sessionId = _sessionId; - do { - sessionId = openssl::RandomValue(); - } while (_sessionId == sessionId); - - DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId)); - - _sessionId = sessionId; - _messagesSent = 0; - _sessionMarkedAsStarted = false; -} - -uint32 SessionData::nextRequestSeqNumber(bool needAck) { - QWriteLocker locker(&_lock); - auto result = _messagesSent; - _messagesSent += (needAck ? 1 : 0); - return result * 2 + (needAck ? 1 : 0); -} - -bool SessionData::markSessionAsStarted() { - QWriteLocker locker(&_lock); - if (_sessionMarkedAsStarted) { - return false; - } - _sessionMarkedAsStarted = true; - return true; -} - void SessionData::setKeyForCheck(const AuthKeyPtr &key) { _dcKeyForCheck = key; } @@ -173,10 +104,6 @@ void SessionData::clearForNewKey(not_null instance) { QWriteLocker locker(wereAckedMutex()); _wereAcked.clear(); } - { - QWriteLocker locker(receivedIdsMutex()); - _receivedIds.clear(); - } instance->clearCallbacksDelayed(std::move(clearCallbacks)); } @@ -291,7 +218,6 @@ Session::Session( , _ownedDc(dc ? nullptr : std::make_unique(shiftedDcId, nullptr)) , _dc(dc ? dc : _ownedDc.get()) , _data(std::make_shared(this)) -, _timeouter([=] { checkRequestsByTimer(); }) , _sender([=] { needToResumeAndSend(); }) { _timeouter.callEach(1000); refreshOptions(); @@ -459,60 +385,6 @@ bool Session::sharedDc() const { return (_ownedDc == nullptr); } -void Session::checkRequestsByTimer() { - QVector removingIds; // remove very old (10 minutes) containers and resend requests - QVector stateRequestIds; - - { - QReadLocker locker(_data->haveSentMutex()); - auto &haveSent = _data->haveSentMap(); - const auto haveSentCount = haveSent.size(); - auto ms = crl::now(); - for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) { - auto &req = i.value(); - if (req->msDate > 0) { - if (req->msDate + kCheckResendTimeout < ms) { // need to resend or check state - req->msDate = ms; - stateRequestIds.reserve(haveSentCount); - stateRequestIds.push_back(i.key()); - } - } else if (base::unixtime::now() - > int32(i.key() >> 32) + kContainerLives) { - removingIds.reserve(haveSentCount); - removingIds.push_back(i.key()); - } - } - } - - if (stateRequestIds.size()) { - DEBUG_LOG(("MTP Info: requesting state of msgs: %1").arg(LogIds(stateRequestIds))); - { - QWriteLocker locker(_data->stateRequestMutex()); - for (uint32 i = 0, l = stateRequestIds.size(); i < l; ++i) { - _data->stateRequestMap().insert(stateRequestIds[i], true); - } - } - sendAnything(kCheckResendWaiting); - } - if (!removingIds.isEmpty()) { - auto clearCallbacks = std::vector(); - { - QWriteLocker locker(_data->haveSentMutex()); - auto &haveSent = _data->haveSentMap(); - for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { - auto j = haveSent.find(removingIds[i]); - if (j != haveSent.cend()) { - if (j.value()->requestId) { - clearCallbacks.push_back(j.value()->requestId); - } - haveSent.erase(j); - } - } - } - _instance->clearCallbacksDelayed(std::move(clearCallbacks)); - } -} - void Session::connectionStateChange(int newState) { _instance->onStateChange(_shiftedDcId, newState); } diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index b3034dcc83..2508e1cc89 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -21,9 +21,6 @@ using AuthKeyPtr = std::shared_ptr; namespace internal { -// Received msgIds and wereAcked msgIds count stored. -constexpr auto kIdsBufferSize = 400; - class Dcenter; class Connection; @@ -45,60 +42,6 @@ public: }; -class ReceivedMsgIds { -public: - bool registerMsgId(mtpMsgId msgId, bool needAck) { - auto i = _idsNeedAck.constFind(msgId); - if (i == _idsNeedAck.cend()) { - if (_idsNeedAck.size() < kIdsBufferSize || msgId > min()) { - _idsNeedAck.insert(msgId, needAck); - return true; - } - MTP_LOG(-1, ("No need to handle - %1 < min = %2").arg(msgId).arg(min())); - } else { - MTP_LOG(-1, ("No need to handle - %1 already is in map").arg(msgId)); - } - return false; - } - - mtpMsgId min() const { - return _idsNeedAck.isEmpty() ? 0 : _idsNeedAck.cbegin().key(); - } - - mtpMsgId max() const { - auto end = _idsNeedAck.cend(); - return _idsNeedAck.isEmpty() ? 0 : (--end).key(); - } - - void shrink() { - auto size = _idsNeedAck.size(); - while (size-- > kIdsBufferSize) { - _idsNeedAck.erase(_idsNeedAck.begin()); - } - } - - enum class State { - NotFound, - NeedsAck, - NoAckNeeded, - }; - State lookup(mtpMsgId msgId) const { - auto i = _idsNeedAck.constFind(msgId); - if (i == _idsNeedAck.cend()) { - return State::NotFound; - } - return i.value() ? State::NeedsAck : State::NoAckNeeded; - } - - void clear() { - _idsNeedAck.clear(); - } - -private: - QMap _idsNeedAck; - -}; - using SerializedMessage = mtpBuffer; inline bool ResponseNeedsAck(const SerializedMessage &response) { @@ -140,31 +83,16 @@ public: SessionData(not_null creator) : _owner(creator) { } - bool setCurrentKeyId(uint64 keyId); - void changeSessionId(); - [[nodiscard]] uint64 getSessionId() const { - QReadLocker locker(&_lock); - return _sessionId; - } void notifyConnectionInited(const ConnectionOptions &options); void setConnectionOptions(ConnectionOptions options) { - QWriteLocker locker(&_lock); + QWriteLocker locker(&_optionsLock); _options = options; } [[nodiscard]] ConnectionOptions connectionOptions() const { - QReadLocker locker(&_lock); + QReadLocker locker(&_optionsLock); return _options; } - void setSalt(uint64 salt) { - QWriteLocker locker(&_lock); - _salt = salt; - } - [[nodiscard]] uint64 getSalt() const { - QReadLocker locker(&_lock); - return _salt; - } - [[nodiscard]] const AuthKeyPtr &getKeyForCheck() const { return _dcKeyForCheck; } @@ -182,15 +110,9 @@ public: not_null wereAckedMutex() const { return &_wereAckedLock; } - not_null receivedIdsMutex() const { - return &_receivedIdsLock; - } not_null haveReceivedMutex() const { return &_haveReceivedLock; } - not_null stateRequestMutex() const { - return &_stateRequestLock; - } PreRequestMap &toSendMap() { return _toSend; @@ -210,12 +132,6 @@ public: const RequestIdsMap &toResendMap() const { return _toResend; } - ReceivedMsgIds &receivedIdsSet() { - return _receivedIds; - } - const ReceivedMsgIds &receivedIdsSet() const { - return _receivedIds; - } RequestIdsMap &wereAckedMap() { return _wereAcked; } @@ -234,21 +150,12 @@ public: const QList &haveReceivedUpdates() const { return _receivedUpdates; } - QMap &stateRequestMap() { - return _stateRequest; - } - const QMap &stateRequestMap() const { - return _stateRequest; - } // Warning! Valid only in constructor, _owner is guaranteed != null. [[nodiscard]] not_null owner() { return _owner; } - [[nodiscard]] bool markSessionAsStarted(); - [[nodiscard]] uint32 nextRequestSeqNumber(bool needAck); - void clearForNewKey(not_null instance); // Connection -> Session interface. @@ -277,17 +184,9 @@ public: void detach(); private: - void changeSessionIdLocked(); - template void withSession(Callback &&callback); - uint64 _keyId = 0; - uint64 _sessionId = 0; - uint64 _salt = 0; - uint32 _messagesSent = 0; - bool _sessionMarkedAsStarted = false; - Session *_owner = nullptr; mutable QMutex _ownerMutex; @@ -297,22 +196,18 @@ private: PreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent RequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers RequestIdsMap _toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent - ReceivedMsgIds _receivedIds; // set of received msg_id's, for checking new msg_ids RequestIdsMap _wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack - QMap _stateRequest; // set of msg_id's, whose state should be requested QMap _receivedResponses; // map of request_id -> response that should be processed in the main thread QList _receivedUpdates; // list of updates that should be processed in the main thread // mutexes - mutable QReadWriteLock _lock; + mutable QReadWriteLock _optionsLock; mutable QReadWriteLock _toSendLock; mutable QReadWriteLock _haveSentLock; mutable QReadWriteLock _toResendLock; - mutable QReadWriteLock _receivedIdsLock; mutable QReadWriteLock _wereAckedLock; mutable QReadWriteLock _haveReceivedLock; - mutable QReadWriteLock _stateRequestLock; }; @@ -388,7 +283,6 @@ signals: private: [[nodiscard]] bool sharedDc() const; - void checkRequestsByTimer(); void watchDcKeyChanges(); bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); diff --git a/Telegram/gyp/lib_mtproto.gyp b/Telegram/gyp/lib_mtproto.gyp index 93a02d331c..4b01766a2a 100644 --- a/Telegram/gyp/lib_mtproto.gyp +++ b/Telegram/gyp/lib_mtproto.gyp @@ -42,6 +42,8 @@ '<(src_loc)/mtproto/details/mtproto_dc_key_creator.h', '<(src_loc)/mtproto/details/mtproto_dump_to_text.cpp', '<(src_loc)/mtproto/details/mtproto_dump_to_text.h', + '<(src_loc)/mtproto/details/mtproto_received_ids_manager.cpp', + '<(src_loc)/mtproto/details/mtproto_received_ids_manager.h', '<(src_loc)/mtproto/mtproto_abstract_socket.cpp', '<(src_loc)/mtproto/mtproto_abstract_socket.h', '<(src_loc)/mtproto/mtproto_auth_key.cpp',