diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 05cc6c457d..1e5f17c6aa 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -218,42 +218,52 @@ int16 Connection::getProtocolDcId() const { void Connection::checkSentRequests() { // Remove very old (10 minutes) containers and resend requests. auto removingIds = std::vector(); + auto restarting = false; auto requesting = false; { QReadLocker locker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); const auto haveSentCount = haveSent.size(); - auto now = crl::now(); + const auto now = crl::now(); + const auto checkAfter = kCheckSentRequestTimeout; for (const auto &[msgId, request] : haveSent) { if (request.isStateRequest()) { continue; } else if (request.isSentContainer()) { - if (base::unixtime::now() - > int32(msgId >> 32) + kContainerLives) { + if (now > request->lastSentTime + kContainerLives) { removingIds.push_back(msgId); + DEBUG_LOG(("MTP Info: Removing old container %1, " + "sent: %2, now: %3, current unixtime: %4" + ).arg(msgId + ).arg(request->lastSentTime + ).arg(now + ).arg(base::unixtime::now())); } - } else if (request->lastSentTime + kCheckSentRequestTimeout - < now) { + } else if (request->lastSentTime + checkAfter < now) { // Need to check state. request->lastSentTime = now; - if (_stateRequestData.emplace(msgId).second) { + if (_bindMsgId) { + restarting = true; + } else if (_stateRequestData.emplace(msgId).second) { requesting = true; } } } } - if (requesting) { - _sessionData->queueSendAnything(kSendStateRequestWaiting); - } if (!removingIds.empty()) { QWriteLocker locker(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); for (const auto msgId : removingIds) { - if (const auto removed = haveSent.take(msgId)) { - Assert(!(*removed)->requestId); - } + haveSent.remove(msgId); } } + if (restarting) { + DEBUG_LOG(("MTP Info: " + "Request state while key is not bound, restarting.")); + restart(); + } else if (requesting) { + _sessionData->queueSendAnything(kSendStateRequestWaiting); + } } void Connection::destroyAllConnections() { @@ -479,7 +489,12 @@ mtpMsgId Connection::placeToContainer( } void Connection::tryToSend() { - if (!_connection || !_keyId) { + DEBUG_LOG(("MTP Info: tryToSend for dc %1.").arg(_shiftedDcId)); + if (!_connection) { + DEBUG_LOG(("MTP Info: not yet connected in dc %1.").arg(_shiftedDcId)); + return; + } else if (!_keyId) { + DEBUG_LOG(("MTP Info: not yet with auth key in dc %1.").arg(_shiftedDcId)); return; } @@ -1051,6 +1066,7 @@ void Connection::sendPingByTimer() { } void Connection::sendPingForce() { + DEBUG_LOG(("MTP Info: send ping force for dcWithShift %1.").arg(_shiftedDcId)); if (!_pingId) { _pingSendAt = 0; DEBUG_LOG(("Will send ping!")); @@ -1496,14 +1512,20 @@ Connection::HandleResult Connection::handleOneReceived( const auto requestId = wasSent(resendId); if (requestId) { LOG(("Message Error: " - "bad message notification received, " - "msgId %1, error_code %2, fatal: clearing callbacks" + "fatal bad message notification received, " + "msgId %1, error_code %2, requestId: %3" ).arg(badMsgId ).arg(errorCode - )); - _instance->clearCallbacksDelayed({ 1, RPCCallbackClear( - requestId, - -errorCode) }); + ).arg(requestId)); + auto response = mtpBuffer(); + MTPRpcError(MTP_rpc_error( + MTP_int(500), + MTP_string("PROTOCOL_ERROR") + )).write(response); + + // Save rpc_error for processing in the main thread. + QWriteLocker locker(_sessionData->haveReceivedMutex()); + _sessionData->haveReceivedResponses().emplace(requestId, response); } else { DEBUG_LOG(("Message Error: " "such message was not sent recently %1").arg(badMsgId)); @@ -2006,18 +2028,10 @@ void Connection::requestsAcked(const QVector &ids, bool byResponse) { DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize)); clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize); while (ackedCount-- > kIdsBufferSize) { - auto i = _ackedIds.begin(); - clearedBecauseTooOld.push_back(RPCCallbackClear( - i->second, - RPCError::TimeoutError)); - _ackedIds.erase(i); + _ackedIds.erase(_ackedIds.begin()); } } - if (!clearedBecauseTooOld.empty()) { - _instance->clearCallbacksDelayed(std::move(clearedBecauseTooOld)); - } - if (toAckMore.size()) { requestsAcked(toAckMore); } @@ -2260,7 +2274,7 @@ void Connection::applyAuthKey(AuthKeyPtr &&encryptionKey) { return; } setCurrentKeyId(0); - DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..." + DEBUG_LOG(("MTP Info: auth_key id for dc %1 changed, restarting..." ).arg(_shiftedDcId)); if (_connection) { restart(); diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index a24caabee4..2af0788f07 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -122,7 +122,6 @@ public: const SerializedRequest &request, RPCResponseHandler &&callbacks); SerializedRequest getRequest(mtpRequestId requestId); - void clearCallbacksDelayed(std::vector &&ids); void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); bool hasCallbacks(mtpRequestId requestId); void globalCallback(const mtpPrime *from, const mtpPrime *end); @@ -189,12 +188,6 @@ private: std::optional changeRequestByDc( mtpRequestId requestId, DcId newdc); - // RPCError::NoError means do not toggle onError callback. - void clearCallbacks( - mtpRequestId requestId, - int32 errorCode = RPCError::NoError); - void clearCallbacks(const std::vector &ids); - void checkDelayedRequests(); const not_null _instance; @@ -562,7 +555,9 @@ void Instance::Private::cancel(mtpRequestId requestId) { const auto session = getSession(qAbs(*shiftedDcId)); session->cancel(requestId, msgId); } - clearCallbacks(requestId); + + QMutexLocker locker(&_parserMapLock); + _parserMap.erase(requestId); } // result < 0 means waiting for such count of ms. @@ -993,76 +988,6 @@ SerializedRequest Instance::Private::getRequest(mtpRequestId requestId) { } -void Instance::Private::clearCallbacks(mtpRequestId requestId, int32 errorCode) { - RPCResponseHandler h; - bool found = false; - { - QMutexLocker locker(&_parserMapLock); - auto it = _parserMap.find(requestId); - if (it != _parserMap.end()) { - h = it->second; - found = true; - - _parserMap.erase(it); - } - } - if (errorCode && found) { - LOG(("API Error: callbacks cleared without handling! " - "Request: %1, error code: %2" - ).arg(requestId - ).arg(errorCode)); - rpcErrorOccured( - requestId, - h, - RPCError::Local( - "CLEAR_CALLBACK", - QString("did not handle request %1, error code %2" - ).arg(requestId - ).arg(errorCode))); - } -} - -void Instance::Private::clearCallbacksDelayed( - std::vector &&ids) { - if (ids.empty()) { - return; - } - - if (Logs::DebugEnabled()) { - auto idsString = QStringList(); - idsString.reserve(ids.size()); - for (auto &value : ids) { - idsString.push_back(QString::number(value.requestId)); - } - DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1" - ).arg(idsString.join(", "))); - } - - InvokeQueued(_instance, [=, list = std::move(ids)] { - clearCallbacks(list); - }); -} - -void Instance::Private::clearCallbacks( - const std::vector &ids) { - Expects(!ids.empty()); - - for (const auto &clearRequest : ids) { - if (Logs::DebugEnabled()) { - QMutexLocker locker(&_parserMapLock); - const auto hasParsers = (_parserMap.find(clearRequest.requestId) - != _parserMap.end()); - DEBUG_LOG(("RPC Info: " - "clearing delayed callback %1, error code %2, parsers: %3" - ).arg(clearRequest.requestId - ).arg(clearRequest.errorCode - ).arg(Logs::b(hasParsers))); - } - clearCallbacks(clearRequest.requestId, clearRequest.errorCode); - unregisterRequest(clearRequest.requestId); - } -} - void Instance::Private::execCallback( mtpRequestId requestId, const mtpPrime *from, @@ -1859,10 +1784,6 @@ void Instance::onSessionReset(ShiftedDcId shiftedDcId) { _private->onSessionReset(shiftedDcId); } -void Instance::clearCallbacksDelayed(std::vector &&ids) { - _private->clearCallbacksDelayed(std::move(ids)); -} - void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { _private->execCallback(requestId, from, end); } diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index 2eb80fb8dd..7f6d7f463a 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -100,9 +100,6 @@ public: void onStateChange(ShiftedDcId shiftedDcId, int32 state); void onSessionReset(ShiftedDcId shiftedDcId); - // Thread-safe. - void clearCallbacksDelayed(std::vector &&ids); - void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); bool hasCallbacks(mtpRequestId requestId); void globalCallback(const mtpPrime *from, const mtpPrime *end); diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 00294cd3db..b81859c274 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -175,10 +175,12 @@ void Session::watchDcKeyChanges() { ) | rpl::filter([=](DcId dcId) { return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId)); }) | rpl::start_with_next([=] { - DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, " - "emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId)); + DEBUG_LOG(("AuthKey Info: dcTemporaryKeyChanged in Session %1" + ).arg(_shiftedDcId)); if (const auto connection = _connection) { InvokeQueued(connection, [=] { + DEBUG_LOG(("AuthKey Info: calling Connection::updateAuthKey in Session %1" + ).arg(_shiftedDcId)); connection->updateAuthKey(); }); }