From d9fc3619c2a5dd3d680e695239fc4d801a0ad742 Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 18 Nov 2019 15:53:37 +0300 Subject: [PATCH] Remove all signals from ConnectionPrivate. --- Telegram/SourceFiles/mtproto/connection.cpp | 240 ++++++----------- Telegram/SourceFiles/mtproto/connection.h | 65 ++--- Telegram/SourceFiles/mtproto/dcenter.cpp | 19 +- Telegram/SourceFiles/mtproto/dcenter.h | 9 +- Telegram/SourceFiles/mtproto/mtp_instance.cpp | 24 +- Telegram/SourceFiles/mtproto/mtp_instance.h | 5 +- Telegram/SourceFiles/mtproto/session.cpp | 255 ++++++++++++++---- Telegram/SourceFiles/mtproto/session.h | 79 ++++-- 8 files changed, 399 insertions(+), 297 deletions(-) diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 8544699609..1ab49e7959 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -23,15 +23,6 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "base/qthelp_url.h" #include "base/unixtime.h" -extern "C" { -#include -#include -#include -#include -#include -#include -} // extern "C" - #ifdef small #undef small #endif // small @@ -107,7 +98,7 @@ Connection::~Connection() { } } -void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) { +void Connection::start(std::shared_ptr sessionData, ShiftedDcId shiftedDcId) { Expects(_thread == nullptr && _private == nullptr); _thread = std::make_unique(); @@ -115,7 +106,7 @@ void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) { _instance, _thread.get(), this, - sessionData, + std::move(sessionData), shiftedDcId); // will be deleted in the thread::finished signal @@ -214,10 +205,7 @@ int16 ConnectionPrivate::getProtocolDcId() const { } void ConnectionPrivate::destroyAllConnections() { - { - QReadLocker lockFinished(&_sessionDataMutex); - clearKeyCreatorOnFail(); - } + clearKeyCreatorOnFail(); _waitForBetterTimer.cancel(); _waitForReceivedTimer.cancel(); _waitForConnectedTimer.cancel(); @@ -229,7 +217,7 @@ ConnectionPrivate::ConnectionPrivate( not_null instance, not_null thread, not_null owner, - not_null data, + std::shared_ptr data, ShiftedDcId shiftedDcId) : QObject(nullptr) , _instance(instance) @@ -244,7 +232,7 @@ ConnectionPrivate::ConnectionPrivate( , _waitForReceived(kMinReceiveTimeout) , _waitForConnected(kMinConnectedTimeout) , _pingSender(thread, [=] { sendPingByTimer(); }) -, _sessionData(data) { +, _sessionData(std::move(data)) { Expects(_shiftedDcId != 0); moveToThread(thread); @@ -254,33 +242,16 @@ ConnectionPrivate::ConnectionPrivate( connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection); - connect(this, SIGNAL(needToReceive()), _sessionData->owner(), SLOT(tryToReceive()), Qt::QueuedConnection); - connect(this, SIGNAL(stateChanged(qint32)), _sessionData->owner(), SLOT(onConnectionStateChange(qint32)), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToSend()), this, SLOT(tryToSend()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToPing()), this, SLOT(onPingSendForce()), Qt::QueuedConnection); - connect(this, SIGNAL(sessionResetDone()), _sessionData->owner(), SLOT(onResetDone()), Qt::QueuedConnection); - - static bool _registered = false; - if (!_registered) { - _registered = true; - qRegisterMetaType >("QVector"); - } - - connect(this, SIGNAL(needToSendAsync()), _sessionData->owner(), SLOT(needToResumeAndSend()), Qt::QueuedConnection); - connect(this, SIGNAL(sendAnythingAsync(qint64)), _sessionData->owner(), SLOT(sendAnything(qint64)), Qt::QueuedConnection); - connect(this, SIGNAL(sendHttpWaitAsync()), _sessionData->owner(), SLOT(sendAnything()), Qt::QueuedConnection); - connect(this, SIGNAL(sendPongAsync(quint64,quint64)), _sessionData->owner(), SLOT(sendPong(quint64,quint64)), Qt::QueuedConnection); - connect(this, SIGNAL(sendMsgsStateInfoAsync(quint64, QByteArray)), _sessionData->owner(), SLOT(sendMsgsStateInfo(quint64,QByteArray)), Qt::QueuedConnection); - connect(this, SIGNAL(resendAsync(quint64,qint64,bool,bool)), _sessionData->owner(), SLOT(resend(quint64,qint64,bool,bool)), Qt::QueuedConnection); - connect(this, SIGNAL(resendManyAsync(QVector,qint64,bool,bool)), _sessionData->owner(), SLOT(resendMany(QVector,qint64,bool,bool)), Qt::QueuedConnection); - connect(this, SIGNAL(resendAllAsync()), _sessionData->owner(), SLOT(resendAll()), Qt::QueuedConnection); } ConnectionPrivate::~ConnectionPrivate() { + clearKeyCreatorOnFail(); + Expects(_finished); Expects(!_connection); Expects(_testConnections.empty()); - Expects(!_keyCreator); } void ConnectionPrivate::onConfigLoaded() { @@ -324,6 +295,7 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) { QReadLocker lock(&stateConnMutex); if (_state != ifState) return false; } + QWriteLocker lock(&stateConnMutex); if (_state == state) return false; _state = state; @@ -332,7 +304,9 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) { _retryTimer.callOnce(_retryTimeout); _retryWillFinish = crl::now() + _retryTimeout; } - emit stateChanged(state); + lock.unlock(); + + _sessionData->queueConnectionStateChange(state); return true; } @@ -465,7 +439,7 @@ void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno _sessionData->stateRequestMap().clear(); } - emit sessionResetDone(); + _sessionData->queueResetDone(); } mtpMsgId ConnectionPrivate::prepareToSend( @@ -570,12 +544,11 @@ mtpMsgId ConnectionPrivate::placeToContainer(SecureRequest &toSendRequest, mtpMs } void ConnectionPrivate::tryToSend() { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData || !_connection || !_keyId) { + if (!_connection || !_keyId) { return; } - auto needsLayer = !_sessionData->owner()->connectionInited(); + auto needsLayer = !_sessionData->connectionInited(); auto state = getState(); auto sendOnlyFirstPing = (state != ConnectedState); if (sendOnlyFirstPing && !_pingIdToSend) { @@ -926,16 +899,10 @@ void ConnectionPrivate::tryToSend() { toSend.clear(); } } - sendSecureRequest( - std::move(toSendRequest), - needAnyResponse, - lockFinished); + sendSecureRequest(std::move(toSendRequest), needAnyResponse); } void ConnectionPrivate::retryByTimer() { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) return; - if (_retryTimeout < 3) { ++_retryTimeout; } else if (_retryTimeout == 3) { @@ -959,17 +926,11 @@ void ConnectionPrivate::connectToServer(bool afterConfig) { return; } - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) { - DEBUG_LOG(("MTP Error: " - "connectToServer() called for stopped connection!")); - return; - } _connectionOptions = std::make_unique( _sessionData->connectionOptions()); + // #TODO race. - const auto hasKey = (_sessionData->owner()->getKey() != nullptr); - lockFinished.unlock(); + const auto hasKey = (_sessionData->getKey() != nullptr); const auto bareDc = BareDcId(_shiftedDcId); _dcType = _instance->dcOptions()->dcType(_shiftedDcId); @@ -1063,6 +1024,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) { _waitForConnectedTimer.cancel(); setState(ConnectingState); + _pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0; _pingSender.cancel(); @@ -1070,25 +1032,23 @@ void ConnectionPrivate::connectToServer(bool afterConfig) { } void ConnectionPrivate::restart() { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) return; - DEBUG_LOG(("MTP Info: restarting Connection")); _waitForReceivedTimer.cancel(); _waitForConnectedTimer.cancel(); - lockFinished.unlock(); doDisconnect(); - lockFinished.relock(); - if (_sessionData && _needSessionReset) { + if (_needSessionReset) { resetSession(); } _restarted = true; - if (_retryTimer.isActive()) return; + if (_retryTimer.isActive()) { + return; + } DEBUG_LOG(("MTP Info: restart timeout: %1ms").arg(_retryTimeout)); + setState(-_retryTimeout); } @@ -1152,7 +1112,7 @@ void ConnectionPrivate::sendPingByTimer() { _pingSender.callOnce(mustSendTill - now); } } else { - emit needToSendAsync(); + _sessionData->queueNeedToResumeAndSend(); } } @@ -1244,16 +1204,8 @@ void ConnectionPrivate::requestCDNConfig() { } void ConnectionPrivate::handleReceived() { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) return; - onReceivedSome(); - const auto restartOnError = [&] { - lockFinished.unlock(); - restart(); - }; - while (!_connection->received().empty()) { auto intsBuffer = std::move(_connection->received().front()); _connection->received().pop_front(); @@ -1268,13 +1220,13 @@ void ConnectionPrivate::handleReceived() { LOG(("TCP Error: bad message received, len %1").arg(intsCount * kIntSize)); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); - return restartOnError(); + return restart(); } if (_keyId != *(uint64*)ints) { LOG(("TCP Error: bad auth_key_id %1 instead of %2 received").arg(_keyId).arg(*(uint64*)ints)); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); - return restartOnError(); + return restart(); } auto encryptedInts = ints + kExternalHeaderIntsCount; @@ -1301,7 +1253,7 @@ void ConnectionPrivate::handleReceived() { LOG(("TCP Error: bad messageLength %1").arg(messageLength)); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); - return restartOnError(); + return restart(); } auto fullDataLength = kEncryptedHeaderIntsCount * kIntSize + messageLength; // Without padding. @@ -1322,7 +1274,7 @@ void ConnectionPrivate::handleReceived() { LOG(("TCP Error: bad SHA1 hash after aesDecrypt in message.")); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); - return restartOnError(); + return restart(); } #else // TDESKTOP_MTPROTO_OLD constexpr auto kMinPaddingSize = 12U; @@ -1342,7 +1294,7 @@ void ConnectionPrivate::handleReceived() { LOG(("TCP Error: bad SHA256 hash after aesDecrypt in message")); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); - return restartOnError(); + return restart(); } #endif // TDESKTOP_MTPROTO_OLD @@ -1350,7 +1302,7 @@ void ConnectionPrivate::handleReceived() { LOG(("TCP Error: bad msg_len received %1, data size: %2").arg(messageLength).arg(encryptedBytesCount)); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); - return restartOnError(); + return restart(); } TCP_LOG(("TCP Info: decrypted message %1,%2,%3 is %4 len").arg(msgId).arg(seqNo).arg(Logs::b(needAck)).arg(fullDataLength)); @@ -1360,7 +1312,7 @@ void ConnectionPrivate::handleReceived() { LOG(("MTP Error: bad server session received")); TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(serverSession)); - return restartOnError(); + return restart(); } const auto serverTime = int32(msgId >> 32); @@ -1369,7 +1321,7 @@ void ConnectionPrivate::handleReceived() { if (!isReply && ((msgId & 0x03) != 3)) { LOG(("MTP Error: bad msg_id %1 in message received").arg(msgId)); - return restartOnError(); + return restart(); } bool badTime = false; @@ -1384,11 +1336,10 @@ void ConnectionPrivate::handleReceived() { if (!badTime) { DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt)); _sessionData->setSalt(serverSalt); - if (setState(ConnectedState, ConnectingState)) { // only connected - if (_restarted) { - emit resendAllAsync(); - _restarted = false; - } + + if (setState(ConnectedState, ConnectingState) && _restarted) { + _sessionData->queueResendAll(); + _restarted = false; } } else { DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt)); @@ -1419,29 +1370,24 @@ void ConnectionPrivate::handleReceived() { } // send acks - uint32 toAckSize = _ackRequestData.size(); - if (toAckSize) { + if (const auto toAckSize = _ackRequestData.size()) { DEBUG_LOG(("MTP Info: will send %1 acks, ids: %2").arg(toAckSize).arg(LogIdsVector(_ackRequestData))); - emit sendAnythingAsync(kAckSendWaiting); + _sessionData->queueSendAnything(kAckSendWaiting); } - bool emitSignal = false; - { - QReadLocker locker(_sessionData->haveReceivedMutex()); - emitSignal = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty(); - if (emitSignal) { - DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(_sessionData->haveReceivedResponses().size()).arg(_sessionData->haveReceivedUpdates().size())); - } - } + auto lock = QReadLocker(_sessionData->haveReceivedMutex()); + const auto tryToReceive = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty(); + lock.unlock(); - if (emitSignal) { - emit needToReceive(); + if (tryToReceive) { + DEBUG_LOG(("MTP Info: queueTryToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(_sessionData->haveReceivedResponses().size()).arg(_sessionData->haveReceivedUpdates().size())); + _sessionData->queueTryToReceive(); } if (res != HandleResult::Success && res != HandleResult::Ignored) { _needSessionReset = (res == HandleResult::ResetSession); - return restartOnError(); + return restart(); } _retryTimeout = 1; // reset restart() timer @@ -1449,12 +1395,12 @@ void ConnectionPrivate::handleReceived() { if (!wasConnected) { if (getState() == ConnectedState) { - emit needToSendAsync(); + _sessionData->queueNeedToResumeAndSend(); } } } if (_connection->needHttpWait()) { - emit sendHttpWaitAsync(); + _sessionData->queueSendAnything(); } } @@ -1679,7 +1625,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (setState(ConnectedState, ConnectingState)) { // maybe only connected if (_restarted) { - emit resendAllAsync(); + _sessionData->queueResendAll(); _restarted = false; } } @@ -1741,7 +1687,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr info[i] = state; } } - emit sendMsgsStateInfoAsync(msgId, info); + _sessionData->queueSendMsgsStateInfo(msgId, info); } return HandleResult::Success; case mtpc_msgs_state_info: { @@ -2273,32 +2219,45 @@ void ConnectionPrivate::handleMsgsStates(const QVector &ids, const QByt } } -void ConnectionPrivate::resend(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { - if (msgId == _pingMsgId) return; - emit resendAsync(msgId, msCanWait, forceContainer, sendMsgStateInfo); +void ConnectionPrivate::resend( + mtpMsgId msgId, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo) { + if (msgId == _pingMsgId) { + return; + } + _sessionData->queueResend( + msgId, + msCanWait, + forceContainer, + sendMsgStateInfo); } -void ConnectionPrivate::resendMany(QVector msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { +void ConnectionPrivate::resendMany( + QVector msgIds, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo) { for (int32 i = 0, l = msgIds.size(); i < l; ++i) { if (msgIds.at(i) == _pingMsgId) { msgIds.remove(i); --l; } } - emit resendManyAsync(msgIds, msCanWait, forceContainer, sendMsgStateInfo); + _sessionData->queueResendMany( + std::move(msgIds), + msCanWait, + forceContainer, + sendMsgStateInfo); } void ConnectionPrivate::onConnected( not_null connection) { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) return; - disconnect(connection, &AbstractConnection::connected, nullptr, nullptr); if (!connection->isConnected()) { LOG(("Connection Error: not connected in onConnected(), " "state: %1").arg(connection->debugState())); - - lockFinished.unlock(); return restart(); } @@ -2323,8 +2282,6 @@ void ConnectionPrivate::onConnected( _waitForBetterTimer.cancel(); _connection = std::move(i->data); _testConnections.clear(); - - lockFinished.unlock(); checkAuthKey(); } } @@ -2384,13 +2341,16 @@ void ConnectionPrivate::checkAuthKey() { } void ConnectionPrivate::updateAuthKey() { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData || _keyCreator) { + if (_keyCreator) { return; } DEBUG_LOG(("AuthKey Info: Connection updating key from Session, dc %1").arg(_shiftedDcId)); - _key = _sessionData->owner()->getKey(); + applyAuthKey(_sessionData->getKey()); +} + +void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&key) { + _key = std::move(key); const auto newKeyId = _key ? _key->keyId() : 0; if (newKeyId) { if (_keyId == newKeyId) { @@ -2406,7 +2366,6 @@ void ConnectionPrivate::updateAuthKey() { Assert(already != newKeyId); DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed").arg(_shiftedDcId)); - lockFinished.unlock(); restart(); return; } @@ -2420,11 +2379,10 @@ void ConnectionPrivate::updateAuthKey() { LOG(("MTP Error: No key %1 in updateAuthKey() for destroying.").arg(_shiftedDcId)); _instance->checkIfKeyWasDestroyed(_shiftedDcId); return; - } else if (!_sessionData->owner()->acquireKeyCreation()) { + } else if (!_sessionData->acquireKeyCreation()) { DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), but someone is creating already.")); return; } - lockFinished.unlock(); DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), creating.")); createDcKey(); @@ -2435,9 +2393,6 @@ void ConnectionPrivate::createDcKey() { using Error = DcKeyCreator::Error; auto delegate = DcKeyCreator::Delegate(); delegate.done = [=](base::expected result) { - QReadLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) return; - if (result) { DEBUG_LOG(("AuthKey Info: auth key gen succeed, id: %1, server salt: %2").arg(result->key->keyId()).arg(result->serverSalt)); @@ -2445,10 +2400,8 @@ void ConnectionPrivate::createDcKey() { _sessionData->clearForNewKey(_instance); _keyCreator = nullptr; - _sessionData->owner()->releaseKeyCreationOnDone( - std::move(result->key)); - - updateAuthKey(); + _sessionData->releaseKeyCreationOnDone(result->key); + applyAuthKey(std::move(result->key)); return; } clearKeyCreatorOnFail(); @@ -2484,14 +2437,13 @@ void ConnectionPrivate::authKeyChecked() { if (_sessionData->getSalt()) { setState(ConnectedState); if (_restarted) { - emit resendAllAsync(); + _sessionData->queueResendAll(); _restarted = false; } } // else receive salt in bad_server_salt first, then try to send all the requests _pingIdToSend = rand_value(); // get server_salt - - emit needToSendAsync(); + _sessionData->queueNeedToResumeAndSend(); } void ConnectionPrivate::onError( @@ -2537,20 +2489,15 @@ void ConnectionPrivate::handleError(int errorCode) { void ConnectionPrivate::destroyCdnKey() { if (_key) { - QReadLocker lockFinished(&_sessionDataMutex); - if (_sessionData) { - _sessionData->owner()->destroyCdnKey(_keyId); - } + _sessionData->destroyCdnKey(_keyId); } _key = nullptr; _keyId = 0; - } bool ConnectionPrivate::sendSecureRequest( SecureRequest &&request, - bool needAnyResponse, - QReadLocker &lockFinished) { + bool needAnyResponse) { #ifdef TDESKTOP_MTPROTO_OLD const auto oldPadding = true; #else // TDESKTOP_MTPROTO_OLD @@ -2658,24 +2605,15 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { return 0; } -// _sessionDataMutex must be locked for read. void ConnectionPrivate::clearKeyCreatorOnFail() { - if (_keyCreator) { - _keyCreator = nullptr; - - Assert(_sessionData != nullptr); - _sessionData->owner()->releaseKeyCreationOnFail(); + if (!_keyCreator) { + return; } + _keyCreator = nullptr; + _sessionData->releaseKeyCreationOnFail(); } void ConnectionPrivate::stop() { - QWriteLocker lockFinished(&_sessionDataMutex); - if (!_sessionData) { - Assert(_keyCreator == nullptr); - return; - } - clearKeyCreatorOnFail(); - _sessionData = nullptr; } } // namespace internal diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index 5e151b29a6..78c2217a83 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -21,7 +21,7 @@ class DcKeyChecker; } // namespace details // How much time to wait for some more requests, when sending msg acks. -constexpr auto kAckSendWaiting = crl::time(10000); +constexpr auto kAckSendWaiting = 10 * crl::time(1000); class Instance; @@ -43,7 +43,7 @@ public: Connection(not_null instance); ~Connection(); - void start(SessionData *data, ShiftedDcId shiftedDcId); + void start(std::shared_ptr data, ShiftedDcId shiftedDcId); void kill(); void waitTillFinish(); @@ -68,7 +68,7 @@ public: not_null instance, not_null thread, not_null owner, - not_null data, + std::shared_ptr data, ShiftedDcId shiftedDcId); ~ConnectionPrivate(); @@ -79,21 +79,6 @@ public: int32 getState() const; QString transport() const; -signals: - void needToReceive(); - void needToRestart(); - void stateChanged(qint32 newState); - void sessionResetDone(); - - void needToSendAsync(); - void sendAnythingAsync(qint64 msWait); - void sendHttpWaitAsync(); - void sendPongAsync(quint64 msgId, quint64 pingId); - void sendMsgsStateInfoAsync(quint64 msgId, QByteArray data); - void resendAsync(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo); - void resendManyAsync(QVector msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo); - void resendAllAsync(); - public slots: void restartNow(); @@ -112,6 +97,15 @@ private: ConnectionPointer data; int priority = 0; }; + + enum class HandleResult { + Success, + Ignored, + RestartConnection, + ResetSession, + ParseError, + }; + void connectToServer(bool afterConfig = false); void connectingTimedOut(); void doDisconnect(); @@ -135,8 +129,6 @@ private: void waitBetterFailed(); void markConnectionOld(); void sendPingByTimer(); - - // Locks _sessionDataMutex. void destroyAllConnections(); void confirmBestConnection(); @@ -151,23 +143,14 @@ private: mtpMsgId prepareToSend(SecureRequest &request, mtpMsgId currentLastId); mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId); - bool sendSecureRequest( - SecureRequest &&request, - bool needAnyResponse, - QReadLocker &lockFinished); + bool sendSecureRequest(SecureRequest &&request, bool needAnyResponse); mtpRequestId wasSent(mtpMsgId msgId) const; - enum class HandleResult { - Success, - Ignored, - RestartConnection, - ResetSession, - ParseError, - }; [[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime); mtpBuffer ungzip(const mtpPrime *from, const mtpPrime *end) const; void handleMsgsStates(const QVector &ids, const QByteArray &states, QVector &acked); + // _sessionDataMutex must be locked for read. bool setState(int32 state, int32 ifState = Connection::UpdateAlways); void appendTestConnection( @@ -182,19 +165,26 @@ private: // remove msgs with such ids from sessionData->haveSent, add to sessionData->wereAcked void requestsAcked(const QVector &ids, bool byResponse = false); - void resend(quint64 msgId, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false); - void resendMany(QVector msgIds, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false); + void resend( + mtpMsgId msgId, + crl::time msCanWait = 0, + bool forceContainer = false, + bool sendMsgStateInfo = false); + void resendMany( + QVector msgIds, + crl::time msCanWait = 0, + bool forceContainer = false, + bool sendMsgStateInfo = false); void createDcKey(); void resetSession(); void checkAuthKey(); void authKeyChecked(); void destroyCdnKey(); - - // _sessionDataMutex must be locked for read. void clearKeyCreatorOnFail(); + void applyAuthKey(AuthKeyPtr &&key); - not_null _instance; + const not_null _instance; DcType _dcType = DcType::Regular; mutable QReadWriteLock stateConnMutex; @@ -236,8 +226,7 @@ private: AuthKeyPtr _key; uint64 _keyId = 0; - QReadWriteLock _sessionDataMutex; - SessionData *_sessionData = nullptr; + std::shared_ptr _sessionData; std::unique_ptr _connectionOptions; std::unique_ptr _keyCreator; diff --git a/Telegram/SourceFiles/mtproto/dcenter.cpp b/Telegram/SourceFiles/mtproto/dcenter.cpp index 65bd84fb99..fa2f10d8a4 100644 --- a/Telegram/SourceFiles/mtproto/dcenter.cpp +++ b/Telegram/SourceFiles/mtproto/dcenter.cpp @@ -37,8 +37,8 @@ AuthKeyPtr Dcenter::getKey() const { return _key; } -void Dcenter::destroyCdnKey(uint64 keyId) { - destroyKey(keyId); +bool Dcenter::destroyCdnKey(uint64 keyId) { + return destroyKey(keyId); } bool Dcenter::destroyConfirmedForgottenKey(uint64 keyId) { @@ -54,9 +54,6 @@ bool Dcenter::destroyKey(uint64 keyId) { } _key = nullptr; _connectionInited = false; - lock.unlock(); - - emit authKeyChanged(); return true; } @@ -86,18 +83,18 @@ void Dcenter::releaseKeyCreationOnFail() { _creatingKey = false; } -void Dcenter::releaseKeyCreationOnDone(AuthKeyPtr &&key) { +void Dcenter::releaseKeyCreationOnDone(const AuthKeyPtr &key) { Expects(_creatingKey); Expects(_key == nullptr); QWriteLocker lock(&_mutex); - DEBUG_LOG(("AuthKey Info: Dcenter::releaseKeyCreationOnDone(%1), emitting authKeyChanged, dc %2").arg(key ? key->keyId() : 0).arg(_id)); - _key = std::move(key); + DEBUG_LOG(("AuthKey Info: Dcenter::releaseKeyCreationOnDone(%1), " + "emitting authKeyChanged, dc %2" + ).arg(key ? key->keyId() : 0 + ).arg(_id)); + _key = key; _connectionInited = false; _creatingKey = false; - lock.unlock(); - - emit authKeyChanged(); } } // namespace internal diff --git a/Telegram/SourceFiles/mtproto/dcenter.h b/Telegram/SourceFiles/mtproto/dcenter.h index 002762b1cb..f935c86191 100644 --- a/Telegram/SourceFiles/mtproto/dcenter.h +++ b/Telegram/SourceFiles/mtproto/dcenter.h @@ -16,8 +16,6 @@ using AuthKeyPtr = std::shared_ptr; namespace internal { class Dcenter : public QObject { - Q_OBJECT - public: // Main thread. Dcenter(DcId dcId, AuthKeyPtr &&key); @@ -26,9 +24,9 @@ public: [[nodiscard]] DcId id() const; [[nodiscard]] AuthKeyPtr getKey() const; - void destroyCdnKey(uint64 keyId); + bool destroyCdnKey(uint64 keyId); bool destroyConfirmedForgottenKey(uint64 keyId); - void releaseKeyCreationOnDone(AuthKeyPtr &&key); + void releaseKeyCreationOnDone(const AuthKeyPtr &key); [[nodiscard]] bool connectionInited() const; void setConnectionInited(bool connectionInited = true); @@ -36,9 +34,6 @@ public: [[nodiscard]] bool acquireKeyCreation(); void releaseKeyCreationOnFail(); -signals: - void authKeyChanged(); - private: bool destroyKey(uint64 keyId); diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index c82305c6dd..320f1b77c2 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -65,7 +65,8 @@ public: void setMainDcId(DcId mainDcId); [[nodiscard]] DcId mainDcId() const; - void setKeyForWrite(DcId dcId, const AuthKeyPtr &key); + void dcKeyChanged(DcId dcId, const AuthKeyPtr &key); + [[nodiscard]] rpl::producer dcKeyChanged() const; [[nodiscard]] AuthKeysList getKeysForWrite() const; void addKeysForDestroy(AuthKeysList &&keys); @@ -208,6 +209,7 @@ private: bool _mainDcIdForced = false; base::flat_map> _dcenters; std::vector> _dcentersToDestroy; + rpl::event_stream _dcKeyChanged; Session *_mainSession = nullptr; base::flat_map> _sessions; @@ -693,7 +695,9 @@ not_null Instance::Private::getDcById( return addDc(dcId); } -void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { +void Instance::Private::dcKeyChanged(DcId dcId, const AuthKeyPtr &key) { + _dcKeyChanged.fire_copy(dcId); + if (isTemporaryDcId(dcId)) { return; } @@ -710,6 +714,10 @@ void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { }); } +rpl::producer Instance::Private::dcKeyChanged() const { + return _dcKeyChanged.events(); +} + AuthKeysList Instance::Private::getKeysForWrite() const { auto result = AuthKeysList(); @@ -1600,7 +1608,7 @@ void Instance::Private::keyDestroyedOnServer(DcId dcId, uint64 keyId) { if (const auto dc = findDc(dcId)) { if (dc->destroyConfirmedForgottenKey(keyId)) { LOG(("Key destroyed!")); - setKeyForWrite(dcId, nullptr); + dcKeyChanged(dcId, nullptr); } else { LOG(("Key already is different.")); } @@ -1753,10 +1761,12 @@ void Instance::logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) { _private->logout(onDone, onFail); } -void Instance::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { - InvokeQueued(this, [=] { - _private->setKeyForWrite(dcId, key); - }); +void Instance::dcKeyChanged(DcId dcId, const AuthKeyPtr &key) { + _private->dcKeyChanged(dcId, key); +} + +rpl::producer Instance::dcKeyChanged() const { + return _private->dcKeyChanged(); } AuthKeysList Instance::getKeysForWrite() const { diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index 5443745c46..bf4ddc117d 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -63,9 +63,10 @@ public: // Thread-safe. [[nodiscard]] QString deviceModel() const; [[nodiscard]] QString systemVersion() const; - void setKeyForWrite(DcId dcId, const AuthKeyPtr &key); // Main thread. + void dcKeyChanged(DcId dcId, const AuthKeyPtr &key); + [[nodiscard]] rpl::producer dcKeyChanged() const; [[nodiscard]] AuthKeysList getKeysForWrite() const; void addKeysForDestroy(AuthKeysList &&keys); @@ -110,6 +111,8 @@ public: bool isKeysDestroyer() const; void scheduleKeyDestroy(ShiftedDcId shiftedDcId); void checkIfKeyWasDestroyed(ShiftedDcId shiftedDcId); + + // Main thread. void keyDestroyedOnServer(DcId dcId, uint64 keyId); void requestConfig(); diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 4ef0ecdd40..99feebb62d 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -64,6 +64,19 @@ ConnectionOptions::ConnectionOptions( , useTcp(useTcp) { } +template +void SessionData::withSession(Callback &&callback) { + QMutexLocker lock(&_ownerMutex); + if (const auto session = _owner) { + InvokeQueued(session, [ + session, + callback = std::forward(callback) + ] { + callback(session); + }); + } +} + void SessionData::setCurrentKeyId(uint64 keyId) { QWriteLocker locker(&_lock); if (_keyId == keyId) { @@ -136,6 +149,115 @@ void SessionData::clearForNewKey(not_null instance) { instance->clearCallbacksDelayed(std::move(clearCallbacks)); } +void SessionData::queueTryToReceive() { + withSession([](not_null session) { + session->tryToReceive(); + }); +} + +void SessionData::queueNeedToResumeAndSend() { + withSession([](not_null session) { + session->needToResumeAndSend(); + }); +} + +void SessionData::queueConnectionStateChange(int newState) { + withSession([=](not_null session) { + session->connectionStateChange(newState); + }); +} + +void SessionData::queueResendAll() { + withSession([](not_null session) { + session->resendAll(); + }); +} + +void SessionData::queueResetDone() { + withSession([](not_null session) { + session->resetDone(); + }); +} + +void SessionData::queueSendAnything(crl::time msCanWait) { + withSession([=](not_null session) { + session->sendAnything(msCanWait); + }); +} + +void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) { + withSession([=](not_null session) { + session->sendMsgsStateInfo(msgId, data); + }); +} + +void SessionData::queueResend( + mtpMsgId msgId, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo) { + withSession([=](not_null session) { + session->resend(msgId, msCanWait, forceContainer, sendMsgStateInfo); + }); +} + +void SessionData::queueResendMany( + QVector msgIds, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo) { + withSession([=](not_null session) { + for (const auto msgId : msgIds) { + session->resend( + msgId, + msCanWait, + forceContainer, + sendMsgStateInfo); + } + }); +} + +bool SessionData::connectionInited() const { + QMutexLocker lock(&_ownerMutex); + return _owner ? _owner->connectionInited() : false; +} + +AuthKeyPtr SessionData::getKey() const { + QMutexLocker lock(&_ownerMutex); + return _owner ? _owner->getKey() : nullptr; +} + +bool SessionData::acquireKeyCreation() { + QMutexLocker lock(&_ownerMutex); + return _owner ? _owner->acquireKeyCreation() : false; +} + +void SessionData::releaseKeyCreationOnDone(const AuthKeyPtr &key) { + QMutexLocker lock(&_ownerMutex); + if (_owner) { + _owner->releaseKeyCreationOnDone(key); + } +} + +void SessionData::releaseKeyCreationOnFail() { + QMutexLocker lock(&_ownerMutex); + if (_owner) { + _owner->releaseKeyCreationOnFail(); + } +} + +void SessionData::destroyCdnKey(uint64 keyId) { + QMutexLocker lock(&_ownerMutex); + if (_owner) { + _owner->destroyCdnKey(keyId); + } +} + +void SessionData::detach() { + QMutexLocker lock(&_ownerMutex); + _owner = nullptr; +} + Session::Session( not_null instance, ShiftedDcId shiftedDcId, @@ -145,19 +267,30 @@ Session::Session( , _shiftedDcId(shiftedDcId) , _ownedDc(dc ? nullptr : std::make_unique(shiftedDcId, nullptr)) , _dc(dc ? dc : _ownedDc.get()) -, _data(this) +, _data(std::make_shared(this)) , _timeouter([=] { checkRequestsByTimer(); }) , _sender([=] { needToResumeAndSend(); }) { _timeouter.callEach(1000); refreshOptions(); if (sharedDc()) { - connect(_dc, SIGNAL(authKeyChanged()), this, SLOT(authKeyChangedForDC()), Qt::QueuedConnection); + watchDcKeyChanges(); } } +void Session::watchDcKeyChanges() { + _instance->dcKeyChanged( + ) | rpl::filter([=](DcId dcId) { + return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId)); + }) | rpl::start_with_next([=] { + DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, " + "emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId)); + emit authKeyChanged(); + }, _lifetime); +} + void Session::start() { _connection = std::make_unique(_instance); - _connection->start(&_data, _shiftedDcId); + _connection->start(_data, _shiftedDcId); if (_instance->isKeysDestroyer()) { _instance->scheduleKeyDestroy(_shiftedDcId); } @@ -189,7 +322,7 @@ void Session::refreshOptions() { const auto useHttp = (proxyType != ProxyData::Type::Mtproto); const auto useIPv4 = true; const auto useIPv6 = Global::TryIPv6(); - _data.setConnectionOptions(ConnectionOptions( + _data->setConnectionOptions(ConnectionOptions( _instance->systemLangCode(), _instance->cloudLangCode(), _instance->langPackName(), @@ -222,22 +355,25 @@ void Session::stop() { void Session::kill() { stop(); _killed = true; + _data->detach(); DEBUG_LOG(("Session Info: marked session dcWithShift %1 as killed").arg(_shiftedDcId)); } void Session::unpaused() { if (_needToReceive) { _needToReceive = false; - QTimer::singleShot(0, this, SLOT(tryToReceive())); + InvokeQueued(this, [=] { + tryToReceive(); + }); } } void Session::sendDcKeyCheck(const AuthKeyPtr &key) { - _data.setKeyForCheck(key); + _data->setKeyForCheck(key); needToResumeAndSend(); } -void Session::sendAnything(qint64 msCanWait) { +void Session::sendAnything(crl::time msCanWait) { if (_killed) { DEBUG_LOG(("Session Error: can't send anything in a killed session")); return; @@ -284,12 +420,6 @@ void Session::needToResumeAndSend() { } } -void Session::sendPong(quint64 msgId, quint64 pingId) { - _instance->sendProtocolMessage( - _shiftedDcId, - MTPPong(MTP_pong(MTP_long(msgId), MTP_long(pingId)))); -} - void Session::sendMsgsStateInfo(quint64 msgId, QByteArray data) { auto info = bytes::vector(); if (!data.isEmpty()) { @@ -312,8 +442,8 @@ void Session::checkRequestsByTimer() { QVector stateRequestIds; { - QReadLocker locker(_data.haveSentMutex()); - auto &haveSent = _data.haveSentMap(); + QReadLocker locker(_data->haveSentMutex()); + auto &haveSent = _data->haveSentMap(); const auto haveSentCount = haveSent.size(); auto ms = crl::now(); for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) { @@ -340,9 +470,9 @@ void Session::checkRequestsByTimer() { if (stateRequestIds.size()) { DEBUG_LOG(("MTP Info: requesting state of msgs: %1").arg(LogIds(stateRequestIds))); { - QWriteLocker locker(_data.stateRequestMutex()); + QWriteLocker locker(_data->stateRequestMutex()); for (uint32 i = 0, l = stateRequestIds.size(); i < l; ++i) { - _data.stateRequestMap().insert(stateRequestIds[i], true); + _data->stateRequestMap().insert(stateRequestIds[i], true); } } sendAnything(kCheckResendWaiting); @@ -356,8 +486,8 @@ void Session::checkRequestsByTimer() { if (!removingIds.isEmpty()) { auto clearCallbacks = std::vector(); { - QWriteLocker locker(_data.haveSentMutex()); - auto &haveSent = _data.haveSentMap(); + QWriteLocker locker(_data->haveSentMutex()); + auto &haveSent = _data->haveSentMap(); for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { auto j = haveSent.find(removingIds[i]); if (j != haveSent.cend()) { @@ -372,28 +502,28 @@ void Session::checkRequestsByTimer() { } } -void Session::onConnectionStateChange(qint32 newState) { +void Session::connectionStateChange(int newState) { _instance->onStateChange(_shiftedDcId, newState); } -void Session::onResetDone() { +void Session::resetDone() { _instance->onSessionReset(_shiftedDcId); } void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) { if (requestId) { - QWriteLocker locker(_data.toSendMutex()); - _data.toSendMap().remove(requestId); + QWriteLocker locker(_data->toSendMutex()); + _data->toSendMap().remove(requestId); } if (msgId) { - QWriteLocker locker(_data.haveSentMutex()); - _data.haveSentMap().remove(msgId); + QWriteLocker locker(_data->haveSentMutex()); + _data->haveSentMap().remove(msgId); } } void Session::ping() { _ping = true; - sendAnything(0); + sendAnything(); } int32 Session::requestState(mtpRequestId requestId) const { @@ -419,8 +549,8 @@ int32 Session::requestState(mtpRequestId requestId) const { } if (!requestId) return MTP::RequestSent; - QWriteLocker locker(_data.toSendMutex()); - const auto &toSend = _data.toSendMap(); + QWriteLocker locker(_data->toSendMutex()); + const auto &toSend = _data->toSendMap(); const auto i = toSend.constFind(requestId); if (i != toSend.cend()) { return MTP::RequestSending; @@ -456,11 +586,15 @@ QString Session::transport() const { return _connection ? _connection->transport() : QString(); } -mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { +mtpRequestId Session::resend( + mtpMsgId msgId, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo) { SecureRequest request; { - QWriteLocker locker(_data.haveSentMutex()); - auto &haveSent = _data.haveSentMap(); + QWriteLocker locker(_data->haveSentMutex()); + auto &haveSent = _data->haveSentMap(); auto i = haveSent.find(msgId); if (i == haveSent.end()) { @@ -493,8 +627,8 @@ mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContaine request->msDate = forceContainer ? 0 : crl::now(); sendPrepared(request, msCanWait, false); { - QWriteLocker locker(_data.toResendMutex()); - _data.toResendMap().insert(msgId, request->requestId); + QWriteLocker locker(_data->toResendMutex()); + _data->toResendMap().insert(msgId, request->requestId); } return request->requestId; } else { @@ -502,17 +636,11 @@ mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContaine } } -void Session::resendMany(QVector msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { - for (int32 i = 0, l = msgIds.size(); i < l; ++i) { - resend(msgIds.at(i), msCanWait, forceContainer, sendMsgStateInfo); - } -} - void Session::resendAll() { QVector toResend; { - QReadLocker locker(_data.haveSentMutex()); - const auto &haveSent = _data.haveSentMap(); + QReadLocker locker(_data->haveSentMutex()); + const auto &haveSent = _data->haveSentMap(); toResend.reserve(haveSent.size()); for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { if (i.value()->requestId) { @@ -532,8 +660,8 @@ void Session::sendPrepared( DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1" ).arg(msCanWait)); { - QWriteLocker locker(_data.toSendMutex()); - _data.toSendMap().insert(request->requestId, request); + QWriteLocker locker(_data->toSendMutex()); + _data->toSendMap().insert(request->requestId, request); if (newRequest) { *(mtpMsgId*)(request->data() + 4) = 0; @@ -546,29 +674,37 @@ void Session::sendPrepared( sendAnything(msCanWait); } -void Session::authKeyChangedForDC() { - DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId)); - emit authKeyChanged(); -} - bool Session::acquireKeyCreation() { - return _dc->acquireKeyCreation(); + Expects(!_myKeyCreation); + + if (!_dc->acquireKeyCreation()) { + return false; + } + _myKeyCreation = true; + return true; } void Session::releaseKeyCreationOnFail() { + Expects(_myKeyCreation); + _dc->releaseKeyCreationOnFail(); + _myKeyCreation = false; } -void Session::releaseKeyCreationOnDone(AuthKeyPtr &&key) { +void Session::releaseKeyCreationOnDone(const AuthKeyPtr &key) { + Expects(_myKeyCreation); + DEBUG_LOG(("AuthKey Info: Session key created, setting, dcWithShift %1").arg(_shiftedDcId)); + _dc->releaseKeyCreationOnDone(key); + _myKeyCreation = false; + if (sharedDc()) { const auto dcId = _dc->id(); const auto instance = _instance; InvokeQueued(instance, [=] { - instance->setKeyForWrite(dcId, key); + instance->dcKeyChanged(dcId, key); }); } - _dc->releaseKeyCreationOnDone(std::move(key)); } void Session::notifyDcConnectionInited() { @@ -577,12 +713,14 @@ void Session::notifyDcConnectionInited() { } void Session::destroyCdnKey(uint64 keyId) { - _dc->destroyCdnKey(keyId); + if (!_dc->destroyCdnKey(keyId)) { + return; + } if (sharedDc()) { const auto dcId = _dc->id(); const auto instance = _instance; InvokeQueued(instance, [=] { - instance->setKeyForWrite(dcId, nullptr); + instance->dcKeyChanged(dcId, nullptr); }); } } @@ -613,11 +751,11 @@ void Session::tryToReceive() { auto isUpdate = false; auto message = SerializedMessage(); { - QWriteLocker locker(_data.haveReceivedMutex()); - auto &responses = _data.haveReceivedResponses(); + QWriteLocker locker(_data->haveReceivedMutex()); + auto &responses = _data->haveReceivedResponses(); auto response = responses.begin(); if (response == responses.cend()) { - auto &updates = _data.haveReceivedUpdates(); + auto &updates = _data->haveReceivedUpdates(); auto update = updates.begin(); if (update == updates.cend()) { return; @@ -643,6 +781,9 @@ void Session::tryToReceive() { } Session::~Session() { + if (_myKeyCreation) { + releaseKeyCreationOnFail(); + } Assert(_connection == nullptr); } diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index cc5a03d6d4..a0075a2145 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -249,10 +249,8 @@ public: return _stateRequest; } - not_null owner() { - return _owner; - } - not_null owner() const { + // Warning! Valid only in constructor, _owner is guaranteed != null. + [[nodiscard]] not_null owner() { return _owner; } @@ -265,13 +263,45 @@ public: void clearForNewKey(not_null instance); + // Connection -> Session interface. + void queueTryToReceive(); + void queueNeedToResumeAndSend(); + void queueConnectionStateChange(int newState); + void queueResendAll(); + void queueResetDone(); + void queueSendAnything(crl::time msCanWait = 0); + void queueSendMsgsStateInfo(quint64 msgId, QByteArray data); + void queueResend( + mtpMsgId msgId, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo); + void queueResendMany( + QVector msgIds, + crl::time msCanWait, + bool forceContainer, + bool sendMsgStateInfo); + + [[nodiscard]] bool connectionInited() const; + [[nodiscard]] AuthKeyPtr getKey() const; + [[nodiscard]] bool acquireKeyCreation(); + void releaseKeyCreationOnDone(const AuthKeyPtr &key); + void releaseKeyCreationOnFail(); + void destroyCdnKey(uint64 keyId); + + void detach(); + private: + template + void withSession(Callback &&callback); + uint64 _keyId = 0; uint64 _sessionId = 0; uint64 _salt = 0; uint32 _messagesSent = 0; - not_null _owner; + Session *_owner = nullptr; + mutable QMutex _ownerMutex; AuthKeyPtr _dcKeyForCheck; ConnectionOptions _options; @@ -327,7 +357,7 @@ public: // Connection thread. [[nodiscard]] bool acquireKeyCreation(); void releaseKeyCreationOnFail(); - void releaseKeyCreationOnDone(AuthKeyPtr &&key); + void releaseKeyCreationOnDone(const AuthKeyPtr &key); void destroyCdnKey(uint64 keyId); void notifyDcConnectionInited(); @@ -346,32 +376,29 @@ public: crl::time msCanWait = 0, bool newRequest = true); + void tryToReceive(); + void needToResumeAndSend(); + void connectionStateChange(int newState); + void resendAll(); // After connection restart. + void resetDone(); + void sendAnything(crl::time msCanWait = 0); + void sendMsgsStateInfo(quint64 msgId, QByteArray data); + mtpRequestId resend( + mtpMsgId msgId, + crl::time msCanWait = 0, + bool forceContainer = false, + bool sendMsgStateInfo = false); + signals: void authKeyChanged(); void needToSend(); void needToPing(); void needToRestart(); -public slots: - void needToResumeAndSend(); - - mtpRequestId resend(quint64 msgId, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false); - void resendMany(QVector msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo); - void resendAll(); // after connection restart - - void authKeyChangedForDC(); - - void tryToReceive(); - void onConnectionStateChange(qint32 newState); - void onResetDone(); - - void sendAnything(qint64 msCanWait = 0); - void sendPong(quint64 msgId, quint64 pingId); - void sendMsgsStateInfo(quint64 msgId, QByteArray data); - private: [[nodiscard]] bool sharedDc() const; void checkRequestsByTimer(); + void watchDcKeyChanges(); bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); @@ -379,15 +406,15 @@ private: const ShiftedDcId _shiftedDcId = 0; const std::unique_ptr _ownedDc; const not_null _dc; + const std::shared_ptr _data; std::unique_ptr _connection; bool _killed = false; bool _needToReceive = false; - SessionData _data; - AuthKeyPtr _dcKeyForCheck; + bool _myKeyCreation = false; crl::time _msSendCall = 0; crl::time _msWait = 0; @@ -397,6 +424,8 @@ private: base::Timer _timeouter; base::Timer _sender; + rpl::lifetime _lifetime; + }; } // namespace internal