MTP global state moved to MTP::Instance class.

Now there will be ability to start multiple mtproto instances.
This commit is contained in:
John Preston 2017-02-24 20:15:41 +03:00
parent c3b3819d9f
commit dd933cf61c
23 changed files with 1868 additions and 1341 deletions

View File

@ -226,11 +226,11 @@ namespace {
} // namespace
void logOut() {
if (MTP::started()) {
MTP::logoutKeys(rpcDone(&loggedOut), rpcFail(&loggedOut));
if (auto mtproto = Messenger::Instance().mtp()) {
mtproto->logout(rpcDone(&loggedOut), rpcFail(&loggedOut));
} else {
loggedOut();
MTP::start();
Messenger::Instance().startMtp();
}
}

View File

@ -124,7 +124,7 @@ void unixtimeSet(int32 serverTime, bool force) {
}
TimeId unixtime() {
TimeId result = myunixtime();
auto result = myunixtime();
QReadLocker locker(&unixtimeLock);
return result + unixtimeDelta;

View File

@ -23,6 +23,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "core/basic_types.h"
#include <array>
#include <algorithm>
#include <set>
namespace base {
@ -89,6 +90,50 @@ inline bool contains(const Container &container, const T &value) {
return std::find(std::begin(container), end, value) != end;
}
// We need a custom comparator for std::set<std::unique_ptr<T>>::find to work with pointers.
// thanks to http://stackoverflow.com/questions/18939882/raw-pointer-lookup-for-sets-of-unique-ptrs
template <typename T>
struct pointer_comparator {
using is_transparent = std::true_type;
// helper does some magic in order to reduce the number of
// pairs of types we need to know how to compare: it turns
// everything into a pointer, and then uses `std::less<T*>`
// to do the comparison:
struct helper {
T *ptr = nullptr;
helper() = default;
helper(const helper &other) = default;
helper(T *p) : ptr(p) {
}
template <typename ...Ts>
helper(const std::shared_ptr<Ts...> &other) : ptr(other.get()) {
}
template <typename ...Ts>
helper(const std::unique_ptr<Ts...> &other) : ptr(other.get()) {
}
bool operator<(helper other) const {
return std::less<T*>()(ptr, other.ptr);
}
};
// without helper, we'd need 2^n different overloads, where
// n is the number of types we want to support (so, 8 with
// raw pointers, unique pointers, and shared pointers). That
// seems silly.
// && helps enforce rvalue use only
bool operator()(const helper &&lhs, const helper &&rhs) const {
return lhs < rhs;
}
};
template <typename T>
using set_of_unique_ptr = std::set<std::unique_ptr<T>, base::pointer_comparator<T>>;
template <typename T>
using set_of_shared_ptr = std::set<std::shared_ptr<T>, base::pointer_comparator<T>>;
} // namespace base
// using for_const instead of plain range-based for loop to ensure usage of const_iterator
@ -162,7 +207,6 @@ inline void t_assert_fail(const char *message, const char *file, int32 line) {
class Exception : public std::exception {
public:
Exception(const QString &msg, bool isFatal = true) : _fatal(isFatal), _msg(msg.toUtf8()) {
LOG(("Exception: %1").arg(msg));
}
@ -179,6 +223,7 @@ public:
private:
bool _fatal;
QByteArray _msg;
};
class MTPint;

View File

@ -265,7 +265,7 @@ bool Widget::resetFail(const RPCError &error) {
void Widget::gotNearestDC(const MTPNearestDc &result) {
auto &nearest = result.c_nearestDc();
DEBUG_LOG(("Got nearest dc, country: %1, nearest: %2, this: %3").arg(nearest.vcountry.c_string().v.c_str()).arg(nearest.vnearest_dc.v).arg(nearest.vthis_dc.v));
MTP::setdc(nearest.vnearest_dc.v, true);
Messenger::Instance().mtp()->suggestMainDcId(nearest.vnearest_dc.v);
auto nearestCountry = qs(nearest.vcountry);
if (getData()->country != nearestCountry) {
getData()->country = nearestCountry;

View File

@ -504,7 +504,7 @@ enum { // Local Storage Keys
enum {
dbiKey = 0x00,
dbiUser = 0x01,
dbiDcOptionOld = 0x02,
dbiDcOptionOldOld = 0x02,
dbiChatSizeMax = 0x03,
dbiMutePeer = 0x04,
dbiSendKey = 0x05,
@ -541,7 +541,7 @@ enum {
dbiRecentEmojiOld = 0x24,
dbiEmojiVariantsOld = 0x25,
dbiRecentStickers = 0x26,
dbiDcOption = 0x27,
dbiDcOptionOld = 0x27,
dbiTryIPv6 = 0x28,
dbiSongVolume = 0x29,
dbiWindowsNotificationsOld = 0x30,
@ -567,6 +567,7 @@ enum {
dbiDialogsWidthRatio = 0x48,
dbiUseExternalVideoPlayer = 0x49,
dbiDcOptions = 0x4a,
dbiMtpAuthorization = 0x4b,
dbiEncryptedWithSalt = 333,
dbiEncrypted = 444,
@ -843,7 +844,7 @@ void applyReadContext(const ReadSettingsContext &context) {
bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSettingsContext &context) {
switch (blockId) {
case dbiDcOptionOld: {
case dbiDcOptionOldOld: {
quint32 dcId, port;
QString host, ip;
stream >> dcId >> host >> ip >> port;
@ -852,7 +853,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting
context.dcOptions.constructAddOne(dcId, 0, ip.toStdString(), port);
} break;
case dbiDcOption: {
case dbiDcOptionOld: {
quint32 dcIdWithShift, port;
qint32 flags;
QString ip;
@ -863,7 +864,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting
} break;
case dbiDcOptions: {
QByteArray serialized;
auto serialized = QByteArray();
stream >> serialized;
if (!_checkStreamStatus(stream)) return false;
@ -909,8 +910,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting
if (!_checkStreamStatus(stream)) return false;
DEBUG_LOG(("MTP Info: user found, dc %1, uid %2").arg(dcId).arg(userId));
MTP::configure(dcId);
Messenger::Instance().setMtpMainDcId(dcId);
if (userId) {
Messenger::Instance().authSessionCreate(UserId(userId));
}
@ -923,7 +923,15 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting
stream.readRawData(key.data(), key.size());
if (!_checkStreamStatus(stream)) return false;
MTP::setKey(dcId, key);
Messenger::Instance().setMtpKey(dcId, key);
} break;
case dbiMtpAuthorization: {
auto serialized = QByteArray();
stream >> serialized;
if (!_checkStreamStatus(stream)) return false;
Messenger::Instance().setMtpAuthorization(serialized);
} break;
case dbiAutoStart: {
@ -1777,18 +1785,12 @@ void _writeMtpData() {
return;
}
auto keys = MTP::getKeys();
auto mtpAuthorizationSerialized = Messenger::Instance().serializeMtpAuthorization();
quint32 size = sizeof(quint32) + sizeof(qint32) + sizeof(quint32);
size += keys.size() * (sizeof(quint32) + sizeof(quint32) + MTP::AuthKey::kSize);
quint32 size = sizeof(quint32) + Serialize::bytearraySize(mtpAuthorizationSerialized);
EncryptedDescriptor data(size);
data.stream << quint32(dbiUser) << qint32(AuthSession::CurrentUserId()) << quint32(MTP::maindc());
for_const (auto &key, keys) {
data.stream << quint32(dbiKey) << quint32(key->getDC());
key->write(data.stream);
}
data.stream << quint32(dbiMtpAuthorization) << mtpAuthorizationSerialized;
mtp.writeEncrypted(data);
}

View File

@ -25,6 +25,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include <new>
#include "pspecific.h"
#include "mtproto/connection.h"
#ifndef TDESKTOP_DISABLE_CRASH_REPORTS
@ -87,9 +88,8 @@ QString _logsEntryStart() {
static int32 index = 0;
QDateTime tm(QDateTime::currentDateTime());
QThread *thread = QThread::currentThread();
MTP::internal::Thread *mtpThread = qobject_cast<MTP::internal::Thread*>(thread);
uint threadId = mtpThread ? mtpThread->getThreadId() : 0;
auto thread = qobject_cast<MTP::internal::Thread*>(QThread::currentThread());
auto threadId = thread ? thread->getThreadIndex() : 0;
return QString("[%1 %2-%3]").arg(tm.toString("hh:mm:ss.zzz")).arg(QString("%1").arg(threadId, 2, 10, QChar('0'))).arg(++index, 7, 10, QChar('0'));
}

View File

@ -82,7 +82,9 @@ MainWidget::MainWidget(QWidget *parent) : TWidget(parent)
, _playerPanel(this, Media::Player::Panel::Layout::Full)
, _mediaType(this, st::defaultDropdownMenu)
, _api(new ApiWrap(this)) {
MTP::setGlobalDoneHandler(rpcDone(&MainWidget::updateReceived));
Messenger::Instance().mtp()->setUpdatesHandler(rpcDone(&MainWidget::updateReceived));
Messenger::Instance().mtp()->setGlobalFailHandler(rpcFail(&MainWidget::updateFail));
_ptsWaiter.setRequesting(true);
updateScrollColors();
@ -174,8 +176,6 @@ MainWidget::MainWidget(QWidget *parent) : TWidget(parent)
orderWidgets();
MTP::setGlobalFailHandler(rpcFail(&MainWidget::updateFail));
_mediaType->hide();
_mediaType->setOrigin(Ui::PanelAnimation::Origin::TopRight);
_topBar->mediaTypeButton()->installEventFilter(_mediaType);
@ -4279,7 +4279,7 @@ MainWidget::~MainWidget() {
_hider = nullptr;
delete hider;
}
MTP::clearGlobalHandlers();
Messenger::Instance().mtp()->clearGlobalHandlers();
if (App::wnd()) App::wnd()->noMain(this);
}

View File

@ -32,6 +32,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "fileuploader.h"
#include "mainwidget.h"
#include "mtproto/dc_options.h"
#include "mtproto/mtp_instance.h"
#include "media/player/media_player_instance.h"
#include "window/notifications_manager.h"
#include "window/themes/window_theme.h"
@ -43,25 +44,18 @@ namespace {
Messenger *SingleInstance = nullptr;
void mtpStateChanged(int32 dc, int32 state) {
if (App::wnd()) {
App::wnd()->mtpStateChanged(dc, state);
}
}
void mtpSessionReset(int32 dc) {
if (App::main() && dc == MTP::maindc()) {
App::main()->getDifference();
}
}
} // namespace
Messenger *Messenger::InstancePointer() {
return SingleInstance;
}
Messenger::Messenger() : QObject() {
struct Messenger::Private {
MTP::Instance::Config mtpConfig;
};
Messenger::Messenger() : QObject()
, _private(std::make_unique<Private>()) {
t_assert(SingleInstance == nullptr);
SingleInstance = this;
@ -126,12 +120,9 @@ Messenger::Messenger() : QObject() {
DEBUG_LOG(("Application Info: passcode needed..."));
} else {
DEBUG_LOG(("Application Info: local map read..."));
MTP::start();
startMtp();
}
MTP::setStateChangedHandler(mtpStateChanged);
MTP::setSessionResetHandler(mtpSessionReset);
DEBUG_LOG(("Application Info: MTP started..."));
DEBUG_LOG(("Application Info: showing."));
@ -168,6 +159,107 @@ Messenger::Messenger() : QObject() {
}
}
void Messenger::setMtpMainDcId(MTP::DcId mainDcId) {
t_assert(!_mtproto);
_private->mtpConfig.mainDcId = mainDcId;
}
void Messenger::setMtpKey(MTP::DcId dcId, const MTP::AuthKey::Data &keyData) {
t_assert(!_mtproto);
_private->mtpConfig.keys.insert(std::make_pair(dcId, keyData));
}
QByteArray Messenger::serializeMtpAuthorization() const {
auto serialize = [this](auto keysCount, auto mainDcId, auto writeKeys) {
auto result = QByteArray();
auto size = sizeof(qint32) + sizeof(qint32) + sizeof(qint32); // userId + mainDcId + keys count
size += keysCount * (sizeof(qint32) + MTP::AuthKey::Data().size());
result.reserve(size);
{
QBuffer buffer(&result);
if (!buffer.open(QIODevice::WriteOnly)) {
LOG(("MTP Error: could not open buffer to serialize mtp authorization."));
return result;
}
QDataStream stream(&buffer);
stream.setVersion(QDataStream::Qt_5_1);
stream << qint32(AuthSession::CurrentUserId()) << qint32(mainDcId) << qint32(keysCount);
writeKeys(stream);
}
return result;
};
if (_mtproto) {
auto keys = _mtproto->getKeysForWrite();
return serialize(keys.size(), _mtproto->mainDcId(), [&keys](QDataStream &stream) {
for (auto &key : keys) {
stream << qint32(key->getDC());
key->write(stream);
}
});
}
auto &keys = _private->mtpConfig.keys;
return serialize(keys.size(), _private->mtpConfig.mainDcId, [&keys](QDataStream &stream) {
for (auto &key : keys) {
stream << qint32(key.first);
stream.writeRawData(key.second.data(), key.second.size());
}
});
}
void Messenger::setMtpAuthorization(const QByteArray &serialized) {
t_assert(!_mtproto);
t_assert(!authSession());
auto readonly = serialized;
QBuffer buffer(&readonly);
if (!buffer.open(QIODevice::ReadOnly)) {
LOG(("MTP Error: could not open serialized mtp authorization for reading."));
return;
}
QDataStream stream(&buffer);
stream.setVersion(QDataStream::Qt_5_1);
qint32 userId = 0, mainDcId = 0, count = 0;
stream >> userId >> mainDcId >> count;
if (stream.status() != QDataStream::Ok) {
LOG(("MTP Error: could not read main fields from serialized mtp authorization."));
return;
}
if (userId) {
authSessionCreate(userId);
}
_private->mtpConfig.mainDcId = mainDcId;
for (auto i = 0; i != count; ++i) {
qint32 dcId = 0;
MTP::AuthKey::Data keyData;
stream >> dcId;
stream.readRawData(keyData.data(), keyData.size());
if (stream.status() != QDataStream::Ok) {
LOG(("MTP Error: could not read key from serialized mtp authorization."));
return;
}
_private->mtpConfig.keys.insert(std::make_pair(dcId, keyData));
}
}
void Messenger::startMtp() {
t_assert(!_mtproto);
_mtproto = std::make_unique<MTP::Instance>(_dcOptions.get(), std::move(_private->mtpConfig));
_mtproto->setStateChangedHandler([](MTP::ShiftedDcId shiftedDcId, int32 state) {
if (App::wnd()) {
App::wnd()->mtpStateChanged(shiftedDcId, state);
}
});
_mtproto->setSessionResetHandler([](MTP::ShiftedDcId shiftedDcId) {
if (App::main() && shiftedDcId == MTP::maindc()) {
App::main()->getDifference();
}
});
}
void Messenger::loadLanguage() {
if (cLang() < languageTest) {
cSetLang(Sandbox::LangSystem());
@ -199,10 +291,12 @@ void Messenger::startLocalStorage() {
_dcOptions = std::make_unique<MTP::DcOptions>();
_dcOptions->constructFromBuiltIn();
Local::start();
subscribe(_dcOptions->changed(), [](const MTP::DcOptions::Ids &ids) {
subscribe(_dcOptions->changed(), [this](const MTP::DcOptions::Ids &ids) {
Local::writeSettings();
for (auto id : ids) {
MTP::restart(id);
if (auto instance = mtp()) {
for (auto id : ids) {
instance->restart(id);
}
}
});
}
@ -499,7 +593,7 @@ void Messenger::checkMapVersion() {
void Messenger::prepareToDestroy() {
_window.reset();
MTP::finish();
_mtproto.reset();
}
Messenger::~Messenger() {

View File

@ -24,6 +24,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
namespace MTP {
class DcOptions;
class Instance;
} // namespace MTP
class AuthSession;
@ -55,6 +56,16 @@ public:
MTP::DcOptions *dcOptions() {
return _dcOptions.get();
}
void setMtpMainDcId(MTP::DcId mainDcId);
void setMtpKey(MTP::DcId dcId, const MTP::AuthKey::Data &keyData);
QByteArray serializeMtpAuthorization() const;
void setMtpAuthorization(const QByteArray &serialized);
void startMtp();
MTP::Instance *mtp() {
return _mtproto.get();
}
AuthSession *authSession() {
return _authSession.get();
}
@ -114,13 +125,16 @@ private:
QMap<int32, TimeMs> killDownloadSessionTimes;
SingleTimer killDownloadSessionsTimer;
TimeMs _lastActionTime = 0;
// Some fields are just moved from the declaration.
struct Private;
const std::unique_ptr<Private> _private;
std::unique_ptr<MainWindow> _window;
FileUploader *_uploader = nullptr;
Translator *_translator = nullptr;
std::unique_ptr<MTP::DcOptions> _dcOptions;
std::unique_ptr<MTP::Instance> _mtproto;
std::unique_ptr<AuthSession> _authSession;
};

View File

@ -40,9 +40,31 @@ using std::string;
namespace MTP {
namespace internal {
namespace {
void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest = 0) {
mtpMsgId afterId(*(mtpMsgId*)(from->after->data() + 4));
mtpRequestMap::const_iterator i = afterId ? haveSent.constFind(afterId) : haveSent.cend();
int32 size = to->size(), lenInInts = (from.innerLength() >> 2), headlen = 4, fulllen = headlen + lenInInts;
if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently
to->resize(size + fulllen + skipBeforeRequest);
if (skipBeforeRequest) {
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
} else {
memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime));
}
} else {
to->resize(size + fulllen + skipBeforeRequest + 3);
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
(*to)[size + 3] += 3 * sizeof(mtpPrime);
*((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg;
memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime);
}
}
bool parsePQ(const string &pqStr, string &pStr, string &qStr) {
if (pqStr.length() > 8) return false; // more than 64 bit pq
@ -321,35 +343,20 @@ RSAPublicKeys InitRSAPublicKeys() {
} // namespace
uint32 ThreadIdIncrement = 0;
Thread::Thread() : QThread(nullptr)
, _threadId(++ThreadIdIncrement) {
}
uint32 Thread::getThreadId() const {
return _threadId;
}
Thread::~Thread() {
}
Connection::Connection() : thread(nullptr), data(nullptr) {
Connection::Connection(Instance *instance) : _instance(instance) {
}
int32 Connection::prepare(SessionData *sessionData, int32 dc) {
t_assert(thread == nullptr && data == nullptr);
thread = new Thread();
data = new ConnectionPrivate(thread, this, sessionData, dc);
dc = data->getDC();
if (!dc) {
delete data;
data = nullptr;
delete thread;
thread = nullptr;
return 0;
thread = std::make_unique<Thread>();
auto newData = std::make_unique<ConnectionPrivate>(_instance, thread.get(), this, sessionData, dc);
dc = newData->getDC();
if (dc) {
// will be deleted in the thread::finished signal
data = newData.release();
} else {
thread.reset();
}
return dc;
}
@ -361,9 +368,8 @@ void Connection::start() {
void Connection::kill() {
t_assert(data != nullptr && thread != nullptr);
data->stop();
data = nullptr; // will be deleted in thread::finished signal
data = nullptr;
thread->quit();
queueQuittingConnection(this);
}
void Connection::waitTillFinish() {
@ -371,8 +377,7 @@ void Connection::waitTillFinish() {
DEBUG_LOG(("Waiting for connectionThread to finish"));
thread->wait();
delete thread;
thread = nullptr;
thread.reset();
}
int32 Connection::state() const {
@ -388,7 +393,10 @@ QString Connection::transport() const {
}
Connection::~Connection() {
t_assert(data == nullptr && thread == nullptr);
t_assert(data == nullptr);
if (thread) {
waitTillFinish();
}
}
void ConnectionPrivate::createConn(bool createIPv4, bool createIPv6) {
@ -440,7 +448,8 @@ void ConnectionPrivate::destroyConn(AbstractConnection **conn) {
}
}
ConnectionPrivate::ConnectionPrivate(QThread *thread, Connection *owner, SessionData *data, uint32 _dc) : QObject(nullptr)
ConnectionPrivate::ConnectionPrivate(Instance *instance, QThread *thread, Connection *owner, SessionData *data, uint32 _dc) : QObject()
, _instance(instance)
, _state(DisconnectedState)
, dc(_dc)
, _owner(owner)
@ -463,7 +472,7 @@ ConnectionPrivate::ConnectionPrivate(QThread *thread, Connection *owner, Session
connect(thread, SIGNAL(started()), this, SLOT(socketStart()));
connect(thread, SIGNAL(finished()), this, SLOT(doFinish()));
connect(this, SIGNAL(finished(Connection*)), globalSlotCarrier(), SLOT(connectionFinished(Connection*)), Qt::QueuedConnection);
connect(this, SIGNAL(finished(internal::Connection*)), _instance, SLOT(connectionFinished(internal::Connection*)), Qt::QueuedConnection);
connect(&retryTimer, SIGNAL(timeout()), this, SLOT(retryByTimer()));
connect(&_waitForConnectedTimer, SIGNAL(timeout()), this, SLOT(onWaitConnectedFailed()));
@ -1104,8 +1113,8 @@ void ConnectionPrivate::socketStart(bool afterConfig) {
}
if (noIPv4) DEBUG_LOG(("MTP Info: DC %1 options for IPv4 over HTTP not found, waiting for config").arg(dc));
if (Global::TryIPv6() && noIPv6) DEBUG_LOG(("MTP Info: DC %1 options for IPv6 over HTTP not found, waiting for config").arg(dc));
connect(configLoader(), SIGNAL(loaded()), this, SLOT(onConfigLoaded()));
configLoader()->load();
connect(_instance, SIGNAL(configLoaded()), this, SLOT(onConfigLoaded()), Qt::UniqueConnection);
QMetaObject::invokeMethod(_instance, "configLoadRequest", Qt::QueuedConnection);
return;
}
@ -1635,7 +1644,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
mtpRequestId requestId = wasSent(resendId);
if (requestId) {
LOG(("Message Error: bad message notification received, msgId %1, error_code %2, fatal: clearing callbacks").arg(data.vbad_msg_id.v).arg(errorCode));
clearCallbacksDelayed(RPCCallbackClears(1, RPCCallbackClear(requestId, -errorCode)));
_instance->clearCallbacksDelayed(RPCCallbackClears(1, RPCCallbackClear(requestId, -errorCode)));
} else {
DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId));
}
@ -2101,7 +2110,7 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
mtpRequestId reqId = req.value()->requestId;
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !hasCallbacks(reqId);
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
wereAcked.insert(msgId, reqId);
@ -2119,7 +2128,7 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
mtpRequestId reqId = reqIt.value();
bool moveToAcked = byResponse;
if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler)
moveToAcked = !hasCallbacks(reqId);
moveToAcked = !_instance->hasCallbacks(reqId);
}
if (moveToAcked) {
QWriteLocker locker4(sessionData->toSendMutex());
@ -2160,7 +2169,7 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
}
if (clearedAcked.size()) {
clearCallbacksDelayed(clearedAcked);
_instance->clearCallbacksDelayed(clearedAcked);
}
if (toAckMore.size()) {
@ -2681,8 +2690,8 @@ void ConnectionPrivate::dhClientParamsAnswered() {
DEBUG_LOG(("AuthKey Info: auth key gen succeed, id: %1, server salt: %2").arg(authKey->keyId()).arg(serverSalt));
sessionData->owner()->notifyKeyCreated(authKey); // slot will call authKeyCreated()
sessionData->clear();
sessionData->owner()->notifyKeyCreated(std::move(authKey)); // slot will call authKeyCreated()
sessionData->clear(_instance);
unlockKey();
} return;

View File

@ -25,6 +25,9 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "core/single_timer.h"
namespace MTP {
class Instance;
namespace internal {
class AbstractConnection;
@ -35,24 +38,27 @@ class Thread : public QThread {
Q_OBJECT
public:
Thread();
uint32 getThreadId() const;
~Thread();
Thread() {
static int ThreadCounter = 0;
_threadIndex = ++ThreadCounter;
}
int getThreadIndex() const {
return _threadIndex;
}
private:
uint32 _threadId;
int _threadIndex = 0;
};
class Connection {
public:
enum ConnectionType {
TcpConnection,
HttpConnection
};
Connection();
Connection(Instance *instance);
int32 prepare(SessionData *data, int32 dc = 0); // return dc
void start();
@ -67,9 +73,9 @@ public:
QString transport() const;
private:
QThread *thread;
ConnectionPrivate *data;
Instance *_instance = nullptr;
std::unique_ptr<QThread> thread;
ConnectionPrivate *data = nullptr;
};
@ -77,7 +83,7 @@ class ConnectionPrivate : public QObject {
Q_OBJECT
public:
ConnectionPrivate(QThread *thread, Connection *owner, SessionData *data, uint32 dc);
ConnectionPrivate(Instance *instance, QThread *thread, Connection *owner, SessionData *data, uint32 dc);
~ConnectionPrivate();
void stop();
@ -102,10 +108,9 @@ signals:
void resendManyAsync(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo);
void resendAllAsync();
void finished(Connection *connection);
void finished(internal::Connection *connection);
public slots:
void retryByTimer();
void restartNow();
void restart(bool mayBeBadKey = false);
@ -149,7 +154,6 @@ public slots:
void onConfigLoaded();
private:
void doDisconnect();
void createConn(bool createIPv4, bool createIPv6);
@ -175,8 +179,11 @@ private:
void clearMessages();
bool setState(int32 state, int32 ifState = Connection::UpdateAlways);
Instance *_instance = nullptr;
mutable QReadWriteLock stateConnMutex;
int32 _state;
int32 _state = DisconnectedState;
bool _needSessionReset = false;
void resetSession();

View File

@ -200,6 +200,7 @@ void DcOptions::constructFromSerialized(const QByteArray &serialized) {
return;
}
QDataStream stream(&buffer);
stream.setVersion(QDataStream::Qt_5_1);
qint32 count = 0;
stream >> count;
if (stream.status() != QDataStream::Ok) {
@ -243,10 +244,6 @@ DcId DcOptions::getDefaultDcId() const {
auto result = sortedDcIds();
t_assert(!result.empty());
auto main = internal::mainDC();
if (base::contains(result, main)) {
return main;
}
return result[0];
}

View File

@ -23,82 +23,22 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "mtproto/facade.h"
#include "mtproto/dc_options.h"
#include "messenger.h"
#include "mtproto/mtp_instance.h"
#include "localstorage.h"
namespace MTP {
namespace internal {
namespace {
DcenterMap gDCs;
bool configLoadedOnce = false;
bool mainDCChanged = false;
int32 _mainDC = 2;
typedef QMap<int32, AuthKeyPtr> _KeysMapForWrite;
_KeysMapForWrite _keysMapForWrite;
QMutex _keysMapForWriteMutex;
constexpr auto kEnumerateDcTimeout = 8000; // 8 seconds timeout for help_getConfig to work (then move to other dc)
} // namespace
DcenterMap &DCMap() {
return gDCs;
}
bool configNeeded() {
return !configLoadedOnce;
}
int32 mainDC() {
return _mainDC;
}
namespace {
QMap<int32, mtpRequestId> logoutGuestMap; // dcWithShift to logout request id
bool logoutDone(mtpRequestId req) {
for (QMap<int32, mtpRequestId>::iterator i = logoutGuestMap.begin(); i != logoutGuestMap.end(); ++i) {
if (i.value() == req) {
MTP::killSession(i.key());
logoutGuestMap.erase(i);
return true;
}
}
return false;
}
}
void logoutOtherDCs() {
QList<int32> dcs;
{
QMutexLocker lock(&_keysMapForWriteMutex);
dcs = _keysMapForWrite.keys();
}
for (int32 i = 0, cnt = dcs.size(); i != cnt; ++i) {
if (dcs[i] != MTP::maindc()) {
logoutGuestMap.insert(MTP::lgtDcId(dcs[i]), MTP::send(MTPauth_LogOut(), rpcDone(&logoutDone), rpcFail(&logoutDone), MTP::lgtDcId(dcs[i])));
}
}
}
void setDC(int32 dc, bool firstOnly) {
if (!dc || (firstOnly && mainDCChanged)) return;
mainDCChanged = true;
if (dc != _mainDC) {
_mainDC = dc;
}
}
Dcenter::Dcenter(int32 id, const AuthKeyPtr &key) : _id(id), _key(key), _connectionInited(false) {
Dcenter::Dcenter(Instance *instance, DcId dcId, AuthKeyPtr &&key)
: _instance(instance)
, _id(dcId)
, _key(std::move(key)) {
connect(this, SIGNAL(authKeyCreated()), this, SLOT(authKeyWrite()), Qt::QueuedConnection);
QMutexLocker lock(&_keysMapForWriteMutex);
if (_key) {
_keysMapForWrite[_id] = _key;
} else {
_keysMapForWrite.remove(_id);
}
}
void Dcenter::authKeyWrite() {
@ -108,18 +48,13 @@ void Dcenter::authKeyWrite() {
}
}
void Dcenter::setKey(const AuthKeyPtr &key) {
void Dcenter::setKey(AuthKeyPtr &&key) {
DEBUG_LOG(("AuthKey Info: MTProtoDC::setKey(%1), emitting authKeyCreated, dc %2").arg(key ? key->keyId() : 0).arg(_id));
_key = key;
_key = std::move(key);
_connectionInited = false;
emit authKeyCreated();
QMutexLocker lock(&_keysMapForWriteMutex);
if (_key) {
_keysMapForWrite[_id] = _key;
} else {
_keysMapForWrite.remove(_id);
}
_instance->setKeyForWrite(_id, _key);
}
QReadWriteLock *Dcenter::keyMutex() const {
@ -132,99 +67,45 @@ const AuthKeyPtr &Dcenter::getKey() const {
void Dcenter::destroyKey() {
setKey(AuthKeyPtr());
QMutexLocker lock(&_keysMapForWriteMutex);
_keysMapForWrite.remove(_id);
}
namespace {
ConfigLoader *_configLoader = nullptr;
auto loadingConfig = false;
void configLoaded(const MTPConfig &result) {
loadingConfig = false;
auto &data = result.c_config();
DEBUG_LOG(("MTP Info: got config, chat_size_max: %1, date: %2, test_mode: %3, this_dc: %4, dc_options.length: %5").arg(data.vchat_size_max.v).arg(data.vdate.v).arg(mtpIsTrue(data.vtest_mode)).arg(data.vthis_dc.v).arg(data.vdc_options.c_vector().v.size()));
if (data.vdc_options.c_vector().v.empty()) {
LOG(("MTP Error: config with empty dc_options received!"));
} else {
Messenger::Instance().dcOptions()->setFromList(data.vdc_options);
}
Global::SetChatSizeMax(data.vchat_size_max.v);
Global::SetMegagroupSizeMax(data.vmegagroup_size_max.v);
Global::SetForwardedCountMax(data.vforwarded_count_max.v);
Global::SetOnlineUpdatePeriod(data.vonline_update_period_ms.v);
Global::SetOfflineBlurTimeout(data.voffline_blur_timeout_ms.v);
Global::SetOfflineIdleTimeout(data.voffline_idle_timeout_ms.v);
Global::SetOnlineCloudTimeout(data.vonline_cloud_timeout_ms.v);
Global::SetNotifyCloudDelay(data.vnotify_cloud_delay_ms.v);
Global::SetNotifyDefaultDelay(data.vnotify_default_delay_ms.v);
Global::SetChatBigSize(data.vchat_big_size.v); // ?
Global::SetPushChatPeriod(data.vpush_chat_period_ms.v); // ?
Global::SetPushChatLimit(data.vpush_chat_limit.v); // ?
Global::SetSavedGifsLimit(data.vsaved_gifs_limit.v);
Global::SetEditTimeLimit(data.vedit_time_limit.v); // ?
Global::SetStickersRecentLimit(data.vstickers_recent_limit.v);
Global::SetPinnedDialogsCountMax(data.vpinned_dialogs_count_max.v);
configLoadedOnce = true;
Local::writeSettings();
configLoader()->done();
}
bool configFailed(const RPCError &error) {
if (MTP::isDefaultHandledError(error)) return false;
loadingConfig = false;
LOG(("MTP Error: failed to get config!"));
return false;
}
};
ConfigLoader::ConfigLoader() {
ConfigLoader::ConfigLoader(Instance *instance, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) : _instance(instance)
, _doneHandler(onDone)
, _failHandler(onFail) {
connect(&_enumDCTimer, SIGNAL(timeout()), this, SLOT(enumDC()));
}
void ConfigLoader::load() {
if (loadingConfig) return;
loadingConfig = true;
MTP::send(MTPhelp_GetConfig(), rpcDone(configLoaded), rpcFail(configFailed));
sendRequest(_instance->mainDcId());
_enumDCTimer.start(kEnumerateDcTimeout);
}
void ConfigLoader::done() {
mtpRequestId ConfigLoader::sendRequest(ShiftedDcId shiftedDcId) {
return _instance->send(MTPhelp_GetConfig(), _doneHandler, _failHandler, shiftedDcId);
}
ConfigLoader::~ConfigLoader() {
_enumDCTimer.stop();
if (_enumRequest) {
MTP::cancel(_enumRequest);
_enumRequest = 0;
_instance->cancel(_enumRequest);
}
if (_enumCurrent) {
MTP::killSession(MTP::cfgDcId(_enumCurrent));
_enumCurrent = 0;
_instance->killSession(MTP::cfgDcId(_enumCurrent));
}
emit loaded();
}
void ConfigLoader::enumDC() {
if (!loadingConfig) return;
if (_enumRequest) MTP::cancel(_enumRequest);
if (_enumRequest) {
_instance->cancel(_enumRequest);
}
if (!_enumCurrent) {
_enumCurrent = _mainDC;
_enumCurrent = _instance->mainDcId();
} else {
MTP::killSession(MTP::cfgDcId(_enumCurrent));
_instance->killSession(MTP::cfgDcId(_enumCurrent));
}
auto ids = Messenger::Instance().dcOptions()->sortedDcIds();
auto ids = _instance->dcOptions()->sortedDcIds();
t_assert(!ids.empty());
auto i = std::find(ids.cbegin(), ids.cend(), _enumCurrent);
@ -233,34 +114,10 @@ void ConfigLoader::enumDC() {
} else {
_enumCurrent = *i;
}
_enumRequest = MTP::send(MTPhelp_GetConfig(), rpcDone(configLoaded), rpcFail(configFailed), MTP::cfgDcId(_enumCurrent));
_enumRequest = sendRequest(MTP::cfgDcId(_enumCurrent));
_enumDCTimer.start(kEnumerateDcTimeout);
}
ConfigLoader *configLoader() {
if (!_configLoader) _configLoader = new ConfigLoader();
return _configLoader;
}
void destroyConfigLoader() {
delete _configLoader;
_configLoader = nullptr;
}
AuthKeysMap getAuthKeys() {
AuthKeysMap result;
QMutexLocker lock(&_keysMapForWriteMutex);
for_const (const AuthKeyPtr &key, _keysMapForWrite) {
result.push_back(key);
}
return result;
}
void setAuthKey(int32 dcId, AuthKeyPtr key) {
DcenterPtr dc(new Dcenter(dcId, key));
gDCs.insert(dcId, dc);
}
} // namespace internal
} // namespace MTP

View File

@ -21,19 +21,24 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#pragma once
#include "core/single_timer.h"
#include "mtproto/rpc_sender.h"
#include "mtproto/auth_key.h"
namespace MTP {
class Instance;
namespace internal {
class Dcenter : public QObject {
Q_OBJECT
public:
Dcenter(int32 id, const AuthKeyPtr &key);
Dcenter(Instance *instance, DcId dcId, AuthKeyPtr &&key);
QReadWriteLock *keyMutex() const;
const AuthKeyPtr &getKey() const;
void setKey(const AuthKeyPtr &key);
void setKey(AuthKeyPtr &&key);
void destroyKey();
bool connectionInited() const {
@ -56,47 +61,40 @@ private slots:
private:
mutable QReadWriteLock keyLock;
mutable QMutex initLock;
int32 _id;
Instance *_instance = nullptr;
DcId _id = 0;
AuthKeyPtr _key;
bool _connectionInited;
bool _connectionInited = false;
};
typedef QSharedPointer<Dcenter> DcenterPtr;
typedef QMap<uint32, DcenterPtr> DcenterMap;
using DcenterPtr = std::shared_ptr<Dcenter>;
using DcenterMap = std::map<DcId, DcenterPtr>;
class ConfigLoader : public QObject {
Q_OBJECT
public:
ConfigLoader();
ConfigLoader(Instance *instance, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail);
~ConfigLoader();
void load();
void done();
public slots:
void enumDC();
signals:
void loaded();
private:
mtpRequestId sendRequest(ShiftedDcId shiftedDcId);
Instance *_instance = nullptr;
SingleTimer _enumDCTimer;
DcId _enumCurrent = 0;
mtpRequestId _enumRequest = 0;
RPCDoneHandlerPtr _doneHandler;
RPCFailHandlerPtr _failHandler;
};
ConfigLoader *configLoader();
void destroyConfigLoader();
DcenterMap &DCMap();
bool configNeeded();
int32 mainDC();
void logoutOtherDCs();
void setDC(int32 dc, bool firstOnly = false);
AuthKeysMap getAuthKeys();
void setAuthKey(int32 dc, AuthKeyPtr key);
} // namespace internal
} // namespace MTP

View File

@ -22,366 +22,16 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "mtproto/facade.h"
#include "localstorage.h"
#include "auth_session.h"
#include "messenger.h"
namespace MTP {
namespace internal {
namespace {
typedef QMap<int32, internal::Session*> Sessions;
Sessions sessions;
internal::Session *mainSession;
typedef QMap<mtpRequestId, int32> RequestsByDC; // holds dcWithShift for request to this dc or -dc for request to main dc
RequestsByDC requestsByDC;
QMutex requestByDCLock;
typedef QMap<mtpRequestId, int32> AuthExportRequests; // holds target dcWithShift for auth export request
AuthExportRequests authExportRequests;
bool _started = false;
uint32 layer;
typedef QMap<mtpRequestId, RPCResponseHandler> ParserMap;
ParserMap parserMap;
QMutex parserMapLock;
typedef QMap<mtpRequestId, mtpRequest> RequestMap;
RequestMap requestMap;
QReadWriteLock requestMapLock;
typedef QPair<mtpRequestId, TimeMs> DelayedRequest;
typedef QList<DelayedRequest> DelayedRequestsList;
DelayedRequestsList delayedRequests;
typedef QMap<mtpRequestId, int32> RequestsDelays;
RequestsDelays requestsDelays;
typedef QSet<mtpRequestId> BadGuestDCRequests;
BadGuestDCRequests badGuestDCRequests;
typedef QVector<mtpRequestId> DCAuthWaiters;
typedef QMap<int32, DCAuthWaiters> AuthWaiters; // holds request ids waiting for auth import to specific dc
AuthWaiters authWaiters;
typedef OrderedSet<internal::Connection*> MTPQuittingConnections;
MTPQuittingConnections quittingConnections;
QMutex toClearLock;
RPCCallbackClears toClear;
RPCResponseHandler globalHandler;
MTPStateChangedHandler stateChangedHandler = 0;
MTPSessionResetHandler sessionResetHandler = 0;
internal::GlobalSlotCarrier *_globalSlotCarrier = 0;
bool hasAuthorization() {
return (AuthSession::Current() != nullptr);
}
void importDone(const MTPauth_Authorization &result, mtpRequestId req) {
QMutexLocker locker1(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(req);
if (i == requestsByDC.end()) {
LOG(("MTP Error: auth import request not found in requestsByDC, requestId: %1").arg(req));
RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find import request in requestsByDC, request %1").arg(req)));
if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return;
}
DcId newdc = bareDcId(i.value());
DEBUG_LOG(("MTP Info: auth import to dc %1 succeeded").arg(newdc));
DCAuthWaiters &waiters(authWaiters[newdc]);
if (waiters.size()) {
QReadLocker locker(&requestMapLock);
for (DCAuthWaiters::iterator i = waiters.begin(), e = waiters.end(); i != e; ++i) {
mtpRequestId requestId = *i;
RequestMap::const_iterator j = requestMap.constFind(requestId);
if (j == requestMap.cend()) {
LOG(("MTP Error: could not find request %1 for resending").arg(requestId));
continue;
}
ShiftedDcId dcWithShift = newdc;
{
RequestsByDC::iterator k = requestsByDC.find(requestId);
if (k == requestsByDC.cend()) {
LOG(("MTP Error: could not find request %1 by dc for resending").arg(requestId));
continue;
}
if (k.value() < 0) {
setdc(newdc);
k.value() = -newdc;
} else {
dcWithShift = shiftDcId(newdc, getDcIdShift(k.value()));
k.value() = dcWithShift;
}
DEBUG_LOG(("MTP Info: resending request %1 to dc %2 after import auth").arg(requestId).arg(k.value()));
}
if (internal::Session *session = internal::getSession(dcWithShift)) {
session->sendPrepared(j.value());
}
}
waiters.clear();
}
}
bool importFail(const RPCError &error, mtpRequestId req) {
if (isDefaultHandledError(error)) return false;
if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth import failed
return true;
}
void exportDone(const MTPauth_ExportedAuthorization &result, mtpRequestId req) {
AuthExportRequests::const_iterator i = authExportRequests.constFind(req);
if (i == authExportRequests.cend()) {
LOG(("MTP Error: auth export request target dcWithShift not found, requestId: %1").arg(req));
RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find target dcWithShift, request %1").arg(req)));
if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return;
}
const auto &data(result.c_auth_exportedAuthorization());
send(MTPauth_ImportAuthorization(data.vid, data.vbytes), rpcDone(importDone), rpcFail(importFail), i.value());
authExportRequests.remove(req);
}
bool exportFail(const RPCError &error, mtpRequestId req) {
if (isDefaultHandledError(error)) return false;
AuthExportRequests::const_iterator i = authExportRequests.constFind(req);
if (i != authExportRequests.cend()) {
authWaiters[bareDcId(i.value())].clear();
}
if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return true;
}
bool onErrorDefault(mtpRequestId requestId, const RPCError &error) {
const QString &err(error.type());
int32 code = error.code();
if (!isFloodError(error) && err != qstr("AUTH_KEY_UNREGISTERED")) {
int breakpoint = 0;
}
bool badGuestDC = (code == 400) && (err == qsl("FILE_ID_INVALID"));
QRegularExpressionMatch m;
if ((m = QRegularExpression("^(FILE|PHONE|NETWORK|USER)_MIGRATE_(\\d+)$").match(err)).hasMatch()) {
if (!requestId) return false;
ShiftedDcId dcWithShift = 0, newdcWithShift = m.captured(2).toInt();
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i == requestsByDC.end()) {
LOG(("MTP Error: could not find request %1 for migrating to %2").arg(requestId).arg(newdcWithShift));
} else {
dcWithShift = i.value();
}
}
if (!dcWithShift || !newdcWithShift) return false;
DEBUG_LOG(("MTP Info: changing request %1 from dcWithShift%2 to dc%3").arg(requestId).arg(dcWithShift).arg(newdcWithShift));
if (dcWithShift < 0) { // newdc shift = 0
if (false && hasAuthorization() && !authExportRequests.contains(requestId)) { // migrate not supported at this moment
DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(newdcWithShift));
DCAuthWaiters &waiters(authWaiters[newdcWithShift]);
if (!waiters.size()) {
authExportRequests.insert(send(MTPauth_ExportAuthorization(MTP_int(newdcWithShift)), rpcDone(exportDone), rpcFail(exportFail)), newdcWithShift);
}
waiters.push_back(requestId);
return true;
} else {
MTP::setdc(newdcWithShift);
}
} else {
newdcWithShift = shiftDcId(newdcWithShift, getDcIdShift(dcWithShift));
}
mtpRequest req;
{
QReadLocker locker(&requestMapLock);
RequestMap::const_iterator i = requestMap.constFind(requestId);
if (i == requestMap.cend()) {
LOG(("MTP Error: could not find request %1").arg(requestId));
return false;
}
req = i.value();
}
if (auto session = internal::getSession(newdcWithShift)) {
internal::registerRequest(requestId, (dcWithShift < 0) ? -newdcWithShift : newdcWithShift);
session->sendPrepared(req);
}
return true;
} else if (code < 0 || code >= 500 || (m = QRegularExpression("^FLOOD_WAIT_(\\d+)$").match(err)).hasMatch()) {
if (!requestId) return false;
int32 secs = 1;
if (code < 0 || code >= 500) {
RequestsDelays::iterator i = requestsDelays.find(requestId);
if (i != requestsDelays.cend()) {
secs = (i.value() > 60) ? i.value() : (i.value() *= 2);
} else {
requestsDelays.insert(requestId, secs);
}
} else {
secs = m.captured(1).toInt();
// if (secs >= 60) return false;
}
auto sendAt = getms(true) + secs * 1000 + 10;
DelayedRequestsList::iterator i = delayedRequests.begin(), e = delayedRequests.end();
for (; i != e; ++i) {
if (i->first == requestId) return true;
if (i->second > sendAt) break;
}
delayedRequests.insert(i, DelayedRequest(requestId, sendAt));
if (_globalSlotCarrier) _globalSlotCarrier->checkDelayed();
return true;
} else if (code == 401 || (badGuestDC && badGuestDCRequests.constFind(requestId) == badGuestDCRequests.cend())) {
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
dcWithShift = i.value();
} else {
LOG(("MTP Error: unauthorized request without dc info, requestId %1").arg(requestId));
}
}
int32 newdc = bareDcId(qAbs(dcWithShift));
if (!newdc || newdc == internal::mainDC() || !hasAuthorization()) {
if (!badGuestDC && globalHandler.onFail) (*globalHandler.onFail)(requestId, error); // auth failed in main dc
return false;
}
DEBUG_LOG(("MTP Info: importing auth to dcWithShift %1").arg(dcWithShift));
DCAuthWaiters &waiters(authWaiters[newdc]);
if (!waiters.size()) {
authExportRequests.insert(send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone(exportDone), rpcFail(exportFail)), abs(dcWithShift));
}
waiters.push_back(requestId);
if (badGuestDC) badGuestDCRequests.insert(requestId);
return true;
} else if (err == qstr("CONNECTION_NOT_INITED") || err == qstr("CONNECTION_LAYER_INVALID")) {
mtpRequest req;
{
QReadLocker locker(&requestMapLock);
RequestMap::const_iterator i = requestMap.constFind(requestId);
if (i == requestMap.cend()) {
LOG(("MTP Error: could not find request %1").arg(requestId));
return false;
}
req = i.value();
}
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i == requestsByDC.end()) {
LOG(("MTP Error: could not find request %1 for resending with init connection").arg(requestId));
} else {
dcWithShift = i.value();
}
}
if (!dcWithShift) return false;
if (internal::Session *session = internal::getSession(qAbs(dcWithShift))) {
req->needsLayer = true;
session->sendPrepared(req);
}
return true;
} else if (err == qstr("MSG_WAIT_FAILED")) {
mtpRequest req;
{
QReadLocker locker(&requestMapLock);
RequestMap::const_iterator i = requestMap.constFind(requestId);
if (i == requestMap.cend()) {
LOG(("MTP Error: could not find request %1").arg(requestId));
return false;
}
req = i.value();
}
if (!req->after) {
LOG(("MTP Error: wait failed for not dependent request %1").arg(requestId));
return false;
}
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId), j = requestsByDC.find(req->after->requestId);
if (i == requestsByDC.end()) {
LOG(("MTP Error: could not find request %1 by dc").arg(requestId));
} else if (j == requestsByDC.end()) {
LOG(("MTP Error: could not find dependent request %1 by dc").arg(req->after->requestId));
} else {
dcWithShift = i.value();
if (i.value() != j.value()) {
req->after = mtpRequest();
}
}
}
if (!dcWithShift) return false;
if (!req->after) {
if (internal::Session *session = internal::getSession(qAbs(dcWithShift))) {
req->needsLayer = true;
session->sendPrepared(req);
}
} else {
int32 newdc = bareDcId(qAbs(dcWithShift));
DCAuthWaiters &waiters(authWaiters[newdc]);
if (waiters.indexOf(req->after->requestId) >= 0) {
if (waiters.indexOf(requestId) < 0) {
waiters.push_back(requestId);
}
if (badGuestDCRequests.constFind(req->after->requestId) != badGuestDCRequests.cend()) {
if (badGuestDCRequests.constFind(requestId) == badGuestDCRequests.cend()) {
badGuestDCRequests.insert(requestId);
}
}
} else {
DelayedRequestsList::iterator i = delayedRequests.begin(), e = delayedRequests.end();
for (; i != e; ++i) {
if (i->first == requestId) return true;
if (i->first == req->after->requestId) break;
}
if (i != e) {
delayedRequests.insert(i, DelayedRequest(requestId, i->second));
}
if (_globalSlotCarrier) _globalSlotCarrier->checkDelayed();
}
}
return true;
}
if (badGuestDC) badGuestDCRequests.remove(requestId);
return false;
}
int PauseLevel = 0;
} // namespace
namespace internal {
Session *getSession(ShiftedDcId shiftedDcId) {
if (!_started) return nullptr;
if (!shiftedDcId) return mainSession;
if (!bareDcId(shiftedDcId)) {
shiftedDcId += bareDcId(mainSession->getDcWithShift());
}
Sessions::const_iterator i = sessions.constFind(shiftedDcId);
if (i == sessions.cend()) {
i = sessions.insert(shiftedDcId, new Session(shiftedDcId));
}
return i.value();
}
bool paused() {
return PauseLevel > 0;
}
@ -392,504 +42,17 @@ void pause() {
void unpause() {
--PauseLevel;
if (_started) {
for_const (auto session, sessions) {
session->unpaused();
if (!PauseLevel) {
if (auto instance = MainInstance()) {
instance->unpaused();
}
}
}
void registerRequest(mtpRequestId requestId, int32 dcWithShift) {
{
QMutexLocker locker(&requestByDCLock);
requestsByDC.insert(requestId, dcWithShift);
}
internal::performDelayedClear(); // need to do it somewhere...
}
void unregisterRequest(mtpRequestId requestId) {
requestsDelays.remove(requestId);
{
QWriteLocker locker(&requestMapLock);
requestMap.remove(requestId);
}
QMutexLocker locker(&requestByDCLock);
requestsByDC.remove(requestId);
}
mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser) {
mtpRequestId res = reqid();
request->requestId = res;
if (parser.onDone || parser.onFail) {
QMutexLocker locker(&parserMapLock);
parserMap.insert(res, parser);
}
{
QWriteLocker locker(&requestMapLock);
requestMap.insert(res, request);
}
return res;
}
mtpRequest getRequest(mtpRequestId reqId) {
static mtpRequest zero;
mtpRequest req;
{
QReadLocker locker(&requestMapLock);
RequestMap::const_iterator i = requestMap.constFind(reqId);
req = (i == requestMap.cend()) ? zero : i.value();
}
return req;
}
void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest) {
mtpMsgId afterId(*(mtpMsgId*)(from->after->data() + 4));
mtpRequestMap::const_iterator i = afterId ? haveSent.constFind(afterId) : haveSent.cend();
int32 size = to->size(), lenInInts = (from.innerLength() >> 2), headlen = 4, fulllen = headlen + lenInInts;
if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently
to->resize(size + fulllen + skipBeforeRequest);
if (skipBeforeRequest) {
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
} else {
memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime));
}
} else {
to->resize(size + fulllen + skipBeforeRequest + 3);
memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime));
(*to)[size + 3] += 3 * sizeof(mtpPrime);
*((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg;
memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime));
memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime));
if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime);
}
}
void clearCallbacks(mtpRequestId requestId, int32 errorCode) {
RPCResponseHandler h;
bool found = false;
{
QMutexLocker locker(&parserMapLock);
ParserMap::iterator i = parserMap.find(requestId);
if (i != parserMap.end()) {
h = i.value();
found = true;
parserMap.erase(i);
}
}
if (errorCode && found) {
rpcErrorOccured(requestId, h, rpcClientError("CLEAR_CALLBACK", QString("did not handle request %1, error code %2").arg(requestId).arg(errorCode)));
}
}
void clearCallbacksDelayed(const RPCCallbackClears &requestIds) {
uint32 idsCount = requestIds.size();
if (!idsCount) return;
if (cDebug()) {
QString idsStr = QString("%1").arg(requestIds[0].requestId);
for (uint32 i = 1; i < idsCount; ++i) {
idsStr += QString(", %1").arg(requestIds[i].requestId);
}
DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1").arg(idsStr));
}
QMutexLocker lock(&toClearLock);
uint32 toClearNow = toClear.size();
if (toClearNow) {
toClear.resize(toClearNow + idsCount);
memcpy(toClear.data() + toClearNow, requestIds.constData(), idsCount * sizeof(RPCCallbackClear));
} else {
toClear = requestIds;
}
}
void performDelayedClear() {
QMutexLocker lock(&toClearLock);
if (!toClear.isEmpty()) {
for (RPCCallbackClears::iterator i = toClear.begin(), e = toClear.end(); i != e; ++i) {
if (cDebug()) {
QMutexLocker locker(&parserMapLock);
if (parserMap.find(i->requestId) != parserMap.end()) {
DEBUG_LOG(("RPC Info: clearing delayed callback %1, error code %2").arg(i->requestId).arg(i->errorCode));
}
}
clearCallbacks(i->requestId, i->errorCode);
internal::unregisterRequest(i->requestId);
}
toClear.clear();
}
}
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) {
RPCResponseHandler h;
{
QMutexLocker locker(&parserMapLock);
ParserMap::iterator i = parserMap.find(requestId);
if (i != parserMap.cend()) {
h = i.value();
parserMap.erase(i);
DEBUG_LOG(("RPC Info: found parser for request %1, trying to parse response...").arg(requestId));
}
}
if (h.onDone || h.onFail) {
try {
if (from >= end) throw mtpErrorInsufficient();
if (*from == mtpc_rpc_error) {
RPCError err(MTPRpcError(from, end));
DEBUG_LOG(("RPC Info: error received, code %1, type %2, description: %3").arg(err.code()).arg(err.type()).arg(err.description()));
if (!rpcErrorOccured(requestId, h, err)) {
QMutexLocker locker(&parserMapLock);
parserMap.insert(requestId, h);
return;
}
} else {
if (h.onDone) {
// t_assert(App::app() != 0);
(*h.onDone)(requestId, from, end);
}
}
} catch (Exception &e) {
if (!rpcErrorOccured(requestId, h, rpcClientError("RESPONSE_PARSE_FAILED", QString("exception text: ") + e.what()))) {
QMutexLocker locker(&parserMapLock);
parserMap.insert(requestId, h);
return;
}
}
} else {
DEBUG_LOG(("RPC Info: parser not found for %1").arg(requestId));
}
unregisterRequest(requestId);
}
bool hasCallbacks(mtpRequestId requestId) {
QMutexLocker locker(&parserMapLock);
ParserMap::iterator i = parserMap.find(requestId);
return (i != parserMap.cend());
}
void globalCallback(const mtpPrime *from, const mtpPrime *end) {
if (globalHandler.onDone) (*globalHandler.onDone)(0, from, end); // some updates were received
}
void onStateChange(int32 dcWithShift, int32 state) {
if (stateChangedHandler) stateChangedHandler(dcWithShift, state);
}
void onSessionReset(int32 dcWithShift) {
if (sessionResetHandler) sessionResetHandler(dcWithShift);
}
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data
if (isDefaultHandledError(err)) {
if (onFail && (*onFail)(requestId, err)) return true;
}
if (onErrorDefault(requestId, err)) {
return false;
}
LOG(("RPC Error: request %1 got fail with code %2, error %3%4").arg(requestId).arg(err.code()).arg(err.type()).arg(err.description().isEmpty() ? QString() : QString(": %1").arg(err.description())));
onFail && (*onFail)(requestId, err);
return true;
}
GlobalSlotCarrier::GlobalSlotCarrier() {
connect(&_timer, SIGNAL(timeout()), this, SLOT(checkDelayed()));
}
void GlobalSlotCarrier::checkDelayed() {
auto now = getms(true);
while (!delayedRequests.isEmpty() && now >= delayedRequests.front().second) {
mtpRequestId requestId = delayedRequests.front().first;
delayedRequests.pop_front();
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::const_iterator i = requestsByDC.constFind(requestId);
if (i != requestsByDC.cend()) {
dcWithShift = i.value();
} else {
LOG(("MTP Error: could not find request dc for delayed resend, requestId %1").arg(requestId));
continue;
}
}
mtpRequest req;
{
QReadLocker locker(&requestMapLock);
RequestMap::const_iterator j = requestMap.constFind(requestId);
if (j == requestMap.cend()) {
DEBUG_LOG(("MTP Error: could not find request %1").arg(requestId));
continue;
}
req = j.value();
}
if (Session *session = getSession(qAbs(dcWithShift))) {
session->sendPrepared(req);
}
}
if (!delayedRequests.isEmpty()) {
_timer.start(delayedRequests.front().second - now);
}
}
void GlobalSlotCarrier::connectionFinished(Connection *connection) {
MTPQuittingConnections::iterator i = quittingConnections.find(connection);
if (i != quittingConnections.cend()) {
quittingConnections.erase(i);
}
connection->waitTillFinish();
delete connection;
}
GlobalSlotCarrier *globalSlotCarrier() {
return _globalSlotCarrier;
}
void queueQuittingConnection(Connection *connection) {
quittingConnections.insert(connection);
}
} // namespace internal
void start() {
if (started()) return;
unixtimeInit();
internal::DcenterMap &dcs(internal::DCMap());
_globalSlotCarrier = new internal::GlobalSlotCarrier();
mainSession = new internal::Session(internal::mainDC());
sessions.insert(mainSession->getDcWithShift(), mainSession);
_started = true;
if (internal::configNeeded()) {
internal::configLoader()->load();
}
}
bool started() {
return _started;
}
void restart() {
if (!_started) return;
for (auto i = sessions.cbegin(), e = sessions.cend(); i != e; ++i) {
i.value()->restart();
}
}
void restart(int32 dcMask) {
if (!_started) return;
dcMask = bareDcId(dcMask);
for (Sessions::const_iterator i = sessions.cbegin(), e = sessions.cend(); i != e; ++i) {
if (bareDcId(i.value()->getDcWithShift()) == dcMask) {
i.value()->restart();
}
}
}
void configure(int32 dc) {
if (_started) return;
internal::setDC(dc);
}
void setdc(int32 dc, bool fromZeroOnly) {
if (!dc || !_started) return;
internal::setDC(dc, fromZeroOnly);
int32 oldMainDc = mainSession->getDcWithShift();
if (maindc() != oldMainDc) {
killSession(oldMainDc);
}
Local::writeMtpData();
}
int32 maindc() {
return internal::mainDC();
}
int32 dcstate(int32 dc) {
if (!_started) return 0;
if (!dc) return mainSession->getState();
if (!bareDcId(dc)) {
dc += bareDcId(mainSession->getDcWithShift());
}
Sessions::const_iterator i = sessions.constFind(dc);
if (i != sessions.cend()) return i.value()->getState();
return DisconnectedState;
}
QString dctransport(int32 dc) {
if (!_started) return QString();
if (!dc) return mainSession->transport();
if (!bareDcId(dc)) {
dc += bareDcId(mainSession->getDcWithShift());
}
Sessions::const_iterator i = sessions.constFind(dc);
if (i != sessions.cend()) return i.value()->transport();
return QString();
}
void ping() {
if (internal::Session *session = internal::getSession(0)) {
session->ping();
}
}
void cancel(mtpRequestId requestId) {
if (!_started || !requestId) return;
mtpMsgId msgId = 0;
requestsDelays.remove(requestId);
{
QWriteLocker locker(&requestMapLock);
RequestMap::iterator i = requestMap.find(requestId);
if (i != requestMap.end()) {
msgId = *(mtpMsgId*)(i.value()->constData() + 4);
requestMap.erase(i);
}
}
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
if (internal::Session *session = internal::getSession(qAbs(i.value()))) {
session->cancel(requestId, msgId);
}
requestsByDC.erase(i);
}
}
internal::clearCallbacks(requestId);
}
void killSession(int32 dc) {
Sessions::iterator i = sessions.find(dc);
if (i != sessions.cend()) {
bool wasMain = (i.value() == mainSession);
i.value()->kill();
i.value()->deleteLater();
sessions.erase(i);
if (wasMain) {
mainSession = new internal::Session(internal::mainDC());
int32 newdc = mainSession->getDcWithShift();
i = sessions.find(newdc);
if (i != sessions.cend()) {
i.value()->kill();
i.value()->deleteLater();
sessions.erase(i);
}
sessions.insert(newdc, mainSession);
}
}
}
void stopSession(int32 dc) {
Sessions::iterator i = sessions.find(dc);
if (i != sessions.end()) {
if (i.value() != mainSession) { // don't stop main session
i.value()->stop();
}
}
}
int32 state(mtpRequestId requestId) {
if (requestId > 0) {
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
if (internal::Session *session = internal::getSession(qAbs(i.value()))) {
return session->requestState(requestId);
}
return MTP::RequestConnecting;
}
return MTP::RequestSent;
}
if (internal::Session *session = internal::getSession(-requestId)) {
return session->requestState(0);
}
return MTP::RequestConnecting;
}
void finish() {
mainSession = nullptr;
for (auto session : base::take(sessions)) {
session->kill();
delete session;
}
for_const (auto connection, quittingConnections) {
connection->waitTillFinish();
delete connection;
}
quittingConnections.clear();
delete _globalSlotCarrier;
_globalSlotCarrier = nullptr;
internal::destroyConfigLoader();
_started = false;
}
void logoutKeys(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) {
mtpRequestId req = MTP::send(MTPauth_LogOut(), onDone, onFail);
internal::logoutOtherDCs();
}
void setGlobalDoneHandler(RPCDoneHandlerPtr handler) {
globalHandler.onDone = handler;
}
void setGlobalFailHandler(RPCFailHandlerPtr handler) {
globalHandler.onFail = handler;
}
void setStateChangedHandler(MTPStateChangedHandler handler) {
stateChangedHandler = handler;
}
void setSessionResetHandler(MTPSessionResetHandler handler) {
sessionResetHandler = handler;
}
void clearGlobalHandlers() {
setGlobalDoneHandler(RPCDoneHandlerPtr());
setGlobalFailHandler(RPCFailHandlerPtr());
setStateChangedHandler(0);
setSessionResetHandler(0);
}
AuthKeysMap getKeys() {
return internal::getAuthKeys();
}
void setKey(int dc, const AuthKey::Data &key) {
auto dcId = MTP::bareDcId(dc);
auto keyPtr = std::make_shared<MTP::AuthKey>();
keyPtr->setDC(dcId);
keyPtr->setKey(key);
return internal::setAuthKey(dc, std::move(keyPtr));
Instance *MainInstance() {
return Messenger::Instance().mtp();
}
} // namespace MTP

View File

@ -23,58 +23,41 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
#include "mtproto/core_types.h"
#include "mtproto/session.h"
#include "core/single_timer.h"
#include "mtproto/mtp_instance.h"
namespace MTP {
namespace internal {
Session *getSession(ShiftedDcId shiftedDcId); // 0 - current set dc
bool paused();
void pause();
void unpause();
void registerRequest(mtpRequestId requestId, int32 dc);
void unregisterRequest(mtpRequestId requestId);
mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser);
mtpRequest getRequest(mtpRequestId req);
void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest = 0);
void clearCallbacks(mtpRequestId requestId, int32 errorCode = RPCError::NoError); // 0 - do not toggle onError callback
void clearCallbacksDelayed(const RPCCallbackClears &requestIds);
void performDelayedClear();
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
bool hasCallbacks(mtpRequestId requestId);
void globalCallback(const mtpPrime *from, const mtpPrime *end);
void onStateChange(int32 dcWithShift, int32 state);
void onSessionReset(int32 dcWithShift);
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); // return true if need to clean request data
inline bool rpcErrorOccured(mtpRequestId requestId, const RPCResponseHandler &handler, const RPCError &err) {
return rpcErrorOccured(requestId, handler.onFail, err);
}
// used for:
// - resending requests by timer which were postponed by flood delay
// - destroying MTProtoConnections whose thread has finished
class GlobalSlotCarrier : public QObject {
Q_OBJECT
} // namespace internal
class PauseHolder {
public:
GlobalSlotCarrier();
public slots:
void checkDelayed();
void connectionFinished(Connection *connection);
PauseHolder() {
restart();
}
void restart() {
if (!base::take(_paused, true)) {
internal::pause();
}
}
void release() {
if (base::take(_paused)) {
internal::unpause();
}
}
~PauseHolder() {
release();
}
private:
SingleTimer _timer;
bool _paused = false;
};
GlobalSlotCarrier *globalSlotCarrier();
void queueQuittingConnection(Connection *connection);
} // namespace internal
constexpr ShiftedDcId DCShift = 10000;
constexpr DcId bareDcId(ShiftedDcId shiftedDcId) {
return (shiftedDcId % DCShift);
@ -135,115 +118,105 @@ constexpr bool isUplDcId(ShiftedDcId shiftedDcId) {
return (shiftedDcId >= internal::uploadDcId(0, 0)) && (shiftedDcId < internal::uploadDcId(0, MTPUploadSessionsCount - 1) + DCShift);
}
void start();
bool started();
void restart();
void restart(int32 dcMask);
class PauseHolder {
public:
PauseHolder() {
restart();
}
void restart() {
if (!base::take(_paused, true)) {
internal::pause();
}
}
void release() {
if (base::take(_paused)) {
internal::unpause();
}
}
~PauseHolder() {
release();
}
private:
bool _paused = false;
};
void configure(int32 dc);
void setdc(int32 dc, bool fromZeroOnly = false);
int32 maindc();
enum {
DisconnectedState = 0,
ConnectingState = 1,
ConnectedState = 2,
};
int32 dcstate(int32 dc = 0);
QString dctransport(int32 dc = 0);
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
if (internal::Session *session = internal::getSession(dc)) {
return session->send(request, callbacks, msCanWait, true, !dc, after);
}
return 0;
}
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
return send(request, RPCResponseHandler(onDone, onFail), dc, msCanWait, after);
}
inline void sendAnything(int32 dc = 0, TimeMs msCanWait = 0) {
if (auto session = internal::getSession(dc)) {
return session->sendAnything(msCanWait);
}
}
void ping();
void cancel(mtpRequestId req);
void killSession(int32 dc);
void stopSession(int32 dc);
enum {
RequestSent = 0,
RequestConnecting = 1,
RequestSending = 2
};
int32 state(mtpRequestId req); // < 0 means waiting for such count of ms
void finish();
Instance *MainInstance();
void logoutKeys(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail);
inline void restart() {
return MainInstance()->restart();
}
void setGlobalDoneHandler(RPCDoneHandlerPtr handler);
void setGlobalFailHandler(RPCFailHandlerPtr handler);
void setStateChangedHandler(MTPStateChangedHandler handler);
void setSessionResetHandler(MTPSessionResetHandler handler);
void clearGlobalHandlers();
inline void restart(ShiftedDcId shiftedDcId) {
return MainInstance()->restart(shiftedDcId);
}
AuthKeysMap getKeys();
void setKey(int dc, const AuthKey::Data &key);
inline DcId maindc() {
return MainInstance()->mainDcId();
}
inline int32 dcstate(ShiftedDcId shiftedDcId = 0) {
if (auto instance = MainInstance()) {
return instance->dcstate(shiftedDcId);
}
return DisconnectedState;
}
inline QString dctransport(ShiftedDcId shiftedDcId = 0) {
if (auto instance = MainInstance()) {
return instance->dctransport(shiftedDcId);
}
return QString();
}
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
return MainInstance()->send(request, std::move(callbacks), dcId, msCanWait, after);
}
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
return MainInstance()->send(request, std::move(onDone), std::move(onFail), dcId, msCanWait, after);
}
inline void sendAnything(ShiftedDcId shiftedDcId = 0, TimeMs msCanWait = 0) {
return MainInstance()->sendAnything(shiftedDcId, msCanWait);
}
inline void cancel(mtpRequestId requestId) {
return MainInstance()->cancel(requestId);
}
inline void ping() {
return MainInstance()->ping();
}
inline void killSession(ShiftedDcId shiftedDcId) {
return MainInstance()->killSession(shiftedDcId);
}
inline void stopSession(ShiftedDcId shiftedDcId) {
return MainInstance()->stopSession(shiftedDcId);
}
inline int32 state(mtpRequestId requestId) { // < 0 means waiting for such count of ms
return MainInstance()->state(requestId);
}
namespace internal {
template <typename TRequest>
mtpRequestId Session::send(const TRequest &request, RPCResponseHandler callbacks, TimeMs msCanWait, bool needsLayer, bool toMainDC, mtpRequestId after) {
mtpRequestId requestId = 0;
try {
uint32 requestSize = request.innerLength() >> 2;
mtpRequest reqSerialized(mtpRequestData::prepare(requestSize));
request.write(*reqSerialized);
template <typename TRequest>
mtpRequestId Session::send(const TRequest &request, RPCResponseHandler callbacks, TimeMs msCanWait, bool needsLayer, bool toMainDC, mtpRequestId after) {
mtpRequestId requestId = 0;
try {
uint32 requestSize = request.innerLength() >> 2;
mtpRequest reqSerialized(mtpRequestData::prepare(requestSize));
request.write(*reqSerialized);
DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1").arg(msCanWait));
DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1").arg(msCanWait));
reqSerialized->msDate = getms(true); // > 0 - can send without container
reqSerialized->needsLayer = needsLayer;
if (after) reqSerialized->after = MTP::internal::getRequest(after);
requestId = MTP::internal::storeRequest(reqSerialized, callbacks);
reqSerialized->msDate = getms(true); // > 0 - can send without container
reqSerialized->needsLayer = needsLayer;
if (after) reqSerialized->after = getRequest(after);
requestId = storeRequest(reqSerialized, callbacks);
sendPrepared(reqSerialized, msCanWait);
} catch (Exception &e) {
requestId = 0;
MTP::internal::rpcErrorOccured(requestId, callbacks, rpcClientError("NO_REQUEST_ID", QString("send() failed to queue request, exception: %1").arg(e.what())));
}
if (requestId) MTP::internal::registerRequest(requestId, toMainDC ? -getDcWithShift() : getDcWithShift());
return requestId;
sendPrepared(reqSerialized, msCanWait);
} catch (Exception &e) {
requestId = 0;
rpcErrorOccured(requestId, callbacks.onFail, rpcClientError("NO_REQUEST_ID", QString("send() failed to queue request, exception: %1").arg(e.what())));
}
if (requestId) registerRequest(requestId, toMainDC ? -getDcWithShift() : getDcWithShift());
return requestId;
}
} // namespace internal
} // namespace MTP

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,131 @@
/*
This file is part of Telegram Desktop,
the official desktop version of Telegram messaging app, see https://telegram.org
Telegram Desktop is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
It is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
In addition, as a special exception, the copyright holders give permission
to link the code of portions of this program with the OpenSSL library.
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include "mtproto/dcenter.h"
#include <map>
#include <set>
namespace MTP {
class DcOptions;
class Session;
class Instance : public QObject {
Q_OBJECT
public:
struct Config {
static constexpr auto kNoneMainDc = -1;
static constexpr auto kNotSetMainDc = 0;
static constexpr auto kDefaultMainDc = 2;
DcId mainDcId = kNotSetMainDc;
std::map<DcId, AuthKey::Data> keys;
};
Instance(DcOptions *options, Config &&config);
Instance(const Instance &other) = delete;
Instance &operator=(const Instance &other) = delete;
void suggestMainDcId(DcId mainDcId);
void setMainDcId(DcId mainDcId);
DcId mainDcId() const;
void Instance::setKeyForWrite(DcId dcId, const AuthKeyPtr &key);
AuthKeysMap Instance::getKeysForWrite() const;
DcOptions *dcOptions();
template <typename TRequest>
mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
if (auto session = getSession(dcId)) {
return session->send(request, callbacks, msCanWait, true, !dcId, after);
}
return 0;
}
template <typename TRequest>
mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) {
return send(request, RPCResponseHandler(onDone, onFail), dc, msCanWait, after);
}
void sendAnything(ShiftedDcId dcId = 0, TimeMs msCanWait = 0) {
if (auto session = getSession(dcId)) {
session->sendAnything(msCanWait);
}
}
void restart();
void restart(ShiftedDcId shiftedDcId);
int32 dcstate(ShiftedDcId shiftedDcId = 0);
QString dctransport(ShiftedDcId shiftedDcId = 0);
void ping();
void cancel(mtpRequestId requestId);
int32 state(mtpRequestId requestId); // < 0 means waiting for such count of ms
void killSession(ShiftedDcId shiftedDcId);
void stopSession(ShiftedDcId shiftedDcId);
void logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail);
internal::DcenterPtr getDcById(DcId dcId);
void unpaused();
void queueQuittingConnection(std::unique_ptr<internal::Connection> connection);
void setUpdatesHandler(RPCDoneHandlerPtr onDone);
void setGlobalFailHandler(RPCFailHandlerPtr onFail);
void setStateChangedHandler(base::lambda<void(ShiftedDcId shiftedDcId, int32 state)> &&handler);
void setSessionResetHandler(base::lambda<void(ShiftedDcId shiftedDcId)> &&handler);
void clearGlobalHandlers();
void onStateChange(ShiftedDcId dcWithShift, int32 state);
void onSessionReset(ShiftedDcId dcWithShift);
void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift);
mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser);
mtpRequest getRequest(mtpRequestId requestId);
void clearCallbacksDelayed(const RPCCallbackClears &requestIds);
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
bool hasCallbacks(mtpRequestId requestId);
void globalCallback(const mtpPrime *from, const mtpPrime *end);
// return true if need to clean request data
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
~Instance();
public slots:
void configLoadRequest();
void connectionFinished(internal::Connection *connection);
signals:
void configLoaded();
private:
internal::Session *getSession(ShiftedDcId shiftedDcId);
class Private;
const std::unique_ptr<Private> _private;
};
} // namespace MTP

View File

@ -19,13 +19,14 @@ Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#include "stdafx.h"
#include "mtproto/session.h"
#include "mtproto/connection.h"
namespace MTP {
namespace internal {
void SessionData::clear() {
void SessionData::clear(Instance *instance) {
RPCCallbackClears clearCallbacks;
{
QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex());
@ -66,42 +67,23 @@ void SessionData::clear() {
QWriteLocker locker(receivedIdsMutex());
receivedIds.clear();
}
clearCallbacksDelayed(clearCallbacks);
instance->clearCallbacksDelayed(clearCallbacks);
}
Session::Session(int32 requestedDcId) : QObject()
, _connection(0)
, _killed(false)
, _needToReceive(false)
, data(this)
, dcWithShift(0)
, dc(0)
, msSendCall(0)
, msWait(0)
, _ping(false) {
if (_killed) {
DEBUG_LOG(("Session Error: can't start a killed session"));
return;
}
if (dcWithShift) {
DEBUG_LOG(("Session Info: Session::start called on already started session"));
return;
}
msSendCall = msWait = 0;
Session::Session(Instance *instance, ShiftedDcId requestedShiftedDcId) : QObject()
, _instance(instance)
, data(this) {
connect(&timeouter, SIGNAL(timeout()), this, SLOT(checkRequestsByTimer()));
timeouter.start(1000);
connect(&sender, SIGNAL(timeout()), this, SLOT(needToResumeAndSend()));
_connection = new Connection();
dcWithShift = _connection->prepare(&data, requestedDcId);
_connection = std::make_unique<Connection>(_instance);
dcWithShift = _connection->prepare(&data, requestedShiftedDcId);
if (!dcWithShift) {
delete _connection;
_connection = 0;
DEBUG_LOG(("Session Info: could not start connection to dc %1").arg(requestedDcId));
_connection.reset();
DEBUG_LOG(("Session Info: could not start connection to dc %1").arg(requestedShiftedDcId));
return;
}
createDcData();
@ -112,24 +94,33 @@ void Session::createDcData() {
if (dc) {
return;
}
int32 dcId = bareDcId(dcWithShift);
auto dcId = bareDcId(dcWithShift);
auto &dcs = DCMap();
auto dcIndex = dcs.constFind(dcId);
if (dcIndex == dcs.cend()) {
dc = DcenterPtr(new Dcenter(dcId, AuthKeyPtr()));
dcs.insert(dcId, dc);
} else {
dc = dcIndex.value();
}
dc = _instance->getDcById(dcId);
ReadLockerAttempt lock(keyMutex());
data.setKey(lock ? dc->getKey() : AuthKeyPtr());
if (lock && dc->connectionInited()) {
data.setLayerWasInited(true);
}
connect(dc.data(), SIGNAL(authKeyCreated()), this, SLOT(authKeyCreatedForDC()), Qt::QueuedConnection);
connect(dc.data(), SIGNAL(layerWasInited(bool)), this, SLOT(layerWasInitedForDC(bool)), Qt::QueuedConnection);
connect(dc.get(), SIGNAL(authKeyCreated()), this, SLOT(authKeyCreatedForDC()), Qt::QueuedConnection);
connect(dc.get(), SIGNAL(layerWasInited(bool)), this, SLOT(layerWasInitedForDC(bool)), Qt::QueuedConnection);
}
void Session::registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift) {
return _instance->registerRequest(requestId, dcWithShift);
}
mtpRequestId Session::storeRequest(mtpRequest &request, const RPCResponseHandler &parser) {
return _instance->storeRequest(request, parser);
}
mtpRequest Session::getRequest(mtpRequestId requestId) {
return _instance->getRequest(requestId);
}
bool Session::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data
return _instance->rpcErrorOccured(requestId, onFail, err);
}
void Session::restart() {
@ -148,7 +139,7 @@ void Session::stop() {
DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(dcWithShift));
if (_connection) {
_connection->kill();
_connection = 0;
_instance->queueQuittingConnection(std::move(_connection));
}
}
@ -202,12 +193,9 @@ void Session::needToResumeAndSend() {
}
if (!_connection) {
DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(dcWithShift));
DcenterMap &dcs(DCMap());
_connection = new Connection();
_connection = std::make_unique<Connection>(_instance);
if (!_connection->prepare(&data, dcWithShift)) {
delete _connection;
_connection = 0;
_connection.reset();
DEBUG_LOG(("Session Info: could not start connection to dcWithShift %1").arg(dcWithShift));
dcWithShift = 0;
return;
@ -298,16 +286,16 @@ void Session::checkRequestsByTimer() {
}
}
}
clearCallbacksDelayed(clearCallbacks);
_instance->clearCallbacksDelayed(clearCallbacks);
}
}
void Session::onConnectionStateChange(qint32 newState) {
onStateChange(dcWithShift, newState);
_instance->onStateChange(dcWithShift, newState);
}
void Session::onResetDone() {
onSessionReset(dcWithShift);
_instance->onSessionReset(dcWithShift);
}
void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) {
@ -473,9 +461,9 @@ void Session::authKeyCreatedForDC() {
emit authKeyCreated();
}
void Session::notifyKeyCreated(const AuthKeyPtr &key) {
void Session::notifyKeyCreated(AuthKeyPtr &&key) {
DEBUG_LOG(("AuthKey Info: Session::keyCreated(), setting, dcWithShift %1").arg(dcWithShift));
dc->setKey(key);
dc->setKey(std::move(key));
}
void Session::layerWasInitedForDC(bool wasInited) {
@ -530,17 +518,17 @@ void Session::tryToReceive() {
}
if (requestId <= 0) {
if (dcWithShift == bareDcId(dcWithShift)) { // call globalCallback only in main session
globalCallback(response.constData(), response.constData() + response.size());
_instance->globalCallback(response.constData(), response.constData() + response.size());
}
} else {
execCallback(requestId, response.constData(), response.constData() + response.size());
_instance->execCallback(requestId, response.constData(), response.constData() + response.size());
}
++cnt;
}
}
Session::~Session() {
t_assert(_connection == 0);
t_assert(_connection == nullptr);
}
MTPrpcError rpcClientError(const QString &type, const QString &description) {

View File

@ -20,14 +20,17 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
*/
#pragma once
#include "mtproto/connection.h"
#include "mtproto/dcenter.h"
#include "mtproto/rpc_sender.h"
#include "core/single_timer.h"
namespace MTP {
class Instance;
namespace internal {
class Connection;
class ReceivedMsgIds {
public:
bool registerMsgId(mtpMsgId msgId, bool needAck) {
@ -236,7 +239,7 @@ public:
return result * 2 + (needAck ? 1 : 0);
}
void clear();
void clear(Instance *instance);
private:
uint64 _session = 0;
@ -275,7 +278,7 @@ class Session : public QObject {
Q_OBJECT
public:
Session(int32 dcenter);
Session(Instance *instance, ShiftedDcId requestedShiftedDcId);
void restart();
void stop();
@ -283,10 +286,10 @@ public:
void unpaused();
int32 getDcWithShift() const;
ShiftedDcId getDcWithShift() const;
QReadWriteLock *keyMutex() const;
void notifyKeyCreated(const AuthKeyPtr &key);
void notifyKeyCreated(AuthKeyPtr &&key);
void destroyKey();
void notifyLayerInited(bool wasInited);
@ -331,19 +334,26 @@ public slots:
private:
void createDcData();
Connection *_connection;
void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift);
mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser);
mtpRequest getRequest(mtpRequestId requestId);
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
bool _killed;
bool _needToReceive;
Instance *_instance;
std::unique_ptr<Connection> _connection;
bool _killed = false;
bool _needToReceive = false;
SessionData data;
int32 dcWithShift;
ShiftedDcId dcWithShift = 0;
DcenterPtr dc;
TimeMs msSendCall, msWait;
TimeMs msSendCall = 0;
TimeMs msWait = 0;
bool _ping;
bool _ping = false;
QTimer timeouter;
SingleTimer sender;

View File

@ -72,7 +72,7 @@ void PasscodeWidget::onSubmit() {
if (Local::readMap(_passcode->text().toUtf8()) != Local::ReadMapPassNeeded) {
cSetPasscodeBadTries(0);
MTP::start();
Messenger::Instance().startMtp();
if (AuthSession::Current()) {
App::wnd()->setupMain();
} else {

View File

@ -186,9 +186,9 @@ static const NotifySettingsPtr EmptyNotifySettings = NotifySettingsPtr(1);
extern NotifySettings globalNotifyAll, globalNotifyUsers, globalNotifyChats;
extern NotifySettingsPtr globalNotifyAllPtr, globalNotifyUsersPtr, globalNotifyChatsPtr;
inline bool isNotifyMuted(NotifySettingsPtr settings, TimeId *changeIn = 0) {
inline bool isNotifyMuted(NotifySettingsPtr settings, TimeId *changeIn = nullptr) {
if (settings != UnknownNotifySettings && settings != EmptyNotifySettings) {
TimeId t = unixtime();
auto t = unixtime();
if (settings->mute > t) {
if (changeIn) *changeIn = settings->mute - t + 1;
return true;

View File

@ -323,8 +323,6 @@
'<(src_loc)/media/media_clip_qtgif.h',
'<(src_loc)/media/media_clip_reader.cpp',
'<(src_loc)/media/media_clip_reader.h',
'<(src_loc)/mtproto/facade.cpp',
'<(src_loc)/mtproto/facade.h',
'<(src_loc)/mtproto/auth_key.cpp',
'<(src_loc)/mtproto/auth_key.h',
'<(src_loc)/mtproto/connection.cpp',
@ -343,8 +341,12 @@
'<(src_loc)/mtproto/dcenter.h',
'<(src_loc)/mtproto/dc_options.cpp',
'<(src_loc)/mtproto/dc_options.h',
'<(src_loc)/mtproto/facade.cpp',
'<(src_loc)/mtproto/facade.h',
'<(src_loc)/mtproto/file_download.cpp',
'<(src_loc)/mtproto/file_download.h',
'<(src_loc)/mtproto/mtp_instance.cpp',
'<(src_loc)/mtproto/mtp_instance.h',
'<(src_loc)/mtproto/rsa_public_key.cpp',
'<(src_loc)/mtproto/rsa_public_key.h',
'<(src_loc)/mtproto/rpc_sender.cpp',