Fix resending requests after new session.

This commit is contained in:
John Preston 2019-11-19 19:14:50 +03:00
parent e7e1c9aa5a
commit e6d3b2b098
5 changed files with 276 additions and 345 deletions

View File

@ -291,11 +291,15 @@ QString ConnectionPrivate::transport() const {
bool ConnectionPrivate::setState(int32 state, int32 ifState) {
if (ifState != Connection::UpdateAlways) {
QReadLocker lock(&stateConnMutex);
if (_state != ifState) return false;
if (_state != ifState) {
return false;
}
}
QWriteLocker lock(&stateConnMutex);
if (_state == state) return false;
if (_state == state) {
return false;
}
_state = state;
if (state < 0) {
_retryTimeout = -state;
@ -308,118 +312,14 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) {
return true;
}
void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno
void ConnectionPrivate::resetSession() {
MTP_LOG(_shiftedDcId, ("Resetting session!"));
_needSessionReset = false;
MTP_LOG(_shiftedDcId, ("Resetting session!"));
QWriteLocker locker1(_sessionData->haveSentMutex());
QWriteLocker locker2(_sessionData->toResendMutex());
QWriteLocker locker3(_sessionData->toSendMutex());
QWriteLocker locker4(_sessionData->wereAckedMutex());
auto &haveSent = _sessionData->haveSentMap();
auto &toResend = _sessionData->toResendMap();
auto &toSend = _sessionData->toSendMap();
auto &wereAcked = _sessionData->wereAckedMap();
auto newId = base::unixtime::mtproto_msg_id();
auto setSeqNumbers = RequestMap();
auto replaces = QMap<mtpMsgId, mtpMsgId>();
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
if (!i.value().isSentContainer()) {
if (!*(mtpMsgId*)(i.value()->constData() + 4)) continue;
mtpMsgId id = i.key();
if (id > newId) {
while (toResend.constFind(newId) != toResend.cend()
|| wereAcked.constFind(newId) != wereAcked.cend()
|| haveSent.constFind(newId) != haveSent.cend()) {
newId = base::unixtime::mtproto_msg_id();
}
MTP_LOG(_shiftedDcId, ("Replacing msgId %1 to %2!"
).arg(id
).arg(newId));
replaces.insert(id, newId);
id = newId;
*(mtpMsgId*)(i.value()->data() + 4) = id;
}
setSeqNumbers.insert(id, i.value());
}
}
// Collect all non-container requests.
for (auto i = toResend.cbegin(), e = toResend.cend(); i != e; ++i) {
const auto j = toSend.constFind(i.value());
if (j == toSend.cend()) continue;
if (!j.value().isSentContainer()) {
if (!*(mtpMsgId*)(j.value()->constData() + 4)) continue;
mtpMsgId id = i.key();
if (id > newId) {
while (toResend.constFind(newId) != toResend.cend()
|| wereAcked.constFind(newId) != wereAcked.cend()
|| haveSent.constFind(newId) != haveSent.cend()) {
newId = base::unixtime::mtproto_msg_id();
}
MTP_LOG(_shiftedDcId, ("Replacing msgId %1 to %2!"
).arg(id
).arg(newId));
replaces.insert(id, newId);
id = newId;
*(mtpMsgId*)(j.value()->data() + 4) = id;
}
setSeqNumbers.insert(id, j.value());
}
}
const auto sessionId = rand_value<uint64>();
DEBUG_LOG(("MTP Info: creating new session after bad_msg_notification, setting random server_session %1").arg(sessionId));
_sessionData->setSessionId(sessionId);
for (auto i = setSeqNumbers.cbegin(), e = setSeqNumbers.cend(); i != e; ++i) { // generate new seq_numbers
bool wasNeedAck = (*(i.value()->data() + 6) & 1);
*(i.value()->data() + 6) = _sessionData->nextRequestSeqNumber(wasNeedAck);
}
if (!replaces.isEmpty()) {
for (auto i = replaces.cbegin(), e = replaces.cend(); i != e; ++i) { // replace msgIds keys in all data structs
const auto j = haveSent.find(i.key());
if (j != haveSent.cend()) {
const auto req = j.value();
haveSent.erase(j);
haveSent.insert(i.value(), req);
}
const auto k = toResend.find(i.key());
if (k != toResend.cend()) {
const auto req = k.value();
toResend.erase(k);
toResend.insert(i.value(), req);
}
const auto l = wereAcked.find(i.key());
if (l != wereAcked.cend()) {
DEBUG_LOG(("MTP Info: Replaced %1 with %2 in wereAcked."
).arg(i.key()
).arg(i.value()));
const auto req = l.value();
wereAcked.erase(l);
wereAcked.insert(i.value(), req);
}
}
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { // replace msgIds in saved containers
if (i.value().isSentContainer()) {
mtpMsgId *ids = (mtpMsgId*)(i.value()->data() + 8);
for (uint32 j = 0, l = (i.value()->size() - 8) >> 1; j < l; ++j) {
const auto k = replaces.constFind(ids[j]);
if (k != replaces.cend()) {
ids[j] = k.value();
}
}
}
}
}
DEBUG_LOG(("MTP Info: creating new session in resetSession."));
_sessionData->changeSessionId();
// #TODO move to sessionData, clear on changeSessionIdLocked.
_ackRequestData.clear();
_resendRequestData.clear();
{
@ -432,93 +332,99 @@ void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno
mtpMsgId ConnectionPrivate::prepareToSend(
SecureRequest &request,
mtpMsgId currentLastId) {
if (request->size() < 9) {
return 0;
}
mtpMsgId currentLastId,
bool forceNewMsgId) {
Expects(request->size() > 8);
if (const auto msgId = request.getMsgId()) {
// resending this request
QWriteLocker locker(_sessionData->toResendMutex());
QWriteLocker lock(_sessionData->toResendMutex());
auto &toResend = _sessionData->toResendMap();
const auto i = toResend.find(msgId);
if (i != toResend.cend()) {
toResend.erase(i);
}
return msgId;
lock.unlock();
return (forceNewMsgId || msgId > currentLastId)
? replaceMsgId(request, currentLastId)
: msgId;
}
request.setMsgId(currentLastId);
request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck()));
if (request->requestId) {
MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId));
}
return currentLastId;
}
mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) {
if (request->size() < 9) return 0;
Expects(request->size() > 8);
const auto oldMsgId = request.getMsgId();
if (oldMsgId != newId) {
if (oldMsgId) {
QWriteLocker locker(_sessionData->toResendMutex());
// haveSentMutex() and wereAckedMutex() were locked in tryToSend()
auto &toResend = _sessionData->toResendMap();
auto &wereAcked = _sessionData->wereAckedMap();
auto &haveSent = _sessionData->haveSentMap();
while (true) {
if (toResend.constFind(newId) == toResend.cend() && wereAcked.constFind(newId) == wereAcked.cend() && haveSent.constFind(newId) == haveSent.cend()) {
break;
}
const auto m = base::unixtime::mtproto_msg_id();
if (m <= newId) break; // wtf
newId = m;
}
const auto i = toResend.find(oldMsgId);
if (i != toResend.cend()) {
const auto req = i.value();
toResend.erase(i);
toResend.insert(newId, req);
}
const auto j = wereAcked.find(oldMsgId);
if (j != wereAcked.cend()) {
const auto req = j.value();
wereAcked.erase(j);
wereAcked.insert(newId, req);
}
const auto k = haveSent.find(oldMsgId);
if (k != haveSent.cend()) {
const auto req = k.value();
haveSent.erase(k);
haveSent.insert(newId, req);
}
for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) {
const auto req = l.value();
if (req.isSentContainer()) {
const auto ids = (mtpMsgId *)(req->data() + 8);
for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) {
if (ids[i] == oldMsgId) {
ids[i] = newId;
}
}
}
}
} else {
request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck()));
}
request.setMsgId(newId);
if (oldMsgId == newId) {
return newId;
}
QWriteLocker locker(_sessionData->toResendMutex());
// haveSentMutex() and wereAckedMutex() were locked in tryToSend()
auto &toResend = _sessionData->toResendMap();
auto &wereAcked = _sessionData->wereAckedMap();
auto &haveSent = _sessionData->haveSentMap();
while (toResend.constFind(newId) != toResend.cend()
|| wereAcked.constFind(newId) != wereAcked.cend()
|| haveSent.constFind(newId) != haveSent.cend()) {
newId = base::unixtime::mtproto_msg_id();
}
MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId));
const auto i = toResend.find(oldMsgId);
if (i != toResend.cend()) {
const auto req = i.value();
toResend.erase(i);
toResend.insert(newId, req);
}
const auto j = wereAcked.find(oldMsgId);
if (j != wereAcked.cend()) {
const auto req = j.value();
wereAcked.erase(j);
wereAcked.insert(newId, req);
}
const auto k = haveSent.find(oldMsgId);
if (k != haveSent.cend()) {
const auto req = k.value();
haveSent.erase(k);
haveSent.insert(newId, req);
}
for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) {
const auto req = l.value();
if (req.isSentContainer()) {
const auto ids = (mtpMsgId *)(req->data() + 8);
for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) {
if (ids[i] == oldMsgId) {
ids[i] = newId;
}
}
}
}
request.setMsgId(newId);
request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck()));
return newId;
}
mtpMsgId ConnectionPrivate::placeToContainer(SecureRequest &toSendRequest, mtpMsgId &bigMsgId, mtpMsgId *&haveSentArr, SecureRequest &req) {
auto msgId = prepareToSend(req, bigMsgId);
if (msgId > bigMsgId) {
msgId = replaceMsgId(req, bigMsgId);
}
mtpMsgId ConnectionPrivate::placeToContainer(
SecureRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
mtpMsgId *&haveSentArr,
SecureRequest &req) {
const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
@ -551,7 +457,12 @@ void ConnectionPrivate::tryToSend() {
&& _pingSendAt <= crl::now()) {
_pingIdToSend = openssl::RandomValue<mtpPingId>();
}
const auto forceNewMsgId = sendAll
&& _sessionData->markSessionAsStarted();
if (forceNewMsgId) {
int a = 0;
}
auto pingRequest = SecureRequest();
auto ackRequest = SecureRequest();
auto resendRequest = SecureRequest();
@ -744,7 +655,8 @@ void ConnectionPrivate::tryToSend() {
const auto msgId = prepareToSend(
toSendRequest,
base::unixtime::mtproto_msg_id());
base::unixtime::mtproto_msg_id(),
forceNewMsgId);
if (pingRequest) {
_pingMsgId = msgId;
needAnyResponse = true;
@ -839,17 +751,22 @@ void ConnectionPrivate::tryToSend() {
auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8);
if (pingRequest) {
_pingMsgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, pingRequest);
_pingMsgId = placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
haveSentArr,
pingRequest);
needAnyResponse = true;
} else if (resendRequest || stateRequest || bindDcKeyRequest) {
needAnyResponse = true;
}
for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) {
auto &req = i.value();
auto msgId = prepareToSend(req, bigMsgId);
if (msgId > bigMsgId) {
msgId = replaceMsgId(req, bigMsgId);
}
const auto msgId = prepareToSend(
req,
bigMsgId,
forceNewMsgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
@ -889,21 +806,24 @@ void ConnectionPrivate::tryToSend() {
}
}
if (stateRequest) {
mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, stateRequest);
mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest);
stateRequest->msDate = 0; // 0 for state request, do not request state of it
Assert(!haveSent.contains(msgId));
haveSent.insert(msgId, stateRequest);
}
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, resendRequest);
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, ackRequest);
if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, httpWaitRequest);
if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, bindDcKeyRequest);
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest);
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest);
if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest);
if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, bindDcKeyRequest);
mtpMsgId contMsgId = prepareToSend(toSendRequest, bigMsgId);
*(mtpMsgId*)(haveSentIdsWrap->data() + 4) = contMsgId;
const auto containerMsgId = prepareToSend(
toSendRequest,
bigMsgId,
forceNewMsgId);
*(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId;
(*haveSentIdsWrap)[6] = 0; // for container, msDate = 0, seqNo = 0
Assert(!haveSent.contains(contMsgId));
haveSent.insert(contMsgId, haveSentIdsWrap);
Assert(!haveSent.contains(containerMsgId));
haveSent.insert(containerMsgId, haveSentIdsWrap);
toSend.clear();
}
}
@ -1050,7 +970,6 @@ void ConnectionPrivate::restart() {
if (_needSessionReset) {
resetSession();
}
_restarted = true;
if (_retryTimer.isActive()) {
return;
}
@ -1144,7 +1063,6 @@ void ConnectionPrivate::waitReceivedFailed() {
_waitForReceived *= 2;
}
doDisconnect();
_restarted = true;
if (_retryTimer.isActive()) {
return;
}
@ -1164,7 +1082,6 @@ void ConnectionPrivate::waitConnectedFailed() {
}
connectingTimedOut();
_restarted = true;
DEBUG_LOG(("MTP Info: immediate restart!"));
InvokeQueued(this, [=] { connectToServer(); });
@ -1183,9 +1100,7 @@ void ConnectionPrivate::connectingTimedOut() {
void ConnectionPrivate::doDisconnect() {
destroyAllConnections();
setState(DisconnectedState);
_restarted = false;
}
void ConnectionPrivate::finishAndDestroy() {
@ -1212,6 +1127,8 @@ void ConnectionPrivate::requestCDNConfig() {
}
void ConnectionPrivate::handleReceived() {
Expects(_temporaryKey != nullptr);
onReceivedSome();
while (!_connection->received().empty()) {
@ -1345,9 +1262,8 @@ void ConnectionPrivate::handleReceived() {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt));
_sessionData->setSalt(serverSalt);
if (setState(ConnectedState, ConnectingState) && _restarted) {
_sessionData->queueResendAll();
_restarted = false;
if (setState(ConnectedState, ConnectingState)) {
_sessionData->resendAll();
}
} else {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt));
@ -1393,9 +1309,10 @@ void ConnectionPrivate::handleReceived() {
}
if (res != HandleResult::Success && res != HandleResult::Ignored) {
_needSessionReset = (res == HandleResult::ResetSession);
if (res == HandleResult::DestroyTemporaryKey) {
destroyTemporaryKey();
} else if (res == HandleResult::ResetSession) {
_needSessionReset = true;
}
return restart();
}
@ -1633,11 +1550,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
_sessionData->setSalt(serverSalt);
base::unixtime::update(serverTime);
if (setState(ConnectedState, ConnectingState)) { // maybe only connected
if (_restarted) {
_sessionData->queueResendAll();
_restarted = false;
}
if (setState(ConnectedState, ConnectingState)) {
_sessionData->resendAll();
}
badTime = false;
@ -1863,7 +1777,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return HandleResult::Ignored;
}
}
requestsAcked(ids, true);
if (typeId == mtpc_gzip_packed) {
DEBUG_LOG(("RPC Info: gzip container"));
@ -1880,12 +1793,13 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (DcKeyBinder::IsDestroyedTemporaryKeyError(response)) {
return HandleResult::DestroyTemporaryKey;
}
} else {
// An error could be some RPC_CALL_FAIL or other error inside
// the initConnection, so we're not sure yet that it was inited.
// Wait till a good response is received.
} else {
_sessionData->notifyConnectionInited(*_connectionOptions);
}
requestsAcked(ids, true);
if (_keyBinder) {
const auto result = _keyBinder->handleResponse(
@ -1945,11 +1859,16 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
const auto &haveSent = _sessionData->haveSentMap();
toResend.reserve(haveSent.size());
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
if (i.key() >= firstMsgId) break;
if (i.value()->requestId) toResend.push_back(i.key());
if (i.key() >= firstMsgId) {
break;
} else if (i.value()->requestId) {
toResend.push_back(i.key());
}
}
}
resendMany(toResend, 10, true);
for (const auto msgId : toResend) {
_sessionData->resend(msgId, 10, true);
}
mtpBuffer update(from - start);
if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime));
@ -2237,26 +2156,18 @@ void ConnectionPrivate::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
if (msgId == _pingMsgId) {
return;
if (msgId != _pingMsgId) {
_sessionData->resend(msgId, msCanWait, forceContainer);
}
_sessionData->queueResend(msgId, msCanWait, forceContainer);
}
void ConnectionPrivate::resendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer) {
for (int32 i = 0, l = msgIds.size(); i < l; ++i) {
if (msgIds.at(i) == _pingMsgId) {
msgIds.remove(i);
--l;
}
for (const auto msgId : msgIds) {
resend(msgId, msCanWait, forceContainer);
}
_sessionData->queueResendMany(
std::move(msgIds),
msCanWait,
forceContainer);
}
void ConnectionPrivate::onConnected(
@ -2340,10 +2251,13 @@ void ConnectionPrivate::removeTestConnection(
}
void ConnectionPrivate::checkAuthKey() {
if (!_keyId) {
updateAuthKey();
} else {
Expects(_keyCreator == nullptr);
Expects(_keyBinder == nullptr || _keyId != 0);
if (_keyId) {
authKeyChecked();
} else {
applyAuthKey(_sessionData->getTemporaryKey());
}
}
@ -2359,23 +2273,40 @@ void ConnectionPrivate::updateAuthKey() {
void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) {
_temporaryKey = std::move(temporaryKey);
const auto newKeyId = _temporaryKey ? _temporaryKey->keyId() : 0;
if (newKeyId) {
if (_keyId) {
if (_keyId == newKeyId) {
return;
}
_sessionData->setCurrentKeyId(newKeyId);
_keyId = 0;
if (_sessionData->setCurrentKeyId(_keyId)) {
_ackRequestData.clear(); // #TODO move to sessionData.
_resendRequestData.clear();
{
QWriteLocker locker5(_sessionData->stateRequestMutex());
_sessionData->stateRequestMap().clear();
}
}
DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..."
).arg(_shiftedDcId));
if (_connection) {
restart();
}
return;
}
_keyId = newKeyId;
if (!_connection) {
return;
}
if (const auto already = _connection->sentEncryptedWithKeyId()) {
Assert(already != newKeyId);
DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed").arg(_shiftedDcId));
restart();
return;
if (newKeyId && _sessionData->setCurrentKeyId(newKeyId)) {
_ackRequestData.clear(); // #TODO move to sessionData.
_resendRequestData.clear();
{
QWriteLocker locker5(_sessionData->stateRequestMutex());
_sessionData->stateRequestMap().clear();
}
}
_keyId = newKeyId;
Assert(!_connection->sentEncryptedWithKeyId());
DEBUG_LOG(("AuthKey Info: Connection update key from Session, dc %1 result: %2").arg(_shiftedDcId).arg(Logs::mb(&_keyId, sizeof(_keyId)).str()));
if (_keyId) {
return authKeyChecked();
@ -2470,12 +2401,8 @@ void ConnectionPrivate::authKeyChecked() {
handleReceived();
});
if (_sessionData->getSalt()) {
setState(ConnectedState);
if (_restarted) {
_sessionData->queueResendAll();
_restarted = false;
}
if (_sessionData->getSalt() && setState(ConnectedState)) {
_sessionData->resendAll();
} // else receive salt in bad_server_salt first, then try to send all the requests
_pingIdToSend = rand_value<uint64>(); // get server_salt
@ -2525,7 +2452,6 @@ void ConnectionPrivate::destroyTemporaryKey() {
if (_temporaryKey) {
_sessionData->destroyTemporaryKey(_temporaryKey->keyId());
}
_needSessionReset = true;
applyAuthKey(nullptr);
}

View File

@ -139,9 +139,13 @@ private:
mtpMsgId placeToContainer(
SecureRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
mtpMsgId *&haveSentArr,
SecureRequest &req);
mtpMsgId prepareToSend(SecureRequest &request, mtpMsgId currentLastId);
mtpMsgId prepareToSend(
SecureRequest &request,
mtpMsgId currentLastId,
bool forceNewMsgId);
mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId);
bool sendSecureRequest(SecureRequest &&request, bool needAnyResponse);
@ -221,7 +225,6 @@ private:
mtpMsgId _pingMsgId = 0;
base::Timer _pingSender;
bool _restarted = false;
bool _finished = false;
AuthKeyPtr _temporaryKey;

View File

@ -1156,7 +1156,9 @@ void Instance::Private::onSessionReset(ShiftedDcId dcWithShift) {
bool Instance::Private::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data
if (isDefaultHandledError(err)) {
if (onFail && (*onFail)(requestId, err)) return true;
if (onFail && (*onFail)(requestId, err)) {
return true;
}
}
if (onErrorDefault(requestId, err)) {

View File

@ -26,10 +26,6 @@ constexpr auto kCheckResendTimeout = crl::time(10000);
// when resending request or checking its state.
constexpr auto kCheckResendWaiting = crl::time(1000);
// How much ints should message contain for us not to resend,
// but instead to check its state.
constexpr auto kResendThreshold = 1;
// Container lives 10 minutes in haveSent map.
constexpr auto kContainerLives = 600;
@ -76,15 +72,51 @@ void SessionData::withSession(Callback &&callback) {
}
}
void SessionData::setCurrentKeyId(uint64 keyId) {
bool SessionData::setCurrentKeyId(uint64 keyId) {
QWriteLocker locker(&_lock);
if (_keyId == keyId) {
return;
return false;
}
_keyId = keyId;
_sessionId = openssl::RandomValue<uint64>();
DEBUG_LOG(("MTP Info: auth key set in SessionData, id %1").arg(keyId));
changeSessionIdLocked();
return true;
}
void SessionData::changeSessionId() {
QWriteLocker locker(&_lock);
changeSessionIdLocked();
}
void SessionData::changeSessionIdLocked() {
auto sessionId = _sessionId;
do {
sessionId = openssl::RandomValue<uint64>();
} while (_sessionId == sessionId);
DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId));
_sessionId = sessionId;
_messagesSent = 0;
DEBUG_LOG(("MTP Info: new auth key set in SessionData, id %1, setting random server_session %2").arg(_keyId).arg(_sessionId));
_sessionMarkedAsStarted = false;
}
uint32 SessionData::nextRequestSeqNumber(bool needAck) {
QWriteLocker locker(&_lock);
auto result = _messagesSent;
_messagesSent += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
bool SessionData::markSessionAsStarted() {
QWriteLocker locker(&_lock);
if (_sessionMarkedAsStarted) {
return false;
}
_sessionMarkedAsStarted = true;
return true;
}
void SessionData::setKeyForCheck(const AuthKeyPtr &key) {
@ -166,12 +198,6 @@ void SessionData::queueConnectionStateChange(int newState) {
});
}
void SessionData::queueResendAll() {
withSession([](not_null<Session*> session) {
session->resendAll();
});
}
void SessionData::queueResetDone() {
withSession([](not_null<Session*> session) {
session->resetDone();
@ -190,24 +216,21 @@ void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) {
});
}
void SessionData::queueResend(
void SessionData::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
withSession([=](not_null<Session*> session) {
session->resend(msgId, msCanWait, forceContainer);
});
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->resend(msgId, msCanWait, forceContainer);
}
}
void SessionData::queueResendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer) {
withSession([=](not_null<Session*> session) {
for (const auto msgId : msgIds) {
session->resend(msgId, msCanWait, forceContainer);
}
});
void SessionData::resendAll() {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->resendAll();
}
}
bool SessionData::connectionInited() const {
@ -437,7 +460,6 @@ bool Session::sharedDc() const {
}
void Session::checkRequestsByTimer() {
QVector<mtpMsgId> resendingIds;
QVector<mtpMsgId> removingIds; // remove very old (10 minutes) containers and resend requests
QVector<mtpMsgId> stateRequestIds;
@ -450,14 +472,9 @@ void Session::checkRequestsByTimer() {
auto &req = i.value();
if (req->msDate > 0) {
if (req->msDate + kCheckResendTimeout < ms) { // need to resend or check state
if (req.messageSize() < kResendThreshold) { // resend
resendingIds.reserve(haveSentCount);
resendingIds.push_back(i.key());
} else {
req->msDate = ms;
stateRequestIds.reserve(haveSentCount);
stateRequestIds.push_back(i.key());
}
req->msDate = ms;
stateRequestIds.reserve(haveSentCount);
stateRequestIds.push_back(i.key());
}
} else if (base::unixtime::now()
> int32(i.key() >> 32) + kContainerLives) {
@ -477,12 +494,6 @@ void Session::checkRequestsByTimer() {
}
sendAnything(kCheckResendWaiting);
}
if (!resendingIds.isEmpty()) {
for (uint32 i = 0, l = resendingIds.size(); i < l; ++i) {
DEBUG_LOG(("MTP Info: resending request %1").arg(resendingIds[i]));
resend(resendingIds[i], kCheckResendWaiting);
}
}
if (!removingIds.isEmpty()) {
auto clearCallbacks = std::vector<RPCCallbackClear>();
{
@ -586,40 +597,35 @@ QString Session::transport() const {
return _connection ? _connection->transport() : QString();
}
mtpRequestId Session::resend(
void Session::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
SecureRequest request;
{
QWriteLocker locker(_data->haveSentMutex());
auto &haveSent = _data->haveSentMap();
auto lock = QWriteLocker(_data->haveSentMutex());
auto &haveSent = _data->haveSentMap();
auto i = haveSent.find(msgId);
if (i == haveSent.end()) {
return 0;
}
request = i.value();
haveSent.erase(i);
auto i = haveSent.find(msgId);
if (i == haveSent.end()) {
return;
}
if (request.isSentContainer()) { // for container just resend all messages we can
auto request = i.value();
haveSent.erase(i);
lock.unlock();
// For container just resend all messages we can.
if (request.isSentContainer()) {
DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId));
const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8);
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
resend(ids[i], 10, true);
}
return 0xFFFFFFFF;
} else if (!request.isStateRequest()) {
request->msDate = forceContainer ? 0 : crl::now();
sendPrepared(request, msCanWait, false);
{
QWriteLocker locker(_data->toResendMutex());
_data->toResendMap().insert(msgId, request->requestId);
}
return request->requestId;
} else {
return 0;
sendPrepared(request, msCanWait, false);
}
}
@ -636,8 +642,11 @@ void Session::resendAll() {
}
}
for (uint32 i = 0, l = toResend.size(); i < l; ++i) {
resend(toResend[i], 10, true);
resend(toResend[i], -1, true);
}
InvokeQueued(this, [=] {
sendAnything();
});
}
void Session::sendPrepared(
@ -657,8 +666,11 @@ void Session::sendPrepared(
}
DEBUG_LOG(("MTP Info: added, requestId %1").arg(request->requestId));
sendAnything(msCanWait);
if (msCanWait >= 0) {
InvokeQueued(this, [=] {
sendAnything(msCanWait);
});
}
}
bool Session::acquireKeyCreation() {

View File

@ -140,16 +140,8 @@ public:
SessionData(not_null<Session*> creator) : _owner(creator) {
}
void setCurrentKeyId(uint64 keyId);
void setSessionId(uint64 sessionId) {
DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId));
QWriteLocker locker(&_lock);
if (_sessionId != sessionId) {
_sessionId = sessionId;
_messagesSent = 0;
}
}
bool setCurrentKeyId(uint64 keyId);
void changeSessionId();
[[nodiscard]] uint64 getSessionId() const {
QReadLocker locker(&_lock);
return _sessionId;
@ -254,12 +246,8 @@ public:
return _owner;
}
uint32 nextRequestSeqNumber(bool needAck = true) {
QWriteLocker locker(&_lock);
auto result = _messagesSent;
_messagesSent += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
[[nodiscard]] bool markSessionAsStarted();
[[nodiscard]] uint32 nextRequestSeqNumber(bool needAck);
void clearForNewKey(not_null<Instance*> instance);
@ -267,18 +255,9 @@ public:
void queueTryToReceive();
void queueNeedToResumeAndSend();
void queueConnectionStateChange(int newState);
void queueResendAll();
void queueResetDone();
void queueSendAnything(crl::time msCanWait = 0);
void queueSendMsgsStateInfo(quint64 msgId, QByteArray data);
void queueResend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer);
void queueResendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer);
[[nodiscard]] bool connectionInited() const;
[[nodiscard]] AuthKeyPtr getPersistentKey() const;
@ -289,10 +268,17 @@ public:
const AuthKeyPtr &persistentKey);
void releaseKeyCreationOnFail();
void destroyTemporaryKey(uint64 keyId);
void resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer);
void resendAll();
void detach();
private:
void changeSessionIdLocked();
template <typename Callback>
void withSession(Callback &&callback);
@ -300,6 +286,7 @@ private:
uint64 _sessionId = 0;
uint64 _salt = 0;
uint32 _messagesSent = 0;
bool _sessionMarkedAsStarted = false;
Session *_owner = nullptr;
mutable QMutex _ownerMutex;
@ -355,6 +342,18 @@ public:
[[nodiscard]] AuthKeyPtr getPersistentKey() const;
[[nodiscard]] AuthKeyPtr getTemporaryKey() const;
[[nodiscard]] bool connectionInited() const;
void resend(
mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false);
void resendAll();
// Thread-safe.
// Nulls msgId and seqNo in request, if newRequest = true.
void sendPrepared(
const SecureRequest &request,
crl::time msCanWait = 0,
bool newRequest = true);
// Connection thread.
[[nodiscard]] bool acquireKeyCreation();
@ -374,23 +373,12 @@ public:
void sendDcKeyCheck(const AuthKeyPtr &key);
// Nulls msgId and seqNo in request, if newRequest = true.
void sendPrepared(
const SecureRequest &request,
crl::time msCanWait = 0,
bool newRequest = true);
void tryToReceive();
void needToResumeAndSend();
void connectionStateChange(int newState);
void resendAll(); // After connection restart.
void resetDone();
void sendAnything(crl::time msCanWait = 0);
void sendMsgsStateInfo(quint64 msgId, QByteArray data);
mtpRequestId resend(
mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false);
signals:
void authKeyChanged();