Fix lost bind key messages.

This commit is contained in:
John Preston 2019-11-22 12:06:48 +03:00
parent 2597bc9f4e
commit 46a42e02bc
8 changed files with 255 additions and 431 deletions

View File

@ -443,13 +443,10 @@ mtpMsgId ConnectionPrivate::prepareToSend(
if (const auto msgId = request.getMsgId()) {
// resending this request
QWriteLocker lock(_sessionData->toResendMutex());
auto &toResend = _sessionData->toResendMap();
const auto i = toResend.find(msgId);
if (i != toResend.cend()) {
toResend.erase(i);
const auto i = _resendingIds.find(msgId);
if (i != _resendingIds.cend()) {
_resendingIds.erase(i);
}
lock.unlock();
return (forceNewMsgId || msgId > currentLastId)
? replaceMsgId(request, currentLastId)
@ -470,33 +467,29 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId)
if (oldMsgId == newId) {
return newId;
}
QWriteLocker locker(_sessionData->toResendMutex());
// haveSentMutex() and wereAckedMutex() were locked in tryToSend()
auto &toResend = _sessionData->toResendMap();
auto &wereAcked = _sessionData->wereAckedMap();
// haveSentMutex() was locked in tryToSend()
auto &haveSent = _sessionData->haveSentMap();
while (toResend.constFind(newId) != toResend.cend()
|| wereAcked.constFind(newId) != wereAcked.cend()
while (_resendingIds.contains(newId)
|| _ackedIds.contains(newId)
|| haveSent.constFind(newId) != haveSent.cend()) {
newId = base::unixtime::mtproto_msg_id();
}
MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId));
const auto i = toResend.find(oldMsgId);
if (i != toResend.cend()) {
const auto req = i.value();
toResend.erase(i);
toResend.insert(newId, req);
const auto i = _resendingIds.find(oldMsgId);
if (i != _resendingIds.end()) {
const auto requestId = i->second;
_resendingIds.erase(i);
_resendingIds.emplace(newId, requestId);
}
const auto j = wereAcked.find(oldMsgId);
if (j != wereAcked.cend()) {
const auto req = j.value();
wereAcked.erase(j);
wereAcked.insert(newId, req);
const auto j = _ackedIds.find(oldMsgId);
if (j != _ackedIds.cend()) {
const auto requestId = j->second;
_ackedIds.erase(j);
_ackedIds.emplace(newId, requestId);
}
const auto k = haveSent.find(oldMsgId);
@ -618,14 +611,14 @@ void ConnectionPrivate::tryToSend() {
}
stateRequest = SecureRequest::Serialize(MTPMsgsStateReq(
MTP_msgs_state_req(MTP_vector<MTPlong>(ids))));
// Add to haveSent / wereAcked maps, but don't add to requestMap.
// Add to haveSent / _ackedIds, but don't add to requestMap.
stateRequest->requestId = GetNextRequestId();
}
if (_connection->usingHttpWait()) {
httpWaitRequest = SecureRequest::Serialize(MTPHttpWait(
MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000))));
}
if (_keyCreator && _keyCreator->bindReadyToRequest()) {
if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) {
bindDcKeyRequest = _keyCreator->prepareBindRequest(
_encryptionKey,
_sessionId);
@ -635,22 +628,6 @@ void ConnectionPrivate::tryToSend() {
// seqNo for it manually here.
bindDcKeyRequest.setSeqNo(
nextRequestSeqNumber(bindDcKeyRequest.needAck()));
//} else if (!_keyChecker) {
// if (const auto &keyForCheck = _sessionData->getKeyForCheck()) {
// _keyChecker = std::make_unique<details::DcKeyChecker>(
// _instance,
// _shiftedDcId,
// keyForCheck);
// bindDcKeyRequest = _keyChecker->prepareRequest(
// _encryptionKey,
// _sessionId);
// // This is a special request with msgId used inside the message
// // body, so it is prepared already with a msgId and we place
// // seqNo for it manually here.
// bindDcKeyRequest.setSeqNo(
// nextRequestSeqNumber(bindDcKeyRequest.needAck()));
// }
}
}
@ -750,8 +727,11 @@ void ConnectionPrivate::tryToSend() {
const auto msgId = prepareToSend(
toSendRequest,
base::unixtime::mtproto_msg_id(),
forceNewMsgId);
if (pingRequest) {
forceNewMsgId && !bindDcKeyRequest);
if (bindDcKeyRequest) {
_bindMsgId = msgId;
needAnyResponse = true;
} else if (pingRequest) {
_pingMsgId = msgId;
needAnyResponse = true;
} else if (resendRequest || stateRequest) {
@ -792,8 +772,7 @@ void ConnectionPrivate::tryToSend() {
needAnyResponse = true;
} else {
QWriteLocker locker3(_sessionData->wereAckedMutex());
_sessionData->wereAckedMap().insert(msgId, toSendRequest->requestId);
_ackedIds.emplace(msgId, toSendRequest->requestId);
}
}
} else { // send in container
@ -833,10 +812,6 @@ void ConnectionPrivate::tryToSend() {
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
// the fact of this lock is used in replaceMsgId()
QWriteLocker locker3(_sessionData->wereAckedMutex());
auto &wereAcked = _sessionData->wereAckedMap();
// prepare "request-like" wrap for msgId vector
auto haveSentIdsWrap = SecureRequest::Prepare(idsWrapSize);
haveSentIdsWrap->msDate = 0; // Container: msDate = 0, seqNo = 0.
@ -844,6 +819,15 @@ void ConnectionPrivate::tryToSend() {
haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize);
auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8);
if (bindDcKeyRequest) {
_bindMsgId = placeToContainer(
toSendRequest,
bigMsgId,
false,
haveSentArr,
bindDcKeyRequest);
needAnyResponse = true;
}
if (pingRequest) {
_pingMsgId = placeToContainer(
toSendRequest,
@ -852,7 +836,8 @@ void ConnectionPrivate::tryToSend() {
haveSentArr,
pingRequest);
needAnyResponse = true;
} else if (resendRequest || stateRequest || bindDcKeyRequest) {
}
if (resendRequest || stateRequest) {
needAnyResponse = true;
}
for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) {
@ -890,7 +875,7 @@ void ConnectionPrivate::tryToSend() {
needAnyResponse = true;
} else {
wereAcked.insert(msgId, req->requestId);
_ackedIds.emplace(msgId, req->requestId);
}
}
if (!added) {
@ -908,7 +893,6 @@ void ConnectionPrivate::tryToSend() {
if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest);
if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest);
if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest);
if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, bindDcKeyRequest);
const auto containerMsgId = prepareToSend(
toSendRequest,
@ -1044,6 +1028,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) {
setState(ConnectingState);
_bindMsgId = 0;
_pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0;
_pingSender.cancel();
@ -1352,7 +1337,7 @@ void ConnectionPrivate::handleReceived() {
_sessionSalt = serverSalt;
if (setState(ConnectedState, ConnectingState)) {
_sessionData->resendAll();
resendAll();
}
} else {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt));
@ -1416,10 +1401,16 @@ void ConnectionPrivate::handleReceived() {
}
}
ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime) {
const auto cons = mtpTypeId(*from);
ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
const mtpPrime *from,
const mtpPrime *end,
uint64 msgId,
int32 serverTime,
uint64 serverSalt,
bool badTime) {
Expects(from < end);
switch (cons) {
switch (mtpTypeId(*from)) {
case mtpc_gzip_packed: {
DEBUG_LOG(("Message Info: gzip container"));
@ -1495,18 +1486,15 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
auto &ids = msg.c_msgs_ack().vmsg_ids().v;
uint32 idsCount = ids.size();
const auto &ids = msg.c_msgs_ack().vmsg_ids().v;
DEBUG_LOG(("Message Info: acks received, ids: %1"
).arg(LogIdsVector(ids)));
if (ids.isEmpty()) {
return badTime ? HandleResult::Ignored : HandleResult::Success;
}
DEBUG_LOG(("Message Info: acks received, ids: %1").arg(LogIdsVector(ids)));
if (!idsCount) return (badTime ? HandleResult::Ignored : HandleResult::Success);
if (badTime) {
if (requestsFixTimeSalt(ids, serverTime, serverSalt)) {
badTime = false;
} else {
return HandleResult::Ignored;
}
if (badTime && !requestsFixTimeSalt(ids, serverTime, serverSalt)) {
return HandleResult::Ignored;
}
requestsAcked(ids);
} return HandleResult::Success;
@ -1519,11 +1507,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
const auto &data(msg.c_bad_msg_notification());
LOG(("Message Info: bad message notification received (error_code %3) for msg_id = %1, seq_no = %2").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.verror_code().v));
mtpMsgId resendId = data.vbad_msg_id().v;
if (resendId == _pingMsgId) {
_pingId = 0;
}
int32 errorCode = data.verror_code().v;
const auto resendId = data.vbad_msg_id().v;
const auto errorCode = data.verror_code().v;
if (false
|| errorCode == 16
|| errorCode == 17
@ -1619,13 +1604,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_bad_server_salt());
const auto &data = msg.c_bad_server_salt();
DEBUG_LOG(("Message Info: bad server salt received (error_code %4) for msg_id = %1, seq_no = %2, new salt: %3").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.vnew_server_salt().v).arg(data.verror_code().v));
mtpMsgId resendId = data.vbad_msg_id().v;
if (resendId == _pingMsgId) {
_pingId = 0;
} else if (!wasSent(resendId)) {
const auto resendId = data.vbad_msg_id().v;
if (!wasSent(resendId)) {
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId));
return (badTime ? HandleResult::Ignored : HandleResult::Success);
}
@ -1634,7 +1617,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
base::unixtime::update(serverTime);
if (setState(ConnectedState, ConnectingState)) {
_sessionData->resendAll();
resendAll();
}
badTime = false;
@ -1661,11 +1644,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
{
const auto minRecv = _receivedMessageIds.min();
const auto maxRecv = _receivedMessageIds.max();
QReadLocker locker(_sessionData->wereAckedMutex());
const auto &wereAcked = _sessionData->wereAckedMap();
const auto wereAckedEnd = wereAcked.cend();
for (uint32 i = 0, l = idsCount; i < l; ++i) {
char state = 0;
uint64 reqMsgId = ids[i].v;
@ -1679,7 +1657,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
state |= 0x02;
} else {
state |= 0x04;
if (wereAcked.constFind(reqMsgId) != wereAckedEnd) {
if (_ackedIds.contains(reqMsgId)) {
state |= 0x80; // we know, that server knows, that we received request
}
if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack
@ -1837,20 +1815,21 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (!reqMsgId.read(++from, end)) {
return HandleResult::ParseError;
}
mtpTypeId typeId = from[0];
const auto requestMsgId = reqMsgId.v;
DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(reqMsgId.v));
DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(requestMsgId));
QVector<MTPlong> ids(1, reqMsgId);
if (badTime) {
if (requestsFixTimeSalt(ids, serverTime, serverSalt)) {
badTime = false;
} else {
DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(reqMsgId.v));
DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(requestMsgId));
return HandleResult::Ignored;
}
}
mtpTypeId typeId = from[0];
if (typeId == mtpc_gzip_packed) {
DEBUG_LOG(("RPC Info: gzip container"));
response = ungzip(++from, end);
@ -1874,36 +1853,17 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
}
requestsAcked(ids, true);
if (_keyCreator) {
const auto result = _keyCreator->handleBindResponse(
reqMsgId,
response);
switch (result) {
case DcKeyBindState::Success:
if (!_sessionData->releaseKeyCreationOnDone(
_encryptionKey,
base::take(_keyCreator)->bindPersistentKey())) {
return HandleResult::DestroyTemporaryKey;
}
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
case DcKeyBindState::DefinitelyDestroyed:
if (destroyOldEnoughPersistentKey()) {
return HandleResult::DestroyTemporaryKey;
}
[[fallthrough]];
case DcKeyBindState::Failed:
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
}
const auto bindResult = handleBindResponse(requestMsgId, response);
if (bindResult != HandleResult::Ignored) {
return bindResult;
}
auto requestId = wasSent(reqMsgId.v);
const auto requestId = wasSent(requestMsgId);
if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) {
// Save rpc_result for processing in the main thread.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedResponses().insert(requestId, response);
} else {
DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(reqMsgId.v));
DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId));
}
} return HandleResult::Success;
@ -1942,7 +1902,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
}
}
for (const auto msgId : toResend) {
_sessionData->resend(msgId, 10, true);
resend(msgId, 10, true);
}
mtpBuffer update(from - start);
@ -1991,22 +1951,13 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (_currentDcType == DcType::Regular) {
mtpBuffer update(end - from);
if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
if (end > from) {
memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
}
// Notify main process about the new updates.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedUpdates().push_back(SerializedMessage(update));
if (cons != mtpc_updatesTooLong
&& cons != mtpc_updateShortMessage
&& cons != mtpc_updateShortChatMessage
&& cons != mtpc_updateShortSentMessage
&& cons != mtpc_updateShort
&& cons != mtpc_updatesCombined
&& cons != mtpc_updates) {
// Maybe some new unknown update?
LOG(("Message Error: unknown constructor 0x%1").arg(cons, 0, 16));
}
} else {
LOG(("Message Error: unexpected updates in dcType: %1"
).arg(static_cast<int>(_currentDcType)));
@ -2015,6 +1966,34 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return HandleResult::Success;
}
ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse(
mtpMsgId requestMsgId,
const mtpBuffer &response) {
if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) {
return HandleResult::Ignored;
}
const auto result = _keyCreator->handleBindResponse(response);
switch (result) {
case DcKeyBindState::Success:
if (!_sessionData->releaseKeyCreationOnDone(
_encryptionKey,
base::take(_keyCreator)->bindPersistentKey())) {
return HandleResult::DestroyTemporaryKey;
}
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
case DcKeyBindState::DefinitelyDestroyed:
if (destroyOldEnoughPersistentKey()) {
return HandleResult::DestroyTemporaryKey;
}
[[fallthrough]];
case DcKeyBindState::Failed:
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
}
Unexpected("Result of BoundKeyCreator::handleBindResponse.");
}
mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const {
mtpBuffer result; // * 4 because of mtpPrime type
result.resize(0);
@ -2090,87 +2069,80 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
auto clearedBecauseTooOld = std::vector<RPCCallbackClear>();
QVector<MTPlong> toAckMore;
{
QWriteLocker locker1(_sessionData->wereAckedMutex());
auto &wereAcked = _sessionData->wereAckedMap();
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
{
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (uint32 i = 0; i < idsCount; ++i) {
mtpMsgId msgId = ids[i].v;
const auto req = haveSent.find(msgId);
if (req != haveSent.cend()) {
if (!req.value()->msDate) {
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v));
uint32 inContCount = ((*req)->size() - 8) / 2;
const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8);
toAckMore.reserve(toAckMore.size() + inContCount);
for (uint32 j = 0; j < inContCount; ++j) {
toAckMore.push_back(MTP_long(*(inContId++)));
}
for (uint32 i = 0; i < idsCount; ++i) {
mtpMsgId msgId = ids[i].v;
const auto req = haveSent.find(msgId);
if (req != haveSent.cend()) {
if (!req.value()->msDate) {
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v));
uint32 inContCount = ((*req)->size() - 8) / 2;
const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8);
toAckMore.reserve(toAckMore.size() + inContCount);
for (uint32 j = 0; j < inContCount; ++j) {
toAckMore.push_back(MTP_long(*(inContId++)));
}
haveSent.erase(req);
} else {
mtpRequestId reqId = req.value()->requestId;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
_ackedIds.emplace(msgId, reqId);
haveSent.erase(req);
} else {
mtpRequestId reqId = req.value()->requestId;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
wereAcked.insert(msgId, reqId);
haveSent.erase(req);
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
}
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
const auto reqIt = _resendingIds.find(msgId);
if (reqIt != _resendingIds.end()) {
const auto reqId = reqIt->second;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
QWriteLocker locker4(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto req = toSend.find(reqId);
if (req != toSend.cend()) {
_ackedIds.emplace(msgId, req.value()->requestId);
if (req.value()->requestId != reqId) {
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId));
} else {
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId));
}
toSend.erase(req);
} else {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId));
}
_resendingIds.erase(reqIt);
} else {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
QWriteLocker locker3(_sessionData->toResendMutex());
auto &toResend = _sessionData->toResendMap();
const auto reqIt = toResend.find(msgId);
if (reqIt != toResend.cend()) {
const auto reqId = reqIt.value();
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
QWriteLocker locker4(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto req = toSend.find(reqId);
if (req != toSend.cend()) {
wereAcked.insert(msgId, req.value()->requestId);
if (req.value()->requestId != reqId) {
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId));
} else {
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId));
}
toSend.erase(req);
} else {
DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId));
}
toResend.erase(reqIt);
} else {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId));
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
}
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
}
}
}
}
uint32 ackedCount = wereAcked.size();
if (ackedCount > kIdsBufferSize) {
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
while (ackedCount-- > kIdsBufferSize) {
auto i = wereAcked.begin();
clearedBecauseTooOld.push_back(RPCCallbackClear(
i.value(),
RPCError::TimeoutError));
wereAcked.erase(i);
}
auto ackedCount = _ackedIds.size();
if (ackedCount > kIdsBufferSize) {
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
while (ackedCount-- > kIdsBufferSize) {
auto i = _ackedIds.begin();
clearedBecauseTooOld.push_back(RPCCallbackClear(
i->second,
RPCError::TimeoutError));
_ackedIds.erase(i);
}
}
@ -2204,10 +2176,8 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt
const auto haveSentEnd = haveSent.cend();
if (haveSent.find(requestMsgId) == haveSentEnd) {
DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId));
QWriteLocker locker2(_sessionData->toResendMutex());
auto &toResend = _sessionData->toResendMap();
const auto reqIt = toResend.find(requestMsgId);
if (reqIt != toResend.cend()) {
const auto reqIt = _resendingIds.find(requestMsgId);
if (reqIt != _resendingIds.cend()) {
if ((state & 0x07) != 0x04) { // was received
DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, already resending in container").arg(requestMsgId).arg((int32)state));
} else {
@ -2230,22 +2200,70 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt
}
}
void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) {
if (msgId == _pingMsgId) {
_pingMsgId = 0;
_pingId = 0;
} else if (msgId == _bindMsgId) {
_bindMsgId = 0;
}
}
void ConnectionPrivate::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
if (msgId != _pingMsgId) {
_sessionData->resend(msgId, msCanWait, forceContainer);
const auto guard = gsl::finally([&] {
clearSpecialMsgId(msgId);
if (msCanWait >= 0) {
_sessionData->queueSendAnything(msCanWait);
}
});
auto lock = QWriteLocker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
auto i = haveSent.find(msgId);
if (i == haveSent.end()) {
return;
}
auto request = i.value();
haveSent.erase(i);
lock.unlock();
// For container just resend all messages we can.
if (request.isSentContainer()) {
DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId));
const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8);
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
resend(ids[i], -1, true);
}
} else if (!request.isStateRequest()) {
request->msDate = forceContainer ? 0 : crl::now();
_resendingIds.emplace(msgId, request->requestId);
{
QWriteLocker locker(_sessionData->toSendMutex());
_sessionData->toSendMap().insert(request->requestId, request);
}
}
}
void ConnectionPrivate::resendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer) {
for (const auto msgId : msgIds) {
resend(msgId, msCanWait, forceContainer);
void ConnectionPrivate::resendAll() {
auto toResend = std::vector<mtpMsgId>();
auto lock = QReadLocker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
toResend.reserve(haveSent.size());
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
if (!i.value().isSentContainer()) {
toResend.push_back(i.key());
}
}
lock.unlock();
for (const auto msgId : toResend) {
resend(msgId, -1, true);
}
_sessionData->queueSendAnything();
}
void ConnectionPrivate::onConnected(
@ -2470,10 +2488,6 @@ DcType ConnectionPrivate::tryAcquireKeyCreation() {
).arg(result->persistentServerSalt));
_sessionSalt = result->temporaryServerSalt;
if (result->persistentKey) {
_sessionData->clearForNewKey(_instance);
}
auto key = result->persistentKey
? std::move(result->persistentKey)
: _sessionData->getPersistentKey();
@ -2512,7 +2526,7 @@ void ConnectionPrivate::authKeyChecked() {
});
if (_sessionSalt && setState(ConnectedState)) {
_sessionData->resendAll();
resendAll();
} // else receive salt in bad_server_salt first, then try to send all the requests
_pingIdToSend = rand_value<uint64>(); // get server_salt
@ -2651,7 +2665,9 @@ bool ConnectionPrivate::sendSecureRequest(
}
mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
if (msgId == _pingMsgId) return mtpRequestId(0xFFFFFFFF);
if (msgId == _pingMsgId || msgId == _bindMsgId) {
return mtpRequestId(0xFFFFFFFF);
}
{
QReadLocker locker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
@ -2662,17 +2678,11 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
: mtpRequestId(0xFFFFFFFF);
}
}
{
QReadLocker locker(_sessionData->toResendMutex());
const auto &toResend = _sessionData->toResendMap();
const auto i = toResend.constFind(msgId);
if (i != toResend.cend()) return i.value();
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
return i->second;
}
{
QReadLocker locker(_sessionData->wereAckedMutex());
const auto &wereAcked = _sessionData->wereAckedMap();
const auto i = wereAcked.constFind(msgId);
if (i != wereAcked.cend()) return i.value();
if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) {
return i->second;
}
return 0;
}

