tdesktop/Telegram/SourceFiles/mtproto/session_private.cpp

2695 lines
81 KiB
C++

/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "mtproto/session_private.h"
#include "mtproto/details/mtproto_bound_key_creator.h"
#include "mtproto/details/mtproto_dcenter.h"
#include "mtproto/details/mtproto_dump_to_text.h"
#include "mtproto/details/mtproto_rsa_public_key.h"
#include "mtproto/session.h"
#include "mtproto/mtproto_response.h"
#include "mtproto/mtproto_dc_options.h"
#include "mtproto/connection_abstract.h"
#include "base/random.h"
#include "base/qthelp_url.h"
#include "base/openssl_help.h"
#include "base/unixtime.h"
#include "base/platform/base_platform_info.h"
#include <ksandbox.h>
#include <zlib.h>
namespace MTP {
namespace details {
namespace {
constexpr auto kIntSize = static_cast<int>(sizeof(mtpPrime));
constexpr auto kWaitForBetterTimeout = crl::time(2000);
constexpr auto kMinConnectedTimeout = crl::time(1000);
constexpr auto kMaxConnectedTimeout = crl::time(8000);
constexpr auto kMinReceiveTimeout = crl::time(4000);
constexpr auto kMaxReceiveTimeout = crl::time(64000);
constexpr auto kMarkConnectionOldTimeout = crl::time(192000);
constexpr auto kPingDelayDisconnect = 60;
constexpr auto kPingSendAfter = 30 * crl::time(1000);
constexpr auto kPingSendAfterForce = 45 * crl::time(1000);
constexpr auto kTemporaryExpiresIn = TimeId(86400);
constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30);
constexpr auto kKeyOldEnoughForDestroy = 60 * crl::time(1000);
constexpr auto kSentContainerLives = 600 * crl::time(1000);
constexpr auto kFastRequestDuration = crl::time(500);
// If we can't connect for this time we will ask _instance to update config.
constexpr auto kRequestConfigTimeout = 8 * crl::time(1000);
// Don't try to handle messages larger than this size.
constexpr auto kMaxMessageLength = 16 * 1024 * 1024;
// How much time passed from send till we resend request or check its state.
constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000);
// How much time to wait for some more requests,
// when resending request or checking its state.
constexpr auto kSendStateRequestWaiting = crl::time(1000);
// How much time to wait for some more requests, when sending msg acks.
constexpr auto kAckSendWaiting = 10 * crl::time(1000);
auto SyncTimeRequestDuration = kFastRequestDuration;
using namespace details;
[[nodiscard]] QString LogIdsVector(const QVector<MTPlong> &ids) {
if (!ids.size()) return "[]";
auto idsStr = QString("[%1").arg(ids.cbegin()->v);
for (const auto &id : ids) {
idsStr += QString(", %2").arg(id.v);
}
return idsStr + "]";
}
[[nodiscard]] QString ComputeAppVersion() {
#if defined Q_OS_WIN && defined Q_PROCESSOR_X86_64
const auto arch = u" x64"_q;
#elif (defined Q_OS_WIN && defined Q_PROCESSOR_X86_32) || defined Q_PROCESSOR_X86_64
const auto arch = QString();
#else
const auto arch = ' ' + QSysInfo::buildCpuArchitecture();
#endif
return QString::fromLatin1(AppVersionStr) + arch + ([] {
#if defined OS_MAC_STORE
return u" Mac App Store"_q;
#elif defined OS_WIN_STORE // OS_MAC_STORE
return u" Microsoft Store"_q;
#else // OS_MAC_STORE || OS_WIN_STORE
return KSandbox::isFlatpak()
? u" Flatpak"_q
: KSandbox::isSnap()
? u" Snap"_q
: QString();
#endif // OS_MAC_STORE || OS_WIN_STORE
})();
}
void WrapInvokeAfter(
SerializedRequest &to,
const SerializedRequest &from,
const base::flat_map<mtpMsgId, SerializedRequest> &haveSent,
int32 skipBeforeRequest = 0) {
const auto afterId = *(mtpMsgId*)(from->after->data() + 4);
const auto i = afterId ? haveSent.find(afterId) : haveSent.end();
int32 size = to->size(), lenInInts = (tl::count_length(from) >> 2), headlen = 4, fulllen = headlen + lenInInts;
if (i == haveSent.end()) { // no invoke after or such msg was not sent or was completed recently
to->resize(size + fulllen + skipBeforeRequest);
if (skipBeforeRequest) {
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
} else {
memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime));
}
} else {
to->resize(size + fulllen + skipBeforeRequest + 3);
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
(*to)[size + 3] += 3 * sizeof(mtpPrime);
*((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg;
memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime);
}
}
[[nodiscard]] bool ConstTimeIsDifferent(
const void *a,
const void *b,
size_t size) {
auto ca = reinterpret_cast<const char*>(a);
auto cb = reinterpret_cast<const char*>(b);
volatile auto different = false;
for (const auto ce = ca + size; ca != ce; ++ca, ++cb) {
different = different | (*ca != *cb);
}
return different;
}
} // namespace
SessionPrivate::SessionPrivate(
not_null<Instance*> instance,
not_null<QThread*> thread,
std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId)
: QObject(nullptr)
, _instance(instance)
, _shiftedDcId(shiftedDcId)
, _realDcType(_instance->dcOptions().dcType(_shiftedDcId))
, _currentDcType(_realDcType)
, _state(DisconnectedState)
, _retryTimer(thread, [=] { retryByTimer(); })
, _oldConnectionTimer(thread, [=] { markConnectionOld(); })
, _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); })
, _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); })
, _waitForBetterTimer(thread, [=] { waitBetterFailed(); })
, _waitForReceived(kMinReceiveTimeout)
, _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); })
, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
, _clearOldContainersTimer(thread, [=] { clearOldContainers(); })
, _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0);
moveToThread(thread);
InvokeQueued(this, [=] {
_clearOldContainersTimer.callEach(kSentContainerLives);
connectToServer();
});
}
SessionPrivate::~SessionPrivate() {
releaseKeyCreationOnFail();
doDisconnect();
Expects(!_connection);
Expects(_testConnections.empty());
}
void SessionPrivate::appendTestConnection(
DcOptions::Variants::Protocol protocol,
const QString &ip,
int port,
const bytes::vector &protocolSecret) {
QWriteLocker lock(&_stateMutex);
const auto priority = (qthelp::is_ipv6(ip) ? 0 : 1)
+ (protocol == DcOptions::Variants::Tcp ? 1 : 0)
+ (protocolSecret.empty() ? 0 : 1);
_testConnections.push_back({
AbstractConnection::Create(
_instance,
protocol,
thread(),
protocolSecret,
_options->proxy),
priority
});
const auto weak = _testConnections.back().data.get();
connect(weak, &AbstractConnection::error, [=](int errorCode) {
onError(weak, errorCode);
});
connect(weak, &AbstractConnection::receivedSome, [=] {
onReceivedSome();
});
_firstSentAt = 0;
if (_oldConnection) {
_oldConnection = false;
DEBUG_LOG(("This connection marked as not old!"));
}
_oldConnectionTimer.callOnce(kMarkConnectionOldTimeout);
connect(weak, &AbstractConnection::connected, [=] {
onConnected(weak);
});
connect(weak, &AbstractConnection::disconnected, [=] {
onDisconnected(weak);
});
connect(weak, &AbstractConnection::syncTimeRequest, [=] {
InvokeQueued(_instance, [instance = _instance] {
instance->syncHttpUnixtime();
});
});
const auto protocolForFiles = isMediaClusterDcId(_shiftedDcId)
//|| isUploadDcId(_shiftedDcId)
|| (_realDcType == DcType::Cdn);
const auto protocolDcId = getProtocolDcId();
InvokeQueued(_testConnections.back().data, [=] {
weak->connectToServer(
ip,
port,
protocolSecret,
protocolDcId,
protocolForFiles);
});
}
int16 SessionPrivate::getProtocolDcId() const {
const auto dcId = BareDcId(_shiftedDcId);
const auto simpleDcId = isTemporaryDcId(dcId)
? getRealIdFromTemporaryDcId(dcId)
: dcId;
const auto testedDcId = _instance->isTestMode()
? (kTestModeDcIdShift + simpleDcId)
: simpleDcId;
return (_currentDcType == DcType::MediaCluster)
? -testedDcId
: testedDcId;
}
void SessionPrivate::checkSentRequests() {
const auto now = crl::now();
const auto checkTime = now - kCheckSentRequestTimeout;
if (_bindMsgId && _bindMessageSent < checkTime) {
DEBUG_LOG(("MTP Info: "
"Request state while key is not bound, restarting."));
restart();
_checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
return;
}
auto requesting = false;
auto nextTimeout = kCheckSentRequestTimeout;
{
QReadLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (const auto &[msgId, request] : haveSent) {
if (request->lastSentTime <= checkTime) {
// Need to check state.
request->lastSentTime = now;
if (_stateRequestData.emplace(msgId).second) {
requesting = true;
}
} else {
nextTimeout = std::min(request->lastSentTime - checkTime, nextTimeout);
}
}
}
if (requesting) {
_sessionData->queueSendAnything(kSendStateRequestWaiting);
}
if (nextTimeout < kCheckSentRequestTimeout) {
_checkSentRequestsTimer.callOnce(nextTimeout);
}
}
void SessionPrivate::clearOldContainers() {
auto resent = false;
auto nextTimeout = kSentContainerLives;
const auto now = crl::now();
const auto checkTime = now - kSentContainerLives;
for (auto i = _sentContainers.begin(); i != _sentContainers.end();) {
if (i->second.sent <= checkTime) {
DEBUG_LOG(("MTP Info: Removing old container with resending %1, "
"sent: %2, now: %3, current unixtime: %4"
).arg(i->first
).arg(i->second.sent
).arg(now
).arg(base::unixtime::now()));
const auto ids = std::move(i->second.messages);
i = _sentContainers.erase(i);
resent = resent || !ids.empty();
for (const auto innerMsgId : ids) {
resend(innerMsgId, -1);
}
} else {
nextTimeout = std::min(i->second.sent - checkTime, nextTimeout);
++i;
}
}
if (resent) {
_sessionData->queueNeedToResumeAndSend();
}
if (nextTimeout < kSentContainerLives) {
_clearOldContainersTimer.callOnce(nextTimeout);
} else if (!_clearOldContainersTimer.isActive()) {
_clearOldContainersTimer.callEach(nextTimeout);
}
}
void SessionPrivate::destroyAllConnections() {
clearUnboundKeyCreator();
_waitForBetterTimer.cancel();
_waitForReceivedTimer.cancel();
_waitForConnectedTimer.cancel();
_testConnections.clear();
_connection = nullptr;
}
void SessionPrivate::cdnConfigChanged() {
connectToServer(true);
}
int32 SessionPrivate::getShiftedDcId() const {
return _shiftedDcId;
}
void SessionPrivate::dcOptionsChanged() {
_retryTimeout = 1;
connectToServer(true);
}
int32 SessionPrivate::getState() const {
QReadLocker lock(&_stateMutex);
int32 result = _state;
if (_state < 0) {
if (_retryTimer.isActive()) {
result = int32(crl::now() - _retryWillFinish);
if (result >= 0) {
result = -1;
}
}
}
return result;
}
QString SessionPrivate::transport() const {
QReadLocker lock(&_stateMutex);
if (!_connection || (_state < 0)) {
return QString();
}
Assert(_options != nullptr);
return _connection->transport();
}
bool SessionPrivate::setState(int state, int ifState) {
if (ifState != kUpdateStateAlways) {
QReadLocker lock(&_stateMutex);
if (_state != ifState) {
return false;
}
}
QWriteLocker lock(&_stateMutex);
if (_state == state) {
return false;
}
_state = state;
if (state < 0) {
_retryTimeout = -state;
_retryTimer.callOnce(_retryTimeout);
_retryWillFinish = crl::now() + _retryTimeout;
}
lock.unlock();
_sessionData->queueConnectionStateChange(state);
return true;
}
void SessionPrivate::resetSession() {
MTP_LOG(_shiftedDcId, ("Resetting session!"));
_needSessionReset = false;
DEBUG_LOG(("MTP Info: creating new session in resetSession."));
changeSessionId();
_sessionData->queueResetDone();
}
void SessionPrivate::changeSessionId() {
auto sessionId = _sessionId;
do {
sessionId = base::RandomValue<uint64>();
} while (_sessionId == sessionId);
DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId));
_sessionId = sessionId;
_messagesCounter = 0;
_sessionMarkedAsStarted = false;
_ackRequestData.clear();
_resendRequestData.clear();
_stateRequestData.clear();
_receivedMessageIds.clear();
}
uint32 SessionPrivate::nextRequestSeqNumber(bool needAck) {
const auto result = _messagesCounter;
_messagesCounter += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
bool SessionPrivate::realDcTypeChanged() {
const auto now = _instance->dcOptions().dcType(_shiftedDcId);
if (_realDcType == now) {
return false;
}
_realDcType = now;
return true;
}
bool SessionPrivate::markSessionAsStarted() {
if (_sessionMarkedAsStarted) {
return false;
}
_sessionMarkedAsStarted = true;
return true;
}
mtpMsgId SessionPrivate::prepareToSend(
SerializedRequest &request,
mtpMsgId currentLastId,
bool forceNewMsgId) {
Expects(request->size() > 8);
if (const auto msgId = request.getMsgId()) {
// resending this request
const auto i = _resendingIds.find(msgId);
if (i != _resendingIds.cend()) {
_resendingIds.erase(i);
}
return (forceNewMsgId || msgId > currentLastId)
? replaceMsgId(request, currentLastId)
: msgId;
}
request.setMsgId(currentLastId);
request.setSeqNo(nextRequestSeqNumber(request.needAck()));
if (request->requestId) {
MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId));
}
return currentLastId;
}
mtpMsgId SessionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
Expects(request->size() > 8);
const auto oldMsgId = request.getMsgId();
if (oldMsgId == newId) {
return newId;
}
// haveSentMutex() was locked in tryToSend()
auto &haveSent = _sessionData->haveSentMap();
while (_resendingIds.contains(newId)
|| _ackedIds.contains(newId)
|| haveSent.contains(newId)) {
newId = base::unixtime::mtproto_msg_id();
}
MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3"
).arg(request->requestId
).arg(oldMsgId
).arg(newId));
const auto i = _resendingIds.find(oldMsgId);
if (i != _resendingIds.end()) {
const auto requestId = i->second;
_resendingIds.erase(i);
_resendingIds.emplace(newId, requestId);
}
const auto j = _ackedIds.find(oldMsgId);
if (j != _ackedIds.end()) {
const auto requestId = j->second;
_ackedIds.erase(j);
_ackedIds.emplace(newId, requestId);
}
const auto k = haveSent.find(oldMsgId);
if (k != haveSent.end()) {
const auto request = k->second;
haveSent.erase(k);
haveSent.emplace(newId, request);
}
for (auto &[msgId, container] : _sentContainers) {
for (auto &innerMsgId : container.messages) {
if (innerMsgId == oldMsgId) {
innerMsgId = newId;
}
}
}
request.setMsgId(newId);
request.setSeqNo(nextRequestSeqNumber(request.needAck()));
return newId;
}
mtpMsgId SessionPrivate::placeToContainer(
SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
SerializedRequest &req) {
const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
uint32 from = toSendRequest->size(), len = req.messageSize();
toSendRequest->resize(from + len);
memcpy(toSendRequest->data() + from, req->constData() + 4, len * sizeof(mtpPrime));
return msgId;
}
MTPVector<MTPJSONObjectValue> SessionPrivate::prepareInitParams() {
const auto local = QDateTime::currentDateTime();
const auto utc = QDateTime(local.date(), local.time(), Qt::UTC);
const auto shift = base::unixtime::now() - (TimeId)::time(nullptr);
const auto delta = int(utc.toSecsSinceEpoch()) - int(local.toSecsSinceEpoch()) - shift;
auto sliced = delta;
while (sliced < -12 * 3600) {
sliced += 24 * 3600;
}
while (sliced > 14 * 3600) {
sliced -= 24 * 3600;
}
const auto sign = (sliced < 0) ? -1 : 1;
const auto rounded = base::SafeRound(std::abs(sliced) / 900.)
* 900
* sign;
return MTP_vector<MTPJSONObjectValue>(
1,
MTP_jsonObjectValue(
MTP_string("tz_offset"),
MTP_jsonNumber(MTP_double(rounded))));
}
void SessionPrivate::tryToSend() {
DEBUG_LOG(("MTP Info: tryToSend for dc %1.").arg(_shiftedDcId));
if (!_connection) {
DEBUG_LOG(("MTP Info: not yet connected in dc %1.").arg(_shiftedDcId));
return;
} else if (!_keyId) {
DEBUG_LOG(("MTP Info: not yet with auth key in dc %1.").arg(_shiftedDcId));
return;
}
const auto needsLayer = !_sessionData->connectionInited();
const auto state = getState();
const auto sendOnlyFirstPing = (state != ConnectedState);
const auto sendAll = !sendOnlyFirstPing && !_keyCreator;
const auto isMainSession = (GetDcIdShift(_shiftedDcId) == 0);
if (sendOnlyFirstPing && !_pingIdToSend) {
DEBUG_LOG(("MTP Info: dc %1 not sending, waiting for Connected state, state: %2").arg(_shiftedDcId).arg(state));
return; // just do nothing, if is not connected yet
} else if (isMainSession
&& !sendOnlyFirstPing
&& !_pingIdToSend
&& !_pingId
&& _pingSendAt <= crl::now()) {
_pingIdToSend = base::RandomValue<mtpPingId>();
}
const auto forceNewMsgId = sendAll && markSessionAsStarted();
if (forceNewMsgId && _keyCreator) {
_keyCreator->restartBinder();
}
auto pingRequest = SerializedRequest();
auto ackRequest = SerializedRequest();
auto resendRequest = SerializedRequest();
auto stateRequest = SerializedRequest();
auto httpWaitRequest = SerializedRequest();
auto bindDcKeyRequest = SerializedRequest();
if (_pingIdToSend) {
if (sendOnlyFirstPing || !isMainSession) {
DEBUG_LOG(("MTP Info: sending ping, ping_id: %1"
).arg(_pingIdToSend));
pingRequest = SerializedRequest::Serialize(MTPPing(
MTP_long(_pingIdToSend)
));
} else {
DEBUG_LOG(("MTP Info: sending ping_delay_disconnect, "
"ping_id: %1").arg(_pingIdToSend));
pingRequest = SerializedRequest::Serialize(MTPPing_delay_disconnect(
MTP_long(_pingIdToSend),
MTP_int(kPingDelayDisconnect)));
_pingSender.callOnce(kPingSendAfterForce);
}
_pingSendAt = pingRequest->lastSentTime + kPingSendAfter;
_pingId = base::take(_pingIdToSend);
} else if (!sendAll) {
DEBUG_LOG(("MTP Info: dc %1 sending only service or bind."
).arg(_shiftedDcId));
} else {
DEBUG_LOG(("MTP Info: dc %1 trying to send after ping, state: %2"
).arg(_shiftedDcId
).arg(state));
}
if (!sendOnlyFirstPing) {
if (!_ackRequestData.isEmpty()) {
ackRequest = SerializedRequest::Serialize(MTPMsgsAck(
MTP_msgs_ack(MTP_vector<MTPlong>(
base::take(_ackRequestData)))));
}
if (!_resendRequestData.isEmpty()) {
resendRequest = SerializedRequest::Serialize(MTPMsgResendReq(
MTP_msg_resend_req(MTP_vector<MTPlong>(
base::take(_resendRequestData)))));
}
if (!_stateRequestData.empty()) {
auto ids = QVector<MTPlong>();
ids.reserve(_stateRequestData.size());
for (const auto id : base::take(_stateRequestData)) {
ids.push_back(MTP_long(id));
}
stateRequest = SerializedRequest::Serialize(MTPMsgsStateReq(
MTP_msgs_state_req(MTP_vector<MTPlong>(ids))));
}
if (_connection->usingHttpWait()) {
httpWaitRequest = SerializedRequest::Serialize(MTPHttpWait(
MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000))));
}
if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) {
bindDcKeyRequest = _keyCreator->prepareBindRequest(
_encryptionKey,
_sessionId);
// This is a special request with msgId used inside the message
// body, so it is prepared already with a msgId and we place
// seqNo for it manually here.
bindDcKeyRequest.setSeqNo(
nextRequestSeqNumber(bindDcKeyRequest.needAck()));
}
}
MTPInitConnection<SerializedRequest> initWrapper;
int32 initSize = 0, initSizeInInts = 0;
if (needsLayer) {
Assert(_options != nullptr);
const auto systemLangCode = _options->systemLangCode;
const auto cloudLangCode = _options->cloudLangCode;
const auto langPackName = _options->langPackName;
const auto deviceModel = (_currentDcType == DcType::Cdn)
? "n/a"
: _instance->deviceModel();
const auto systemVersion = (_currentDcType == DcType::Cdn)
? "n/a"
: _instance->systemVersion();
const auto appVersion = ComputeAppVersion();
const auto proxyType = _options->proxy.type;
const auto mtprotoProxy = (proxyType == ProxyData::Type::Mtproto);
const auto clientProxyFields = mtprotoProxy
? MTP_inputClientProxy(
MTP_string(_options->proxy.host),
MTP_int(_options->proxy.port))
: MTPInputClientProxy();
using Flag = MTPInitConnection<SerializedRequest>::Flag;
initWrapper = MTPInitConnection<SerializedRequest>(
MTP_flags(Flag::f_params
| (mtprotoProxy ? Flag::f_proxy : Flag(0))),
MTP_int(ApiId),
MTP_string(deviceModel),
MTP_string(systemVersion),
MTP_string(appVersion),
MTP_string(systemLangCode),
MTP_string(langPackName),
MTP_string(cloudLangCode),
clientProxyFields,
MTP_jsonObject(prepareInitParams()),
SerializedRequest());
initSizeInInts = (tl::count_length(initWrapper) >> 2) + 2;
initSize = initSizeInInts * sizeof(mtpPrime);
}
bool needAnyResponse = false;
SerializedRequest toSendRequest;
{
QWriteLocker locker1(_sessionData->toSendMutex());
auto scheduleCheckSentRequests = false;
auto toSendDummy = base::flat_map<mtpRequestId, SerializedRequest>();
auto &toSend = sendAll
? _sessionData->toSendMap()
: toSendDummy;
if (!sendAll) {
locker1.unlock();
}
uint32 toSendCount = toSend.size();
if (pingRequest) ++toSendCount;
if (ackRequest) ++toSendCount;
if (resendRequest) ++toSendCount;
if (stateRequest) ++toSendCount;
if (httpWaitRequest) ++toSendCount;
if (bindDcKeyRequest) ++toSendCount;
if (!toSendCount) {
return; // nothing to send
}
const auto first = pingRequest
? pingRequest
: ackRequest
? ackRequest
: resendRequest
? resendRequest
: stateRequest
? stateRequest
: httpWaitRequest
? httpWaitRequest
: bindDcKeyRequest
? bindDcKeyRequest
: toSend.begin()->second;
if (toSendCount == 1 && !first->forceSendInContainer) {
toSendRequest = first;
if (sendAll) {
toSend.clear();
locker1.unlock();
}
const auto msgId = prepareToSend(
toSendRequest,
base::unixtime::mtproto_msg_id(),
forceNewMsgId && !bindDcKeyRequest);
if (bindDcKeyRequest) {
_bindMsgId = msgId;
_bindMessageSent = crl::now();
needAnyResponse = true;
} else if (pingRequest) {
_pingMsgId = msgId;
needAnyResponse = true;
} else if (stateRequest || resendRequest) {
_stateAndResendRequests.emplace(
msgId,
stateRequest ? stateRequest : resendRequest);
needAnyResponse = true;
}
if (toSendRequest->requestId) {
if (toSendRequest.needAck()) {
toSendRequest->lastSentTime = crl::now();
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
haveSent.emplace(msgId, toSendRequest);
scheduleCheckSentRequests = true;
const auto wrapLayer = needsLayer && toSendRequest->needsLayer;
if (toSendRequest->after) {
const auto toSendSize = tl::count_length(toSendRequest) >> 2;
auto wrappedRequest = SerializedRequest::Prepare(
toSendSize,
toSendSize + 3);
wrappedRequest->resize(4);
memcpy(wrappedRequest->data(), toSendRequest->constData(), 4 * sizeof(mtpPrime));
WrapInvokeAfter(wrappedRequest, toSendRequest, haveSent);
toSendRequest = std::move(wrappedRequest);
}
if (wrapLayer) {
const auto noWrapSize = (tl::count_length(toSendRequest) >> 2);
const auto toSendSize = noWrapSize + initSizeInInts;
auto wrappedRequest = SerializedRequest::Prepare(toSendSize);
memcpy(wrappedRequest->data(), toSendRequest->constData(), 7 * sizeof(mtpPrime)); // all except length
wrappedRequest->push_back(mtpc_invokeWithLayer);
wrappedRequest->push_back(kCurrentLayer);
initWrapper.write<mtpBuffer>(*wrappedRequest);
wrappedRequest->resize(wrappedRequest->size() + noWrapSize);
memcpy(wrappedRequest->data() + wrappedRequest->size() - noWrapSize, toSendRequest->constData() + 8, noWrapSize * sizeof(mtpPrime));
toSendRequest = std::move(wrappedRequest);
}
needAnyResponse = true;
} else {
_ackedIds.emplace(msgId, toSendRequest->requestId);
}
}
} else { // send in container
bool willNeedInit = false;
uint32 containerSize = 1 + 1; // cons + vector size
if (pingRequest) containerSize += pingRequest.messageSize();
if (ackRequest) containerSize += ackRequest.messageSize();
if (resendRequest) containerSize += resendRequest.messageSize();
if (stateRequest) containerSize += stateRequest.messageSize();
if (httpWaitRequest) containerSize += httpWaitRequest.messageSize();
if (bindDcKeyRequest) containerSize += bindDcKeyRequest.messageSize();
for (const auto &[requestId, request] : toSend) {
containerSize += request.messageSize();
if (needsLayer && request->needsLayer) {
containerSize += initSizeInInts;
willNeedInit = true;
}
}
mtpBuffer initSerialized;
if (willNeedInit) {
initSerialized.reserve(initSizeInInts);
initSerialized.push_back(mtpc_invokeWithLayer);
initSerialized.push_back(kCurrentLayer);
initWrapper.write<mtpBuffer>(initSerialized);
}
// prepare container + each in invoke after
toSendRequest = SerializedRequest::Prepare(
containerSize,
containerSize + 3 * toSend.size());
toSendRequest->push_back(mtpc_msg_container);
toSendRequest->push_back(toSendCount);
// check for a valid container
auto bigMsgId = base::unixtime::mtproto_msg_id();
// the fact of this lock is used in replaceMsgId()
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
// prepare sent container
auto sentIdsWrap = SentContainer();
sentIdsWrap.sent = crl::now();
sentIdsWrap.messages.reserve(toSendCount);
if (bindDcKeyRequest) {
_bindMsgId = placeToContainer(
toSendRequest,
bigMsgId,
false,
bindDcKeyRequest);
_bindMessageSent = crl::now();
needAnyResponse = true;
}
if (pingRequest) {
_pingMsgId = placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
pingRequest);
needAnyResponse = true;
}
for (auto &[requestId, request] : toSend) {
const auto msgId = prepareToSend(
request,
bigMsgId,
forceNewMsgId);
if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id();
}
bool added = false;
if (request->requestId) {
if (request.needAck()) {
request->lastSentTime = crl::now();
int32 reqNeedsLayer = (needsLayer && request->needsLayer) ? toSendRequest->size() : 0;
if (request->after) {
WrapInvokeAfter(toSendRequest, request, haveSent, reqNeedsLayer ? initSizeInInts : 0);
if (reqNeedsLayer) {
memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
*(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
}
added = true;
} else if (reqNeedsLayer) {
toSendRequest->resize(reqNeedsLayer + initSizeInInts + request.messageSize());
memcpy(toSendRequest->data() + reqNeedsLayer, request->constData() + 4, 4 * sizeof(mtpPrime));
memcpy(toSendRequest->data() + reqNeedsLayer + 4, initSerialized.constData(), initSize);
memcpy(toSendRequest->data() + reqNeedsLayer + 4 + initSizeInInts, request->constData() + 8, tl::count_length(request));
*(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
added = true;
}
// #TODO rewrite so that it will always hold.
//Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, request);
sentIdsWrap.messages.push_back(msgId);
scheduleCheckSentRequests = true;
needAnyResponse = true;
} else {
_ackedIds.emplace(msgId, request->requestId);
}
}
if (!added) {
uint32 from = toSendRequest->size(), len = request.messageSize();
toSendRequest->resize(from + len);
memcpy(toSendRequest->data() + from, request->constData() + 4, len * sizeof(mtpPrime));
}
}
toSend.clear();
if (stateRequest) {
const auto msgId = placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
stateRequest);
_stateAndResendRequests.emplace(msgId, stateRequest);
needAnyResponse = true;
}
if (resendRequest) {
const auto msgId = placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
resendRequest);
_stateAndResendRequests.emplace(msgId, resendRequest);
needAnyResponse = true;
}
if (ackRequest) {
placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
ackRequest);
}
if (httpWaitRequest) {
placeToContainer(
toSendRequest,
bigMsgId,
forceNewMsgId,
httpWaitRequest);
}
const auto containerMsgId = prepareToSend(
toSendRequest,
bigMsgId,
forceNewMsgId);
_sentContainers.emplace(containerMsgId, std::move(sentIdsWrap));
if (scheduleCheckSentRequests && !_checkSentRequestsTimer.isActive()) {
_checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
}
}
}
sendSecureRequest(std::move(toSendRequest), needAnyResponse);
}
void SessionPrivate::retryByTimer() {
if (_retryTimeout < 3) {
++_retryTimeout;
} else if (_retryTimeout == 3) {
_retryTimeout = 1000;
} else if (_retryTimeout < 64000) {
_retryTimeout *= 2;
}
connectToServer();
}
void SessionPrivate::restartNow() {
_retryTimeout = 1;
_retryTimer.cancel();
restart();
}
void SessionPrivate::connectToServer(bool afterConfig) {
if (afterConfig && (!_testConnections.empty() || _connection)) {
return;
}
destroyAllConnections();
if (realDcTypeChanged() && _keyCreator) {
destroyTemporaryKey();
return;
}
_options = std::make_unique<SessionOptions>(_sessionData->options());
const auto bareDc = BareDcId(_shiftedDcId);
_currentDcType = tryAcquireKeyCreation();
if (_currentDcType == DcType::Cdn && !_instance->isKeysDestroyer()) {
if (!_instance->dcOptions().hasCDNKeysForDc(bareDc)) {
requestCDNConfig();
return;
}
}
if (_options->proxy.type == ProxyData::Type::Mtproto) {
// host, port, secret for mtproto proxy are taken from proxy.
appendTestConnection(DcOptions::Variants::Tcp, {}, 0, {});
} else {
using Variants = DcOptions::Variants;
const auto special = (_currentDcType == DcType::Temporary);
const auto variants = _instance->dcOptions().lookup(
bareDc,
_currentDcType,
_options->proxy.type != ProxyData::Type::None);
const auto useIPv4 = special ? true : _options->useIPv4;
const auto useIPv6 = special ? false : _options->useIPv6;
const auto useTcp = special ? true : _options->useTcp;
const auto useHttp = special ? false : _options->useHttp;
const auto skipAddress = !useIPv4
? Variants::IPv4
: !useIPv6
? Variants::IPv6
: Variants::AddressTypeCount;
const auto skipProtocol = !useTcp
? Variants::Tcp
: !useHttp
? Variants::Http
: Variants::ProtocolCount;
for (auto address = 0; address != Variants::AddressTypeCount; ++address) {
if (address == skipAddress) {
continue;
}
for (auto protocol = 0; protocol != Variants::ProtocolCount; ++protocol) {
if (protocol == skipProtocol) {
continue;
}
for (const auto &endpoint : variants.data[address][protocol]) {
appendTestConnection(
static_cast<Variants::Protocol>(protocol),
QString::fromStdString(endpoint.ip),
endpoint.port,
endpoint.secret);
}
}
}
}
if (_testConnections.empty()) {
if (_instance->isKeysDestroyer()) {
LOG(("MTP Error: DC %1 options for not found for auth key destruction!").arg(_shiftedDcId));
_instance->keyWasPossiblyDestroyed(_shiftedDcId);
return;
} else if (afterConfig) {
LOG(("MTP Error: DC %1 options for not found right after config load!").arg(_shiftedDcId));
return restart();
}
DEBUG_LOG(("MTP Info: DC %1 options not found, waiting for config").arg(_shiftedDcId));
InvokeQueued(_instance, [instance = _instance] {
instance->requestConfig();
});
return;
}
DEBUG_LOG(("Connection Info: Connecting to %1 with %2 test connections."
).arg(_shiftedDcId
).arg(_testConnections.size()));
if (!_startedConnectingAt) {
_startedConnectingAt = crl::now();
} else if (crl::now() - _startedConnectingAt > kRequestConfigTimeout) {
InvokeQueued(_instance, [instance = _instance] {
instance->requestConfigIfOld();
});
}
_retryTimer.cancel();
_waitForConnectedTimer.cancel();
setState(ConnectingState);
_bindMsgId = 0;
_pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0;
_pingSender.cancel();
_waitForConnectedTimer.callOnce(_waitForConnected);
}
void SessionPrivate::restart() {
DEBUG_LOG(("MTP Info: restarting Connection"));
_waitForReceivedTimer.cancel();
_waitForConnectedTimer.cancel();
doDisconnect();
if (_needSessionReset) {
resetSession();
}
if (_retryTimer.isActive()) {
return;
}
DEBUG_LOG(("MTP Info: restart timeout: %1ms").arg(_retryTimeout));
setState(-_retryTimeout);
}
void SessionPrivate::onSentSome(uint64 size) {
if (!_waitForReceivedTimer.isActive()) {
auto remain = static_cast<uint64>(_waitForReceived);
if (!_oldConnection) {
// 8kb / sec, so 512 kb give 64 sec
auto remainBySize = size * _waitForReceived / 8192;
remain = std::clamp(
remainBySize,
remain,
uint64(kMaxReceiveTimeout));
if (remain != _waitForReceived) {
DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain));
}
}
if (isUploadDcId(_shiftedDcId)) {
remain *= kUploadSessionsCount;
}
_waitForReceivedTimer.callOnce(remain);
}
if (!_firstSentAt) {
_firstSentAt = crl::now();
}
}
void SessionPrivate::onReceivedSome() {
if (_oldConnection) {
_oldConnection = false;
DEBUG_LOG(("This connection marked as not old!"));
}
_oldConnectionTimer.callOnce(kMarkConnectionOldTimeout);
_waitForReceivedTimer.cancel();
if (_firstSentAt > 0) {
const auto ms = crl::now() - _firstSentAt;
DEBUG_LOG(("MTP Info: response in %1ms, _waitForReceived: %2ms"
).arg(ms
).arg(_waitForReceived));
if (ms > 0 && ms * 2 < _waitForReceived) {
_waitForReceived = qMax(ms * 2, kMinReceiveTimeout);
}
_firstSentAt = -1;
}
}
void SessionPrivate::markConnectionOld() {
_oldConnection = true;
_waitForReceived = kMinReceiveTimeout;
DEBUG_LOG(("This connection marked as old! _waitForReceived now %1ms"
).arg(_waitForReceived));
}
void SessionPrivate::sendPingByTimer() {
if (_pingId) {
// _pingSendAt: when to send next ping (lastPingAt + kPingSendAfter)
// could be equal to zero.
const auto now = crl::now();
const auto mustSendTill = _pingSendAt
+ kPingSendAfterForce
- kPingSendAfter;
if (mustSendTill < now + 1000) {
LOG(("Could not send ping for some seconds, restarting..."));
return restart();
} else {
_pingSender.callOnce(mustSendTill - now);
}
} else {
_sessionData->queueNeedToResumeAndSend();
}
}
void SessionPrivate::sendPingForce() {
DEBUG_LOG(("MTP Info: send ping force for dcWithShift %1.").arg(_shiftedDcId));
if (!_pingId) {
_pingSendAt = 0;
DEBUG_LOG(("Will send ping!"));
tryToSend();
}
}
void SessionPrivate::waitReceivedFailed() {
Expects(_options != nullptr);
DEBUG_LOG(("MTP Info: bad connection, _waitForReceived: %1ms").arg(_waitForReceived));
if (_waitForReceived < kMaxReceiveTimeout) {
_waitForReceived *= 2;
}
doDisconnect();
if (_retryTimer.isActive()) {
return;
}
DEBUG_LOG(("MTP Info: immediate restart!"));
InvokeQueued(this, [=] { connectToServer(); });
const auto instance = _instance;
const auto shiftedDcId = _shiftedDcId;
InvokeQueued(instance, [=] {
instance->restartedByTimeout(shiftedDcId);
});
}
void SessionPrivate::waitConnectedFailed() {
DEBUG_LOG(("MTP Info: can't connect in %1ms").arg(_waitForConnected));
auto maxTimeout = kMaxConnectedTimeout;
for (const auto &connection : _testConnections) {
accumulate_max(maxTimeout, connection.data->fullConnectTimeout());
}
if (_waitForConnected < maxTimeout) {
_waitForConnected = std::min(maxTimeout, 2 * _waitForConnected);
}
connectingTimedOut();
DEBUG_LOG(("MTP Info: immediate restart!"));
InvokeQueued(this, [=] { connectToServer(); });
}
void SessionPrivate::waitBetterFailed() {
confirmBestConnection();
}
void SessionPrivate::connectingTimedOut() {
for (const auto &connection : _testConnections) {
connection.data->timedOut();
}
doDisconnect();
}
void SessionPrivate::doDisconnect() {
destroyAllConnections();
setState(DisconnectedState);
}
void SessionPrivate::requestCDNConfig() {
InvokeQueued(_instance, [instance = _instance] {
instance->requestCDNConfig();
});
}
void SessionPrivate::handleReceived() {
Expects(_encryptionKey != nullptr);
onReceivedSome();
while (!_connection->received().empty()) {
auto intsBuffer = std::move(_connection->received().front());
_connection->received().pop_front();
constexpr auto kExternalHeaderIntsCount = 6U; // 2 auth_key_id, 4 msg_key
constexpr auto kEncryptedHeaderIntsCount = 8U; // 2 salt, 2 session, 2 msg_id, 1 seq_no, 1 length
constexpr auto kMinimalEncryptedIntsCount = kEncryptedHeaderIntsCount + 4U; // + 1 data + 3 padding
constexpr auto kMinimalIntsCount = kExternalHeaderIntsCount + kMinimalEncryptedIntsCount;
auto intsCount = uint32(intsBuffer.size());
auto ints = intsBuffer.constData();
if ((intsCount < kMinimalIntsCount) || (intsCount > kMaxMessageLength / kIntSize)) {
LOG(("TCP Error: bad message received, len %1").arg(intsCount * kIntSize));
return restart();
}
if (_keyId != *(uint64*)ints) {
LOG(("TCP Error: bad auth_key_id %1 instead of %2 received").arg(_keyId).arg(*(uint64*)ints));
return restart();
}
constexpr auto kMinPaddingSize = 12U;
constexpr auto kMaxPaddingSize = 1024U;
auto encryptedInts = ints + kExternalHeaderIntsCount;
auto encryptedIntsCount = (intsCount - kExternalHeaderIntsCount) & ~0x03U;
auto encryptedBytesCount = encryptedIntsCount * kIntSize;
auto decryptedBuffer = QByteArray(encryptedBytesCount, Qt::Uninitialized);
auto msgKey = *(MTPint128*)(ints + 2);
aesIgeDecrypt(encryptedInts, decryptedBuffer.data(), encryptedBytesCount, _encryptionKey, msgKey);
auto decryptedInts = reinterpret_cast<const mtpPrime*>(decryptedBuffer.constData());
auto serverSalt = *(uint64*)&decryptedInts[0];
auto session = *(uint64*)&decryptedInts[2];
auto msgId = *(uint64*)&decryptedInts[4];
auto seqNo = *(uint32*)&decryptedInts[6];
auto needAck = ((seqNo & 0x01) != 0);
auto messageLength = *(uint32*)&decryptedInts[7];
auto fullDataLength = kEncryptedHeaderIntsCount * kIntSize + messageLength; // Without padding.
// Can underflow, but it is an unsigned type, so we just check the range later.
auto paddingSize = static_cast<uint32>(encryptedBytesCount) - static_cast<uint32>(fullDataLength);
std::array<uchar, 32> sha256Buffer = { { 0 } };
SHA256_CTX msgKeyLargeContext;
SHA256_Init(&msgKeyLargeContext);
SHA256_Update(&msgKeyLargeContext, _encryptionKey->partForMsgKey(false), 32);
SHA256_Update(&msgKeyLargeContext, decryptedInts, encryptedBytesCount);
SHA256_Final(sha256Buffer.data(), &msgKeyLargeContext);
constexpr auto kMsgKeyShift = 8U;
if (ConstTimeIsDifferent(&msgKey, sha256Buffer.data() + kMsgKeyShift, sizeof(msgKey))) {
LOG(("TCP Error: bad SHA256 hash after aesDecrypt in message"));
return restart();
}
if ((messageLength > kMaxMessageLength)
|| (messageLength & 0x03)
|| (paddingSize < kMinPaddingSize)
|| (paddingSize > kMaxPaddingSize)) {
LOG(("TCP Error: bad msg_len received %1, data size: %2").arg(messageLength).arg(encryptedBytesCount));
return restart();
}
if (Logs::DebugEnabled()) {
_connection->logInfo(u"Decrypted message %1,%2,%3 is %4 len"_q
.arg(msgId)
.arg(seqNo)
.arg(Logs::b(needAck))
.arg(fullDataLength));
}
if (session != _sessionId) {
LOG(("MTP Error: bad server session received"));
return restart();
}
const auto serverTime = int32(msgId >> 32);
const auto isReply = ((msgId & 0x03) == 1);
if (!isReply && ((msgId & 0x03) != 3)) {
LOG(("MTP Error: bad msg_id %1 in message received").arg(msgId));
return restart();
}
const auto clientTime = base::unixtime::now();
const auto badTime = (serverTime > clientTime + 60)
|| (serverTime + 300 < clientTime);
if (badTime) {
DEBUG_LOG(("MTP Info: bad server time from msg_id: %1, my time: %2").arg(serverTime).arg(clientTime));
}
bool wasConnected = (getState() == ConnectedState);
if (serverSalt != _sessionSalt) {
if (!badTime) {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(_sessionSalt));
_sessionSalt = serverSalt;
if (setState(ConnectedState, ConnectingState)) {
resendAll();
}
} else {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt));
}
} else {
serverSalt = 0; // dont pass to handle method, so not to lock in setSalt()
}
if (needAck) _ackRequestData.push_back(MTP_long(msgId));
auto res = HandleResult::Success; // if no need to handle, then succeed
auto from = decryptedInts + kEncryptedHeaderIntsCount;
auto end = from + (messageLength / kIntSize);
auto sfrom = decryptedInts + 4U; // msg_id + seq_no + length + message
MTP_LOG(_shiftedDcId, ("Recv: ")
+ DumpToText(sfrom, end)
+ QString(" (dc:%1,key:%2)"
).arg(AbstractConnection::ProtocolDcDebugId(getProtocolDcId())
).arg(_encryptionKey->keyId()));
const auto registered = _receivedMessageIds.registerMsgId(
msgId,
needAck);
if (registered == ReceivedIdsManager::Result::Success) {
res = handleOneReceived(from, end, msgId, {
.outerMsgId = msgId,
.serverSalt = serverSalt,
.serverTime = serverTime,
.badTime = badTime,
});
} else if (registered == ReceivedIdsManager::Result::TooOld) {
res = HandleResult::ResetSession;
}
_receivedMessageIds.shrink();
// send acks
if (const auto toAckSize = _ackRequestData.size()) {
DEBUG_LOG(("MTP Info: will send %1 acks, ids: %2").arg(toAckSize).arg(LogIdsVector(_ackRequestData)));
_sessionData->queueSendAnything(kAckSendWaiting);
}
auto lock = QReadLocker(_sessionData->haveReceivedMutex());
const auto tryToReceive = !_sessionData->haveReceivedMessages().empty();
lock.unlock();
if (tryToReceive) {
DEBUG_LOG(("MTP Info: queueTryToReceive() - need to parse in another thread, %1 messages.").arg(_sessionData->haveReceivedMessages().size()));
_sessionData->queueTryToReceive();
}
if (res != HandleResult::Success && res != HandleResult::Ignored) {
if (res == HandleResult::DestroyTemporaryKey) {
destroyTemporaryKey();
} else if (res == HandleResult::ResetSession) {
_needSessionReset = true;
}
return restart();
}
_retryTimeout = 1; // reset restart() timer
_startedConnectingAt = crl::time(0);
if (!wasConnected) {
if (getState() == ConnectedState) {
_sessionData->queueNeedToResumeAndSend();
}
}
}
if (_connection->needHttpWait()) {
_sessionData->queueSendAnything();
}
}
SessionPrivate::HandleResult SessionPrivate::handleOneReceived(
const mtpPrime *from,
const mtpPrime *end,
uint64 msgId,
OuterInfo info) {
Expects(from < end);
switch (mtpTypeId(*from)) {
case mtpc_gzip_packed: {
DEBUG_LOG(("Message Info: gzip container"));
mtpBuffer response = ungzip(++from, end);
if (response.empty()) {
return HandleResult::RestartConnection;
}
return handleOneReceived(response.data(), response.data() + response.size(), msgId, info);
}
case mtpc_msg_container: {
if (++from >= end) {
return HandleResult::ParseError;
}
const mtpPrime *otherEnd;
const auto msgsCount = (uint32)*(from++);
DEBUG_LOG(("Message Info: container received, count: %1").arg(msgsCount));
for (uint32 i = 0; i < msgsCount; ++i) {
if (from + 4 >= end) {
return HandleResult::ParseError;
}
otherEnd = from + 4;
MTPlong inMsgId;
if (!inMsgId.read(from, otherEnd)) {
return HandleResult::ParseError;
}
bool isReply = ((inMsgId.v & 0x03) == 1);
if (!isReply && ((inMsgId.v & 0x03) != 3)) {
LOG(("Message Error: bad msg_id %1 in contained message received").arg(inMsgId.v));
return HandleResult::RestartConnection;
}
MTPint inSeqNo;
if (!inSeqNo.read(from, otherEnd)) {
return HandleResult::ParseError;
}
MTPint bytes;
if (!bytes.read(from, otherEnd)) {
return HandleResult::ParseError;
}
if ((bytes.v & 0x03) || bytes.v < 4) {
LOG(("Message Error: bad length %1 of contained message received").arg(bytes.v));
return HandleResult::RestartConnection;
}
bool needAck = (inSeqNo.v & 0x01);
if (needAck) _ackRequestData.push_back(inMsgId);
DEBUG_LOG(("Message Info: message from container, msg_id: %1, needAck: %2").arg(inMsgId.v).arg(Logs::b(needAck)));
otherEnd = from + (bytes.v >> 2);
if (otherEnd > end) {
return HandleResult::ParseError;
}
auto res = HandleResult::Success; // if no need to handle, then succeed
const auto registered = _receivedMessageIds.registerMsgId(
inMsgId.v,
needAck);
if (registered == ReceivedIdsManager::Result::Success) {
res = handleOneReceived(from, otherEnd, inMsgId.v, info);
info.badTime = false;
} else if (registered == ReceivedIdsManager::Result::TooOld) {
res = HandleResult::ResetSession;
}
if (res != HandleResult::Success) {
return res;
}
from = otherEnd;
}
} return HandleResult::Success;
case mtpc_msgs_ack: {
MTPMsgsAck msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &ids = msg.c_msgs_ack().vmsg_ids().v;
DEBUG_LOG(("Message Info: acks received, ids: %1"
).arg(LogIdsVector(ids)));
if (ids.isEmpty()) {
return info.badTime ? HandleResult::Ignored : HandleResult::Success;
}
if (info.badTime) {
if (!requestsFixTimeSalt(ids, info)) {
return HandleResult::Ignored;
}
} else {
correctUnixtimeByFastRequest(ids, info.serverTime);
}
requestsAcked(ids);
} return HandleResult::Success;
case mtpc_bad_msg_notification: {
MTPBadMsgNotification msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_bad_msg_notification());
LOG(("Message Info: bad message notification received (error_code %3) for msg_id = %1, seq_no = %2").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.verror_code().v));
const auto resendId = data.vbad_msg_id().v;
const auto errorCode = data.verror_code().v;
if (false
|| errorCode == 16
|| errorCode == 17
|| errorCode == 32
|| errorCode == 33
|| errorCode == 64) { // can handle
const auto needResend = false
|| (errorCode == 16) // bad msg_id
|| (errorCode == 17) // bad msg_id
|| (errorCode == 64); // bad container
if (errorCode == 64) { // bad container!
if (Logs::DebugEnabled()) {
const auto i = _sentContainers.find(resendId);
if (i == _sentContainers.end()) {
LOG(("Message Error: Container not found!"));
} else {
auto idsList = QStringList();
for (const auto innerMsgId : i->second.messages) {
idsList.push_back(QString::number(innerMsgId));
}
LOG(("Message Info: bad container received! messages: %1").arg(idsList.join(',')));
}
}
}
if (!wasSent(resendId)) {
DEBUG_LOG(("Message Error: "
"such message was not sent recently %1").arg(resendId));
return info.badTime
? HandleResult::Ignored
: HandleResult::Success;
}
if (needResend) { // bad msg_id or bad container
if (info.serverSalt) {
_sessionSalt = info.serverSalt;
}
correctUnixtimeWithBadLocal(info.serverTime);
DEBUG_LOG(("Message Info: unixtime updated, now %1, resending in container...").arg(info.serverTime));
resend(resendId);
} else { // must create new session, because msg_id and msg_seqno are inconsistent
if (info.badTime) {
if (info.serverSalt) {
_sessionSalt = info.serverSalt;
}
correctUnixtimeWithBadLocal(info.serverTime);
info.badTime = false;
}
LOG(("Message Info: bad message notification received, msgId %1, error_code %2").arg(data.vbad_msg_id().v).arg(errorCode));
return HandleResult::ResetSession;
}
} else { // fatal (except 48, but it must not get here)
const auto badMsgId = mtpMsgId(data.vbad_msg_id().v);
const auto requestId = wasSent(resendId);
if (requestId) {
LOG(("Message Error: "
"fatal bad message notification received, "
"msgId %1, error_code %2, requestId: %3"
).arg(badMsgId
).arg(errorCode
).arg(requestId));
auto reply = mtpBuffer();
MTPRpcError(MTP_rpc_error(
MTP_int(500),
MTP_string("PROTOCOL_ERROR")
)).write(reply);
// Save rpc_error for processing in the main thread.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedMessages().push_back({
.reply = std::move(reply),
.outerMsgId = info.outerMsgId,
.requestId = requestId,
});
} else {
DEBUG_LOG(("Message Error: "
"such message was not sent recently %1").arg(badMsgId));
}
return info.badTime
? HandleResult::Ignored
: HandleResult::Success;
}
} return HandleResult::Success;
case mtpc_bad_server_salt: {
MTPBadMsgNotification msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data = msg.c_bad_server_salt();
DEBUG_LOG(("Message Info: bad server salt received (error_code %4) for msg_id = %1, seq_no = %2, new salt: %3").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.vnew_server_salt().v).arg(data.verror_code().v));
const auto resendId = data.vbad_msg_id().v;
if (!wasSent(resendId)) {
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId));
return (info.badTime ? HandleResult::Ignored : HandleResult::Success);
}
_sessionSalt = data.vnew_server_salt().v;
correctUnixtimeWithBadLocal(info.serverTime);
if (setState(ConnectedState, ConnectingState)) {
resendAll();
}
info.badTime = false;
DEBUG_LOG(("Message Info: unixtime updated, now %1, server_salt updated, now %2, resending...").arg(info.serverTime).arg(info.serverSalt));
resend(resendId);
} return HandleResult::Success;
case mtpc_msgs_state_info: {
MTPMsgsStateInfo msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
auto &data = msg.c_msgs_state_info();
auto reqMsgId = data.vreq_msg_id().v;
auto &states = data.vinfo().v;
DEBUG_LOG(("Message Info: msg state received, msgId %1, reqMsgId: %2, HEX states %3").arg(msgId).arg(reqMsgId).arg(Logs::mb(states.data(), states.length()).str()));
const auto i = _stateAndResendRequests.find(reqMsgId);
if (i == _stateAndResendRequests.end()) {
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(reqMsgId));
return info.badTime
? HandleResult::Ignored
: HandleResult::Success;
}
if (info.badTime) {
if (info.serverSalt) {
_sessionSalt = info.serverSalt; // requestsFixTimeSalt with no lookup
}
correctUnixtimeWithBadLocal(info.serverTime);
DEBUG_LOG(("Message Info: unixtime updated from mtpc_msgs_state_info, now %1").arg(info.serverTime));
info.badTime = false;
}
const auto originalRequest = i->second;
Assert(originalRequest->size() > 8);
requestsAcked(QVector<MTPlong>(1, MTP_long(reqMsgId)), true);
auto rFrom = originalRequest->constData() + 8;
const auto rEnd = originalRequest->constData() + originalRequest->size();
if (mtpTypeId(*rFrom) == mtpc_msgs_state_req) {
MTPMsgsStateReq request;
if (!request.read(rFrom, rEnd)) {
LOG(("Message Error: could not parse sent msgs_state_req"));
return HandleResult::ParseError;
}
handleMsgsStates(request.c_msgs_state_req().vmsg_ids().v, states);
} else {
MTPMsgResendReq request;
if (!request.read(rFrom, rEnd)) {
LOG(("Message Error: could not parse sent msgs_resend_req"));
return HandleResult::ParseError;
}
handleMsgsStates(request.c_msg_resend_req().vmsg_ids().v, states);
}
} return HandleResult::Success;
case mtpc_msgs_all_info: {
if (info.badTime) {
DEBUG_LOG(("Message Info: skipping with bad time..."));
return HandleResult::Ignored;
}
MTPMsgsAllInfo msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
auto &data = msg.c_msgs_all_info();
auto &ids = data.vmsg_ids().v;
auto &states = data.vinfo().v;
DEBUG_LOG(("Message Info: msgs all info received, msgId %1, reqMsgIds: %2, states %3").arg(
QString::number(msgId),
LogIdsVector(ids),
Logs::mb(states.data(), states.length()).str()));
handleMsgsStates(ids, states);
} return HandleResult::Success;
case mtpc_msg_detailed_info: {
MTPMsgDetailedInfo msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_msg_detailed_info());
DEBUG_LOG(("Message Info: msg detailed info, sent msgId %1, answerId %2, status %3, bytes %4").arg(data.vmsg_id().v).arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v));
QVector<MTPlong> ids(1, data.vmsg_id());
if (info.badTime) {
if (requestsFixTimeSalt(ids, info)) {
info.badTime = false;
} else {
DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(data.vmsg_id().v));
return HandleResult::Ignored;
}
}
requestsAcked(ids);
const auto resMsgId = data.vanswer_msg_id();
if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
_ackRequestData.push_back(resMsgId);
} else {
DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
_resendRequestData.push_back(resMsgId);
}
} return HandleResult::Success;
case mtpc_msg_new_detailed_info: {
if (info.badTime) {
DEBUG_LOG(("Message Info: skipping msg_new_detailed_info with bad time..."));
return HandleResult::Ignored;
}
MTPMsgDetailedInfo msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_msg_new_detailed_info());
DEBUG_LOG(("Message Info: msg new detailed info, answerId %2, status %3, bytes %4").arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v));
const auto resMsgId = data.vanswer_msg_id();
if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
_ackRequestData.push_back(resMsgId);
} else {
DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
_resendRequestData.push_back(resMsgId);
}
} return HandleResult::Success;
case mtpc_rpc_result: {
if (from + 3 > end) {
return HandleResult::ParseError;
}
auto response = mtpBuffer();
MTPlong reqMsgId;
if (!reqMsgId.read(++from, end)) {
return HandleResult::ParseError;
}
const auto requestMsgId = reqMsgId.v;
DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(requestMsgId));
QVector<MTPlong> ids(1, reqMsgId);
if (info.badTime) {
if (requestsFixTimeSalt(ids, info)) {
info.badTime = false;
} else {
DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(requestMsgId));
return HandleResult::Ignored;
}
}
mtpTypeId typeId = from[0];
if (typeId == mtpc_gzip_packed) {
DEBUG_LOG(("RPC Info: gzip container"));
response = ungzip(++from, end);
if (response.empty()) {
return HandleResult::RestartConnection;
}
typeId = response[0];
} else {
response.resize(end - from);
memcpy(response.data(), from, (end - from) * sizeof(mtpPrime));
}
if (typeId == mtpc_rpc_error) {
if (IsDestroyedTemporaryKeyError(response)) {
return HandleResult::DestroyTemporaryKey;
}
// An error could be some RPC_CALL_FAIL or other error inside
// the initConnection, so we're not sure yet that it was inited.
// Wait till a good response is received.
} else {
_sessionData->notifyConnectionInited(*_options);
}
requestsAcked(ids, true);
const auto bindResult = handleBindResponse(requestMsgId, response);
if (bindResult != HandleResult::Ignored) {
return bindResult;
}
const auto requestId = wasSent(requestMsgId);
if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) {
// Save rpc_result for processing in the main thread.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedMessages().push_back({
.reply = std::move(response),
.outerMsgId = info.outerMsgId,
.requestId = requestId,
});
} else {
DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId));
}
} return HandleResult::Success;
case mtpc_new_session_created: {
const mtpPrime *start = from;
MTPNewSession msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_new_session_created());
if (info.badTime) {
if (requestsFixTimeSalt(QVector<MTPlong>(1, data.vfirst_msg_id()), info)) {
info.badTime = false;
} else {
DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(data.vfirst_msg_id().v));
return HandleResult::Ignored;
}
}
DEBUG_LOG(("Message Info: new server session created, unique_id %1, first_msg_id %2, server_salt %3").arg(data.vunique_id().v).arg(data.vfirst_msg_id().v).arg(data.vserver_salt().v));
_sessionSalt = data.vserver_salt().v;
mtpMsgId firstMsgId = data.vfirst_msg_id().v;
QVector<quint64> toResend;
{
QReadLocker locker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
toResend.reserve(haveSent.size());
for (const auto &[msgId, request] : haveSent) {
if (msgId >= firstMsgId) {
break;
} else if (request->requestId) {
toResend.push_back(msgId);
}
}
}
for (const auto msgId : toResend) {
resend(msgId, 10);
}
mtpBuffer update(from - start);
if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime));
// Notify main process about new session - need to get difference.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedMessages().push_back({
.reply = update,
.outerMsgId = info.outerMsgId,
});
} return HandleResult::Success;
case mtpc_pong: {
MTPPong msg;
if (!msg.read(from, end)) {
return HandleResult::ParseError;
}
const auto &data(msg.c_pong());
DEBUG_LOG(("Message Info: pong received, msg_id: %1, ping_id: %2").arg(data.vmsg_id().v).arg(data.vping_id().v));
if (!wasSent(data.vmsg_id().v)) {
DEBUG_LOG(("Message Error: such msg_id %1 ping_id %2 was not sent recently").arg(data.vmsg_id().v).arg(data.vping_id().v));
return HandleResult::Ignored;
}
if (data.vping_id().v == _pingId) {
_pingId = 0;
} else {
DEBUG_LOG(("Message Info: just pong..."));
}
QVector<MTPlong> ids(1, data.vmsg_id());
if (info.badTime) {
if (requestsFixTimeSalt(ids, info)) {
info.badTime = false;
} else {
return HandleResult::Ignored;
}
}
requestsAcked(ids, true);
} return HandleResult::Success;
}
if (info.badTime) {
DEBUG_LOG(("Message Error: bad time in updates cons, must create new session"));
return HandleResult::ResetSession;
}
if (_currentDcType == DcType::Regular) {
mtpBuffer update(end - from);
if (end > from) {
memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
}
// Notify main process about the new updates.
QWriteLocker locker(_sessionData->haveReceivedMutex());
_sessionData->haveReceivedMessages().push_back({
.reply = update,
.outerMsgId = info.outerMsgId,
});
} else {
LOG(("Message Error: unexpected updates in dcType: %1"
).arg(static_cast<int>(_currentDcType)));
}
return HandleResult::Success;
}
SessionPrivate::HandleResult SessionPrivate::handleBindResponse(
mtpMsgId requestMsgId,
const mtpBuffer &response) {
if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) {
return HandleResult::Ignored;
}
_bindMsgId = 0;
const auto result = _keyCreator->handleBindResponse(response);
switch (result) {
case DcKeyBindState::Success:
if (!_sessionData->releaseKeyCreationOnDone(
_encryptionKey,
base::take(_keyCreator)->bindPersistentKey())) {
return HandleResult::DestroyTemporaryKey;
}
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
case DcKeyBindState::DefinitelyDestroyed:
if (destroyOldEnoughPersistentKey()) {
return HandleResult::DestroyTemporaryKey;
}
[[fallthrough]];
case DcKeyBindState::Failed:
_sessionData->queueNeedToResumeAndSend();
return HandleResult::Success;
}
Unexpected("Result of BoundKeyCreator::handleBindResponse.");
}
mtpBuffer SessionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const {
mtpBuffer result; // * 4 because of mtpPrime type
result.resize(0);
MTPstring packed;
if (!packed.read(from, end)) { // read packed string as serialized mtp string type
LOG(("RPC Error: could not read gziped bytes."));
return result;
}
uint32 packedLen = packed.v.size(), unpackedChunk = packedLen;
z_stream stream;
stream.zalloc = 0;
stream.zfree = 0;
stream.opaque = 0;
stream.avail_in = 0;
stream.next_in = 0;
int res = inflateInit2(&stream, 16 + MAX_WBITS);
if (res != Z_OK) {
LOG(("RPC Error: could not init zlib stream, code: %1").arg(res));
return result;
}
stream.avail_in = packedLen;
stream.next_in = reinterpret_cast<Bytef*>(packed.v.data());
stream.avail_out = 0;
while (!stream.avail_out) {
result.resize(result.size() + unpackedChunk);
stream.avail_out = unpackedChunk * sizeof(mtpPrime);
stream.next_out = (Bytef*)&result[result.size() - unpackedChunk];
int res = inflate(&stream, Z_NO_FLUSH);
if (res != Z_OK && res != Z_STREAM_END) {
inflateEnd(&stream);
LOG(("RPC Error: could not unpack gziped data, code: %1").arg(res));
DEBUG_LOG(("RPC Error: bad gzip: %1").arg(Logs::mb(packed.v.constData(), packedLen).str()));
return mtpBuffer();
}
}
if (stream.avail_out & 0x03) {
uint32 badSize = result.size() * sizeof(mtpPrime) - stream.avail_out;
LOG(("RPC Error: bad length of unpacked data %1").arg(badSize));
DEBUG_LOG(("RPC Error: bad unpacked data %1").arg(Logs::mb(result.data(), badSize).str()));
return mtpBuffer();
}
result.resize(result.size() - (stream.avail_out >> 2));
inflateEnd(&stream);
if (!result.size()) {
LOG(("RPC Error: bad length of unpacked data 0"));
}
return result;
}
bool SessionPrivate::requestsFixTimeSalt(const QVector<MTPlong> &ids, const OuterInfo &info) {
for (const auto &id : ids) {
if (wasSent(id.v)) {
// Found such msg_id in recent acked or in recent sent requests.
if (info.serverSalt) {
_sessionSalt = info.serverSalt;
}
correctUnixtimeWithBadLocal(info.serverTime);
return true;
}
}
return false;
}
void SessionPrivate::correctUnixtimeByFastRequest(
const QVector<MTPlong> &ids,
TimeId serverTime) {
const auto now = crl::now();
QReadLocker locker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
for (const auto &id : ids) {
const auto i = haveSent.find(id.v);
if (i == haveSent.end()) {
continue;
}
const auto duration = (now - i->second->lastSentTime);
if (duration < 0 || duration > SyncTimeRequestDuration) {
continue;
}
locker.unlock();
SyncTimeRequestDuration = duration;
base::unixtime::update(serverTime, true);
return;
}
}
void SessionPrivate::correctUnixtimeWithBadLocal(TimeId serverTime) {
SyncTimeRequestDuration = kFastRequestDuration;
base::unixtime::update(serverTime, true);
}
void SessionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids)));
QVector<MTPlong> toAckMore;
{
QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (const auto &wrappedMsgId : ids) {
const auto msgId = wrappedMsgId.v;
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(msgId));
const auto &list = i->second.messages;
toAckMore.reserve(toAckMore.size() + list.size());
for (const auto msgId : list) {
toAckMore.push_back(MTP_long(msgId));
}
_sentContainers.erase(i);
continue;
}
if (const auto i = _stateAndResendRequests.find(msgId); i != end(_stateAndResendRequests)) {
_stateAndResendRequests.erase(i);
continue;
}
if (const auto i = haveSent.find(msgId); i != end(haveSent)) {
const auto requestId = i->second->requestId;
if (!byResponse && _instance->hasCallback(requestId)) {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
continue;
}
haveSent.erase(i);
_ackedIds.emplace(msgId, requestId);
continue;
}
DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId));
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
const auto requestId = i->second;
if (!byResponse && _instance->hasCallback(requestId)) {
DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(requestId));
continue;
}
_resendingIds.erase(i);
QWriteLocker locker4(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto j = toSend.find(requestId);
if (j == end(toSend)) {
DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId).arg(requestId));
continue;
}
if (j->second->requestId != requestId) {
DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(requestId).arg(j->second->requestId));
} else {
DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(requestId));
}
_ackedIds.emplace(msgId, j->second->requestId);
toSend.erase(j);
continue;
}
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId));
}
}
auto ackedCount = _ackedIds.size();
if (ackedCount > kIdsBufferSize) {
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
while (ackedCount-- > kIdsBufferSize) {
_ackedIds.erase(_ackedIds.begin());
}
}
if (toAckMore.size()) {
requestsAcked(toAckMore);
}
}
void SessionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states) {
const auto idsCount = ids.size();
if (!idsCount) {
DEBUG_LOG(("Message Info: void ids vector in handleMsgsStates()"));
return;
}
if (states.size() != idsCount) {
LOG(("Message Error: got less states than required ids count."));
return;
}
auto acked = QVector<MTPlong>();
acked.reserve(idsCount);
for (auto i = 0; i != idsCount; ++i) {
const auto state = states[i];
const auto requestMsgId = ids[i].v;
{
QReadLocker locker(_sessionData->haveSentMutex());
if (!_sessionData->haveSentMap().contains(requestMsgId)) {
DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId));
const auto reqIt = _resendingIds.find(requestMsgId);
if (reqIt != _resendingIds.cend()) {
if ((state & 0x07) != 0x04) { // was received
DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, already resending in container").arg(requestMsgId).arg((int32)state));
} else {
DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, ack, cancelling resend").arg(requestMsgId).arg((int32)state));
acked.push_back(MTP_long(requestMsgId)); // will remove from resend in requestsAcked
}
} else {
DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(requestMsgId));
}
continue;
}
}
if ((state & 0x07) != 0x04) { // was received
DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, resending in container").arg(requestMsgId).arg((int32)state));
resend(requestMsgId, 10);
} else {
DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, ack").arg(requestMsgId).arg((int32)state));
acked.push_back(MTP_long(requestMsgId));
}
}
requestsAcked(acked);
}
void SessionPrivate::clearSpecialMsgId(mtpMsgId msgId) {
if (msgId == _pingMsgId) {
_pingMsgId = 0;
_pingId = 0;
} else if (msgId == _bindMsgId) {
_bindMsgId = 0;
}
}
void SessionPrivate::resend(mtpMsgId msgId, crl::time msCanWait) {
const auto guard = gsl::finally([&] {
clearSpecialMsgId(msgId);
if (msCanWait >= 0) {
_sessionData->queueSendAnything(msCanWait);
}
});
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
DEBUG_LOG(("Message Info: resending container, msgId %1").arg(msgId));
const auto ids = std::move(i->second.messages);
_sentContainers.erase(i);
for (const auto innerMsgId : ids) {
resend(innerMsgId, -1);
}
return;
}
auto lock = QWriteLocker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
auto i = haveSent.find(msgId);
if (i == haveSent.end()) {
return;
}
auto request = i->second;
haveSent.erase(i);
lock.unlock();
request->lastSentTime = crl::now();
request->forceSendInContainer = true;
_resendingIds.emplace(msgId, request->requestId);
{
QWriteLocker locker(_sessionData->toSendMutex());
_sessionData->toSendMap().emplace(request->requestId, request);
}
}
void SessionPrivate::resendAll() {
auto lock = QWriteLocker(_sessionData->haveSentMutex());
auto haveSent = base::take(_sessionData->haveSentMap());
lock.unlock();
{
auto lock = QWriteLocker(_sessionData->toSendMutex());
auto &toSend = _sessionData->toSendMap();
const auto now = crl::now();
for (auto &[msgId, request] : haveSent) {
const auto requestId = request->requestId;
request->lastSentTime = now;
request->forceSendInContainer = true;
_resendingIds.emplace(msgId, requestId);
toSend.emplace(requestId, std::move(request));
}
}
_sessionData->queueSendAnything();
}
void SessionPrivate::onConnected(
not_null<AbstractConnection*> connection) {
disconnect(connection, &AbstractConnection::connected, nullptr, nullptr);
if (!connection->isConnected()) {
LOG(("Connection Error: not connected in onConnected(), "
"state: %1").arg(connection->debugState()));
return restart();
}
_waitForConnected = kMinConnectedTimeout;
_waitForConnectedTimer.cancel();
const auto i = ranges::find(
_testConnections,
connection.get(),
[](const TestConnection &test) { return test.data.get(); });
Assert(i != end(_testConnections));
const auto my = i->priority;
const auto j = ranges::find_if(
_testConnections,
[&](const TestConnection &test) { return test.priority > my; });
if (j != end(_testConnections)) {
DEBUG_LOG(("MTP Info: connection %1 succeed, waiting for %2.").arg(
i->data->tag(),
j->data->tag()));
_waitForBetterTimer.callOnce(kWaitForBetterTimeout);
} else {
DEBUG_LOG(("MTP Info: connection through IPv4 succeed."));
_waitForBetterTimer.cancel();
_connection = std::move(i->data);
_testConnections.clear();
checkAuthKey();
}
}
void SessionPrivate::onDisconnected(
not_null<AbstractConnection*> connection) {
removeTestConnection(connection);
if (_testConnections.empty()) {
destroyAllConnections();
restart();
} else {
confirmBestConnection();
}
}
void SessionPrivate::confirmBestConnection() {
if (_waitForBetterTimer.isActive()) {
return;
}
const auto i = ranges::max_element(
_testConnections,
std::less<>(),
[](const TestConnection &test) {
return test.data->isConnected() ? test.priority : -1;
});
Assert(i != end(_testConnections));
if (!i->data->isConnected()) {
return;
}
DEBUG_LOG(("MTP Info: can't connect through better, using %1."
).arg(i->data->tag()));
_connection = std::move(i->data);
_testConnections.clear();
checkAuthKey();
}
void SessionPrivate::removeTestConnection(
not_null<AbstractConnection*> connection) {
_testConnections.erase(
ranges::remove(
_testConnections,
connection.get(),
[](const TestConnection &test) { return test.data.get(); }),
end(_testConnections));
}
void SessionPrivate::checkAuthKey() {
if (_keyId) {
authKeyChecked();
} else if (_instance->isKeysDestroyer()) {
applyAuthKey(_sessionData->getPersistentKey());
} else {
applyAuthKey(_sessionData->getTemporaryKey(
TemporaryKeyTypeByDcType(_currentDcType)));
}
}
void SessionPrivate::updateAuthKey() {
if (_instance->isKeysDestroyer() || _keyCreator || !_connection) {
return;
}
DEBUG_LOG(("AuthKey Info: Connection updating key from Session, dc %1"
).arg(_shiftedDcId));
applyAuthKey(_sessionData->getTemporaryKey(
TemporaryKeyTypeByDcType(_currentDcType)));
}
void SessionPrivate::setCurrentKeyId(uint64 newKeyId) {
if (_keyId == newKeyId) {
return;
}
_keyId = newKeyId;
DEBUG_LOG(("MTP Info: auth key id set to id %1").arg(newKeyId));
changeSessionId();
}
void SessionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) {
_encryptionKey = std::move(encryptionKey);
const auto newKeyId = _encryptionKey ? _encryptionKey->keyId() : 0;
if (_keyId) {
if (_keyId == newKeyId) {
return;
}
setCurrentKeyId(0);
DEBUG_LOG(("MTP Info: auth_key id for dc %1 changed, restarting..."
).arg(_shiftedDcId));
if (_connection) {
restart();
}
return;
}
if (!_connection) {
return;
}
setCurrentKeyId(newKeyId);
Assert(!_connection->sentEncryptedWithKeyId());
DEBUG_LOG(("AuthKey Info: Connection update key from Session, "
"dc %1 result: %2"
).arg(_shiftedDcId
).arg(Logs::mb(&_keyId, sizeof(_keyId)).str()));
if (_keyId) {
return authKeyChecked();
}
if (_instance->isKeysDestroyer()) {
// We are here to destroy an old key, so we're done.
LOG(("MTP Error: No key %1 in updateAuthKey() for destroying."
).arg(_shiftedDcId));
_instance->keyWasPossiblyDestroyed(_shiftedDcId);
} else if (noMediaKeyWithExistingRegularKey()) {
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey() for media, "
"but someone has created regular, trying to acquire."));
const auto dcType = tryAcquireKeyCreation();
if (_keyCreator && dcType != _currentDcType) {
DEBUG_LOG(("AuthKey Info: "
"Dc type changed for creation, restarting."));
restart();
return;
}
}
if (_keyCreator) {
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), creating."));
_keyCreator->start(
BareDcId(_shiftedDcId),
getProtocolDcId(),
_connection.get(),
&_instance->dcOptions());
} else {
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), "
"but someone is creating already, waiting."));
}
}
bool SessionPrivate::noMediaKeyWithExistingRegularKey() const {
return (TemporaryKeyTypeByDcType(_currentDcType)
== TemporaryKeyType::MediaCluster)
&& _sessionData->getTemporaryKey(TemporaryKeyType::Regular);
}
bool SessionPrivate::destroyOldEnoughPersistentKey() {
Expects(_keyCreator != nullptr);
const auto key = _keyCreator->bindPersistentKey();
Assert(key != nullptr);
const auto created = key->creationTime();
if (created > 0 && crl::now() - created < kKeyOldEnoughForDestroy) {
return false;
}
const auto instance = _instance;
const auto shiftedDcId = _shiftedDcId;
const auto keyId = key->keyId();
InvokeQueued(instance, [=] {
instance->keyDestroyedOnServer(shiftedDcId, keyId);
});
return true;
}
DcType SessionPrivate::tryAcquireKeyCreation() {
if (_keyCreator) {
return _currentDcType;
} else if (_instance->isKeysDestroyer()) {
return _realDcType;
}
const auto acquired = _sessionData->acquireKeyCreation(_realDcType);
if (acquired == CreatingKeyType::None) {
return _realDcType;
}
using Result = DcKeyResult;
using Error = DcKeyError;
auto delegate = BoundKeyCreator::Delegate();
delegate.unboundReady = [=](base::expected<Result, Error> result) {
if (!result) {
releaseKeyCreationOnFail();
if (result.error() == Error::UnknownPublicKey) {
if (_realDcType == DcType::Cdn) {
LOG(("Warning: CDN public RSA key not found"));
requestCDNConfig();
return;
}
LOG(("AuthKey Error: could not choose public RSA key"));
}
restart();
return;
}
DEBUG_LOG(("AuthKey Info: unbound key creation succeed, "
"ids: (%1, %2) server salts: (%3, %4)"
).arg(result->temporaryKey
? result->temporaryKey->keyId()
: 0
).arg(result->persistentKey
? result->persistentKey->keyId()
: 0
).arg(result->temporaryServerSalt
).arg(result->persistentServerSalt));
_sessionSalt = result->temporaryServerSalt;
result->temporaryKey->setExpiresAt(base::unixtime::now()
+ kTemporaryExpiresIn
+ kBindKeyAdditionalExpiresTimeout);
if (_realDcType != DcType::Cdn) {
auto key = result->persistentKey
? std::move(result->persistentKey)
: _sessionData->getPersistentKey();
if (!key) {
releaseKeyCreationOnFail();
restart();
return;
}
_keyCreator->bind(std::move(key));
}
applyAuthKey(std::move(result->temporaryKey));
if (_realDcType == DcType::Cdn) {
_keyCreator = nullptr;
if (!_sessionData->releaseCdnKeyCreationOnDone(_encryptionKey)) {
restart();
} else {
_sessionData->queueNeedToResumeAndSend();
}
}
};
delegate.sentSome = [=](uint64 size) {
onSentSome(size);
};
delegate.receivedSome = [=] {
onReceivedSome();
};
auto request = DcKeyRequest();
request.persistentNeeded = (acquired == CreatingKeyType::Persistent);
request.temporaryExpiresIn = kTemporaryExpiresIn;
_keyCreator = std::make_unique<BoundKeyCreator>(
request,
std::move(delegate));
const auto forceUseRegular = (_realDcType == DcType::MediaCluster)
&& (acquired != CreatingKeyType::TemporaryMediaCluster);
return forceUseRegular ? DcType::Regular : _realDcType;
}
void SessionPrivate::authKeyChecked() {
connect(_connection, &AbstractConnection::receivedData, [=] {
handleReceived();
});
if (_sessionSalt && setState(ConnectedState)) {
resendAll();
} // else receive salt in bad_server_salt first, then try to send all the requests
_pingIdToSend = base::RandomValue<uint64>(); // get server_salt
_sessionData->queueNeedToResumeAndSend();
}
void SessionPrivate::onError(
not_null<AbstractConnection*> connection,
qint32 errorCode) {
if (errorCode == -429) {
LOG(("Protocol Error: -429 flood code returned!"));
} else if (errorCode == -444) {
LOG(("Protocol Error: -444 bad dc_id code returned!"));
InvokeQueued(_instance, [instance = _instance] {
instance->badConfigurationError();
});
}
removeTestConnection(connection);
if (_testConnections.empty()) {
handleError(errorCode);
} else {
confirmBestConnection();
}
}
void SessionPrivate::handleError(int errorCode) {
destroyAllConnections();
_waitForConnectedTimer.cancel();
if (errorCode == -404) {
destroyTemporaryKey();
} else {
MTP_LOG(_shiftedDcId, ("Restarting after error in connection, error code: %1...").arg(errorCode));
return restart();
}
}
void SessionPrivate::destroyTemporaryKey() {
if (_instance->isKeysDestroyer()) {
LOG(("MTP Info: -404 error received in destroyer %1, assuming key was destroyed.").arg(_shiftedDcId));
_instance->keyWasPossiblyDestroyed(_shiftedDcId);
return;
}
LOG(("MTP Info: -404 error received in %1 with temporary key, assuming it was destroyed.").arg(_shiftedDcId));
releaseKeyCreationOnFail();
if (_encryptionKey) {
_sessionData->destroyTemporaryKey(_encryptionKey->keyId());
}
applyAuthKey(nullptr);
restart();
}
bool SessionPrivate::sendSecureRequest(
SerializedRequest &&request,
bool needAnyResponse) {
request.addPadding(false);
uint32 fullSize = request->size();
if (fullSize < 9) {
return false;
}
auto messageSize = request.messageSize();
if (messageSize < 5 || fullSize < messageSize + 4) {
return false;
}
memcpy(request->data() + 0, &_sessionSalt, 2 * sizeof(mtpPrime));
memcpy(request->data() + 2, &_sessionId, 2 * sizeof(mtpPrime));
auto from = request->constData() + 4;
MTP_LOG(_shiftedDcId, ("Send: ")
+ DumpToText(from, from + messageSize)
+ QString(" (dc:%1,key:%2)"
).arg(AbstractConnection::ProtocolDcDebugId(getProtocolDcId())
).arg(_encryptionKey->keyId()));
uchar encryptedSHA256[32];
MTPint128 &msgKey(*(MTPint128*)(encryptedSHA256 + 8));
SHA256_CTX msgKeyLargeContext;
SHA256_Init(&msgKeyLargeContext);
SHA256_Update(&msgKeyLargeContext, _encryptionKey->partForMsgKey(true), 32);
SHA256_Update(&msgKeyLargeContext, request->constData(), fullSize * sizeof(mtpPrime));
SHA256_Final(encryptedSHA256, &msgKeyLargeContext);
auto packet = _connection->prepareSecurePacket(_keyId, msgKey, fullSize);
const auto prefix = packet.size();
packet.resize(prefix + fullSize);
aesIgeEncrypt(
request->constData(),
&packet[prefix],
fullSize * sizeof(mtpPrime),
_encryptionKey,
msgKey);
DEBUG_LOG(("MTP Info: sending request, size: %1, num: %2, time: %3").arg(fullSize + 6).arg((*request)[4]).arg((*request)[5]));
_connection->setSentEncryptedWithKeyId(_keyId);
_connection->sendData(std::move(packet));
if (needAnyResponse) {
onSentSome((prefix + fullSize) * sizeof(mtpPrime));
}
return true;
}
mtpRequestId SessionPrivate::wasSent(mtpMsgId msgId) const {
if (msgId == _pingMsgId || msgId == _bindMsgId) {
return mtpRequestId(0xFFFFFFFF);
}
if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) {
return i->second;
}
if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) {
return i->second;
}
if (const auto i = _sentContainers.find(msgId); i != end(_sentContainers)) {
return mtpRequestId(0xFFFFFFFF);
}
{
QReadLocker locker(_sessionData->haveSentMutex());
const auto &haveSent = _sessionData->haveSentMap();
const auto i = haveSent.find(msgId);
if (i != haveSent.end()) {
return i->second->requestId
? i->second->requestId
: mtpRequestId(0xFFFFFFFF);
}
}
return 0;
}
void SessionPrivate::clearUnboundKeyCreator() {
if (_keyCreator) {
_keyCreator->stop();
}
}
void SessionPrivate::releaseKeyCreationOnFail() {
if (!_keyCreator) {
return;
}
_keyCreator = nullptr;
_sessionData->releaseKeyCreationOnFail();
}
} // namespace details
} // namespace MTP