View File

@ -154,6 +154,9 @@ private:
mtpRequestId wasSent(mtpMsgId msgId) const;
[[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime);
[[nodiscard]] HandleResult handleBindResponse(
mtpMsgId requestMsgId,
const mtpBuffer &response);
mtpBuffer ungzip(const mtpPrime *from, const mtpPrime *end) const;
void handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked);
@ -176,10 +179,8 @@ private:
mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false);
void resendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait = 0,
bool forceContainer = false);
void resendAll();
void clearSpecialMsgId(mtpMsgId msgId);
[[nodiscard]] DcType tryAcquireKeyCreation();
void resetSession();
@ -249,8 +250,11 @@ private:
QVector<MTPlong> _resendRequestData;
base::flat_set<mtpMsgId> _stateRequestData;
details::ReceivedIdsManager _receivedMessageIds;
base::flat_map<mtpMsgId, mtpRequestId> _resendingIds;
base::flat_map<mtpMsgId, mtpRequestId> _ackedIds;
std::unique_ptr<details::BoundKeyCreator> _keyCreator;
mtpMsgId _bindMsgId = 0;
};

View File

@ -50,8 +50,8 @@ void BoundKeyCreator::restartBinder() {
}
}
bool BoundKeyCreator::bindReadyToRequest() const {
return _binder ? !_binder->requested() : false;
bool BoundKeyCreator::readyToBind() const {
return _binder.has_value();
}
SecureRequest BoundKeyCreator::prepareBindRequest(
@ -63,11 +63,10 @@ SecureRequest BoundKeyCreator::prepareBindRequest(
}
DcKeyBindState BoundKeyCreator::handleBindResponse(
MTPlong requestMsgId,
const mtpBuffer &response) {
return _binder
? _binder->handleResponse(requestMsgId, response)
: DcKeyBindState::Unknown;
Expects(_binder.has_value());
return _binder->handleResponse(response);
}
AuthKeyPtr BoundKeyCreator::bindPersistentKey() const {

View File

@ -31,12 +31,11 @@ public:
void bind(AuthKeyPtr &&persistentKey);
void restartBinder();
[[nodiscard]] bool bindReadyToRequest() const;
[[nodiscard]] bool readyToBind() const;
[[nodiscard]] SecureRequest prepareBindRequest(
const AuthKeyPtr &temporaryKey,
uint64 sessionId);
[[nodiscard]] DcKeyBindState handleBindResponse(
MTPlong requestMsgId,
const mtpBuffer &response);
[[nodiscard]] AuthKeyPtr bindPersistentKey() const;

View File

@ -77,46 +77,34 @@ DcKeyBinder::DcKeyBinder(AuthKeyPtr &&persistentKey)
Expects(_persistentKey != nullptr);
}
bool DcKeyBinder::requested() const {
return _requestMsgId != 0;
}
SecureRequest DcKeyBinder::prepareRequest(
const AuthKeyPtr &temporaryKey,
uint64 sessionId) {
Expects(_requestMsgId == 0);
Expects(temporaryKey != nullptr);
Expects(temporaryKey->expiresAt() != 0);
const auto nonce = openssl::RandomValue<uint64>();
_requestMsgId = base::unixtime::mtproto_msg_id();
const auto msgId = base::unixtime::mtproto_msg_id();
auto result = SecureRequest::Serialize(MTPauth_BindTempAuthKey(
MTP_long(_persistentKey->keyId()),
MTP_long(nonce),
MTP_int(temporaryKey->expiresAt()),
MTP_bytes(EncryptBindAuthKeyInner(
_persistentKey,
_requestMsgId,
msgId,
MTP_bind_auth_key_inner(
MTP_long(nonce),
MTP_long(temporaryKey->keyId()),
MTP_long(_persistentKey->keyId()),
MTP_long(sessionId),
MTP_int(temporaryKey->expiresAt()))))));
result.setMsgId(_requestMsgId);
result.setMsgId(msgId);
return result;
}
DcKeyBindState DcKeyBinder::handleResponse(
MTPlong requestMsgId,
const mtpBuffer &response) {
DcKeyBindState DcKeyBinder::handleResponse(const mtpBuffer &response) {
Expects(!response.isEmpty());
if (!_requestMsgId || requestMsgId.v != _requestMsgId) {
return DcKeyBindState::Unknown;
}
_requestMsgId = 0;
auto from = response.begin();
const auto end = from + response.size();
auto error = MTPRpcError();

View File

@ -17,7 +17,6 @@ class Instance;
namespace MTP::details {
enum class DcKeyBindState {
Unknown,
Success,
Failed,
DefinitelyDestroyed,
@ -27,18 +26,14 @@ class DcKeyBinder final {
public:
explicit DcKeyBinder(AuthKeyPtr &&persistentKey);
[[nodiscard]] bool requested() const;
[[nodiscard]] SecureRequest prepareRequest(
const AuthKeyPtr &temporaryKey,
uint64 sessionId);
[[nodiscard]] DcKeyBindState handleResponse(
MTPlong requestMsgId,
const mtpBuffer &response);
[[nodiscard]] DcKeyBindState handleResponse(const mtpBuffer &response);
[[nodiscard]] AuthKeyPtr persistentKey() const;
private:
AuthKeyPtr _persistentKey;
mtpMsgId _requestMsgId = 0;
};

View File

@ -64,48 +64,6 @@ void SessionData::notifyConnectionInited(const ConnectionOptions &options) {
}
}
void SessionData::clearForNewKey(not_null<Instance*> instance) {
auto clearCallbacks = std::vector<RPCCallbackClear>();
{
QReadLocker locker1(haveSentMutex());
QReadLocker locker2(toResendMutex());
QReadLocker locker3(haveReceivedMutex());
QReadLocker locker4(wereAckedMutex());
clearCallbacks.reserve(_haveSent.size() + _toResend.size() + _wereAcked.size());
for (auto i = _haveSent.cbegin(), e = _haveSent.cend(); i != e; ++i) {
auto requestId = i.value()->requestId;
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
for (auto i = _toResend.cbegin(), e = _toResend.cend(); i != e; ++i) {
auto requestId = i.value();
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
for (auto i = _wereAcked.cbegin(), e = _wereAcked.cend(); i != e; ++i) {
auto requestId = i.value();
if (!_receivedResponses.contains(requestId)) {
clearCallbacks.push_back(requestId);
}
}
}
{
QWriteLocker locker(haveSentMutex());
_haveSent.clear();
}
{
QWriteLocker locker(toResendMutex());
_toResend.clear();
}
{
QWriteLocker locker(wereAckedMutex());
_wereAcked.clear();
}
instance->clearCallbacksDelayed(std::move(clearCallbacks));
}
void SessionData::queueTryToReceive() {
withSession([](not_null<Session*> session) {
session->tryToReceive();
@ -142,23 +100,6 @@ void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) {
});
}
void SessionData::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->resend(msgId, msCanWait, forceContainer);
}
}
void SessionData::resendAll() {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->resendAll();
}
}
bool SessionData::connectionInited() const {
QMutexLocker lock(&_ownerMutex);
return _owner ? _owner->connectionInited() : false;
@ -455,72 +396,16 @@ QString Session::transport() const {
return _connection ? _connection->transport() : QString();
}
void Session::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
auto lock = QWriteLocker(_data->haveSentMutex());
auto &haveSent = _data->haveSentMap();
auto i = haveSent.find(msgId);
if (i == haveSent.end()) {
return;
}
auto request = i.value();
haveSent.erase(i);
lock.unlock();
// For container just resend all messages we can.
if (request.isSentContainer()) {
DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId));
const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8);
for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) {
resend(ids[i], 10, true);
}
} else if (!request.isStateRequest()) {
request->msDate = forceContainer ? 0 : crl::now();
{
QWriteLocker locker(_data->toResendMutex());
_data->toResendMap().insert(msgId, request->requestId);
}
sendPrepared(request, msCanWait, false);
}
}
void Session::resendAll() {
QVector<mtpMsgId> toResend;
{
QReadLocker locker(_data->haveSentMutex());
const auto &haveSent = _data->haveSentMap();
toResend.reserve(haveSent.size());
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
if (i.value()->requestId) {
toResend.push_back(i.key());
}
}
}
for (uint32 i = 0, l = toResend.size(); i < l; ++i) {
resend(toResend[i], -1, true);
}
InvokeQueued(this, [=] {
sendAnything();
});
}
void Session::sendPrepared(
const SecureRequest &request,
crl::time msCanWait,
bool newRequest) {
crl::time msCanWait) {
DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1"
).arg(msCanWait));
{
QWriteLocker locker(_data->toSendMutex());
_data->toSendMap().insert(request->requestId, request);
if (newRequest) {
*(mtpMsgId*)(request->data() + 4) = 0;
*(request->data() + 6) = 0;
}
*(mtpMsgId*)(request->data() + 4) = 0;
*(request->data() + 6) = 0;
}
DEBUG_LOG(("MTP Info: added, requestId %1").arg(request->requestId));

View File

@ -29,22 +29,6 @@ enum class CreatingKeyType;
using PreRequestMap = QMap<mtpRequestId, SecureRequest>;
using RequestMap = QMap<mtpMsgId, SecureRequest>;
class RequestIdsMap : public QMap<mtpMsgId, mtpRequestId> {
public:
using ParentType = QMap<mtpMsgId, mtpRequestId>;
mtpMsgId min() const {
return size() ? cbegin().key() : 0;
}
mtpMsgId max() const {
ParentType::const_iterator e(cend());
return size() ? (--e).key() : 0;
}
};
using SerializedMessage = mtpBuffer;
inline bool ResponseNeedsAck(const SerializedMessage &response) {
@ -102,12 +86,6 @@ public:
not_null<QReadWriteLock*> haveSentMutex() const {
return &_haveSentLock;
}
not_null<QReadWriteLock*> toResendMutex() const {
return &_toResendLock;
}
not_null<QReadWriteLock*> wereAckedMutex() const {
return &_wereAckedLock;
}
not_null<QReadWriteLock*> haveReceivedMutex() const {
return &_haveReceivedLock;
}
@ -124,18 +102,6 @@ public:
const RequestMap &haveSentMap() const {
return _haveSent;
}
RequestIdsMap &toResendMap() { // msgId -> requestId, on which toSend: requestId -> request for resended requests
return _toResend;
}
const RequestIdsMap &toResendMap() const {
return _toResend;
}
RequestIdsMap &wereAckedMap() {
return _wereAcked;
}
const RequestIdsMap &wereAckedMap() const {
return _wereAcked;
}
QMap<mtpRequestId, SerializedMessage> &haveReceivedResponses() {
return _receivedResponses;
}
@ -154,8 +120,6 @@ public:
return _owner;
}
void clearForNewKey(not_null<Instance*> instance);
// Connection -> Session interface.
void queueTryToReceive();
void queueNeedToResumeAndSend();
@ -173,11 +137,6 @@ public:
const AuthKeyPtr &persistentKeyUsedForBind);
void releaseKeyCreationOnFail();
void destroyTemporaryKey(uint64 keyId);
void resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer);
void resendAll();
void detach();
@ -192,8 +151,6 @@ private:
PreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent
RequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers
RequestIdsMap _toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent
RequestIdsMap _wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack
QMap<mtpRequestId, SerializedMessage> _receivedResponses; // map of request_id -> response that should be processed in the main thread
QList<SerializedMessage> _receivedUpdates; // list of updates that should be processed in the main thread
@ -202,8 +159,6 @@ private:
mutable QReadWriteLock _optionsLock;
mutable QReadWriteLock _toSendLock;
mutable QReadWriteLock _haveSentLock;
mutable QReadWriteLock _toResendLock;
mutable QReadWriteLock _wereAckedLock;
mutable QReadWriteLock _haveReceivedLock;
};
@ -234,18 +189,7 @@ public:
[[nodiscard]] AuthKeyPtr getPersistentKey() const;
[[nodiscard]] AuthKeyPtr getTemporaryKey(TemporaryKeyType type) const;
[[nodiscard]] bool connectionInited() const;
void resend(
mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false);
void resendAll();
// Thread-safe.
// Nulls msgId and seqNo in request, if newRequest = true.
void sendPrepared(
const SecureRequest &request,
crl::time msCanWait = 0,
bool newRequest = true);
void sendPrepared(const SecureRequest &request, crl::time msCanWait = 0);
// Connection thread.
[[nodiscard]] CreatingKeyType acquireKeyCreation(TemporaryKeyType type);