Improve working with sessions.

This commit is contained in:
John Preston 2019-11-14 20:39:15 +03:00
parent 1e252a6505
commit fe1e627cac
3 changed files with 139 additions and 116 deletions

View File

@ -35,6 +35,8 @@ constexpr auto kConfigBecomesOldIn = 2 * 60 * crl::time(1000);
constexpr auto kConfigBecomesOldForBlockedIn = 8 * crl::time(1000);
constexpr auto kCheckKeyEach = 60 * crl::time(1000);
using namespace internal;
} // namespace
class Instance::Private : private Sender {
@ -79,12 +81,17 @@ public:
void reInitConnection(DcId dcId);
void logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail);
std::shared_ptr<internal::Dcenter> getDcById(ShiftedDcId shiftedDcId);
std::shared_ptr<Dcenter> getDcById(ShiftedDcId shiftedDcId);
std::shared_ptr<Dcenter> findDc(ShiftedDcId shiftedDcId);
std::shared_ptr<Dcenter> addDc(
ShiftedDcId shiftedDcId,
AuthKeyPtr &&key = nullptr);
void removeDc(ShiftedDcId shiftedDcId);
void unpaused();
void queueQuittingConnection(
std::unique_ptr<internal::Connection> &&connection);
void connectionFinished(internal::Connection *connection);
std::unique_ptr<Connection> &&connection);
void connectionFinished(Connection *connection);
void sendRequest(
mtpRequestId requestId,
@ -121,7 +128,7 @@ public:
void setSessionResetHandler(Fn<void(ShiftedDcId shiftedDcId)> handler);
void clearGlobalHandlers();
not_null<internal::Session*> getSession(ShiftedDcId shiftedDcId);
[[nodiscard]] not_null<Session*> getSession(ShiftedDcId shiftedDcId);
bool isNormal() const {
return (_mode == Instance::Mode::Normal);
@ -147,6 +154,10 @@ private:
bool exportFail(const RPCError &error, mtpRequestId requestId);
bool onErrorDefault(mtpRequestId requestId, const RPCError &error);
Session *findSession(ShiftedDcId shiftedDcId);
not_null<Session*> startSession(ShiftedDcId shiftedDcId);
Session *removeSessionToKilled(ShiftedDcId shiftedDcId);
void applyDomainIps(
const QString &host,
const QStringList &ips,
@ -178,18 +189,18 @@ private:
DcId _mainDcId = Config::kDefaultMainDc;
bool _mainDcIdForced = false;
std::map<DcId, std::shared_ptr<internal::Dcenter>> _dcenters;
base::flat_map<DcId, std::shared_ptr<Dcenter>> _dcenters;
QString _deviceModel;
QString _systemVersion;
internal::Session *_mainSession = nullptr;
std::map<ShiftedDcId, std::unique_ptr<internal::Session>> _sessions;
std::vector<std::unique_ptr<internal::Session>> _killedSessions; // delayed delete
Session *_mainSession = nullptr;
base::flat_map<ShiftedDcId, std::unique_ptr<Session>> _sessions;
std::vector<std::unique_ptr<Session>> _killedSessions; // delayed delete
base::set_of_unique_ptr<internal::Connection> _quittingConnections;
base::set_of_unique_ptr<Connection> _quittingConnections;
std::unique_ptr<internal::ConfigLoader> _configLoader;
std::unique_ptr<ConfigLoader> _configLoader;
std::unique_ptr<DomainResolver> _domainResolver;
std::unique_ptr<SpecialConfigRequest> _httpUnixtimeLoader;
QString _userPhone;
@ -258,9 +269,7 @@ void Instance::Private::start(Config &&config) {
}
}
_keysForWrite[shiftedDcId] = key;
auto dc = std::make_shared<internal::Dcenter>(_instance, dcId, std::move(key));
_dcenters.emplace(shiftedDcId, std::move(dc));
addDc(shiftedDcId, std::move(key));
}
if (config.mainDcId != Config::kNotSetMainDc) {
@ -269,17 +278,11 @@ void Instance::Private::start(Config &&config) {
}
if (isKeysDestroyer()) {
for (auto &dc : _dcenters) {
auto shiftedDcId = dc.first;
auto session = std::make_unique<internal::Session>(_instance, shiftedDcId);
auto it = _sessions.emplace(shiftedDcId, std::move(session)).first;
it->second->start();
for (const auto &[shiftedDcId, dc] : _dcenters) {
startSession(shiftedDcId);
}
} else if (_mainDcId != Config::kNoneMainDc) {
auto main = std::make_unique<internal::Session>(_instance, _mainDcId);
_mainSession = main.get();
_sessions.emplace(_mainDcId, std::move(main));
_mainSession->start();
_mainSession = startSession(_mainDcId);
}
_checkDelayedTimer.setCallback([this] { checkDelayedRequests(); });
@ -335,8 +338,8 @@ void Instance::Private::applyDomainIps(
}
if (applyToProxy(Global::RefSelectedProxy())
&& (Global::ProxySettings() == ProxyData::Settings::Enabled)) {
for (auto &session : _sessions) {
session.second->refreshOptions();
for (const auto &[shiftedDcId, session] : _sessions) {
session->refreshOptions();
}
}
emit _instance->proxyDomainResolved(host, ips, expireAt);
@ -398,7 +401,7 @@ void Instance::Private::requestConfig() {
if (_configLoader || isKeysDestroyer()) {
return;
}
_configLoader = std::make_unique<internal::ConfigLoader>(
_configLoader = std::make_unique<ConfigLoader>(
_instance,
_userPhone,
rpcDone([=](const MTPConfig &result) { configLoadDone(result); }),
@ -472,16 +475,16 @@ void Instance::Private::requestCDNConfig() {
}
void Instance::Private::restart() {
for (auto &session : _sessions) {
session.second->restart();
for (const auto &[shiftedDcId, session] : _sessions) {
session->restart();
}
}
void Instance::Private::restart(ShiftedDcId shiftedDcId) {
auto dcId = BareDcId(shiftedDcId);
for (auto &session : _sessions) {
if (BareDcId(session.second->getDcWithShift()) == dcId) {
session.second->restart();
const auto dcId = BareDcId(shiftedDcId);
for (const auto &[shiftedDcId, session] : _sessions) {
if (BareDcId(shiftedDcId) == dcId) {
session->restart();
}
}
}
@ -497,11 +500,9 @@ int32 Instance::Private::dcstate(ShiftedDcId shiftedDcId) {
shiftedDcId += BareDcId(_mainSession->getDcWithShift());
}
auto it = _sessions.find(shiftedDcId);
if (it != _sessions.cend()) {
return it->second->getState();
if (const auto session = findSession(shiftedDcId)) {
return session->getState();
}
return DisconnectedState;
}
@ -515,11 +516,9 @@ QString Instance::Private::dctransport(ShiftedDcId shiftedDcId) {
shiftedDcId += BareDcId(_mainSession->getDcWithShift());
}
auto it = _sessions.find(shiftedDcId);
if (it != _sessions.cend()) {
return it->second->transport();
if (const auto session = findSession(shiftedDcId)) {
return session->transport();
}
return QString();
}
@ -563,25 +562,15 @@ int32 Instance::Private::state(mtpRequestId requestId) {
}
void Instance::Private::killSession(ShiftedDcId shiftedDcId) {
auto checkIfMainAndKill = [this](ShiftedDcId shiftedDcId) {
auto it = _sessions.find(shiftedDcId);
if (it != _sessions.cend()) {
_killedSessions.push_back(std::move(it->second));
_sessions.erase(it);
_killedSessions.back()->kill();
return (_killedSessions.back().get() == _mainSession);
}
return false;
const auto checkIfMainAndKill = [&](ShiftedDcId shiftedDcId) {
const auto killed = removeSessionToKilled(shiftedDcId);
return killed && (killed == _mainSession);
};
if (checkIfMainAndKill(shiftedDcId)) {
checkIfMainAndKill(_mainDcId);
auto main = std::make_unique<internal::Session>(_instance, _mainDcId);
_mainSession = main.get();
_sessions.emplace(_mainDcId, std::move(main));
_mainSession->start();
_mainSession = startSession(_mainDcId);
}
InvokeQueued(_instance, [this] {
InvokeQueued(_instance, [=] {
clearKilledSessions();
});
}
@ -591,18 +580,17 @@ void Instance::Private::clearKilledSessions() {
}
void Instance::Private::stopSession(ShiftedDcId shiftedDcId) {
auto it = _sessions.find(shiftedDcId);
if (it != _sessions.end()) {
if (it->second.get() != _mainSession) { // don't stop main session
it->second->stop();
if (const auto session = findSession(shiftedDcId)) {
if (session != _mainSession) { // don't stop main session
session->stop();
}
}
}
void Instance::Private::reInitConnection(DcId dcId) {
for (auto &session : _sessions) {
if (BareDcId(session.second->getDcWithShift()) == dcId) {
session.second->reInitConnection();
for (const auto &[shiftedDcId, session] : _sessions) {
if (BareDcId(shiftedDcId) == dcId) {
session->reInitConnection();
}
}
}
@ -647,22 +635,40 @@ bool Instance::Private::logoutGuestDone(mtpRequestId requestId) {
return false;
}
std::shared_ptr<internal::Dcenter> Instance::Private::getDcById(ShiftedDcId shiftedDcId) {
auto it = _dcenters.find(shiftedDcId);
if (it == _dcenters.cend()) {
auto dcId = BareDcId(shiftedDcId);
if (isTemporaryDcId(dcId)) {
if (auto realDcId = getRealIdFromTemporaryDcId(dcId)) {
dcId = realDcId;
}
}
it = _dcenters.find(dcId);
if (it == _dcenters.cend()) {
auto result = std::make_shared<internal::Dcenter>(_instance, dcId, AuthKeyPtr());
return _dcenters.emplace(dcId, std::move(result)).first->second;
std::shared_ptr<Dcenter> Instance::Private::findDc(ShiftedDcId shiftedDcId) {
const auto i = _dcenters.find(shiftedDcId);
return (i != _dcenters.end()) ? i->second : nullptr;
}
std::shared_ptr<Dcenter> Instance::Private::addDc(
ShiftedDcId shiftedDcId,
AuthKeyPtr &&key) {
const auto dcId = BareDcId(shiftedDcId);
return _dcenters.emplace(
shiftedDcId,
std::make_shared<Dcenter>(_instance, dcId, std::move(key))
).first->second;
}
void Instance::Private::removeDc(ShiftedDcId shiftedDcId) {
_dcenters.erase(shiftedDcId);
}
std::shared_ptr<Dcenter> Instance::Private::getDcById(
ShiftedDcId shiftedDcId) {
if (auto result = findDc(shiftedDcId)) {
return result;
}
auto dcId = BareDcId(shiftedDcId);
if (isTemporaryDcId(dcId)) {
if (const auto realDcId = getRealIdFromTemporaryDcId(dcId)) {
dcId = realDcId;
}
}
return it->second;
if (auto result = findDc(dcId)) {
return result;
}
return addDc(dcId);
}
void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) {
@ -693,7 +699,7 @@ void Instance::Private::addKeysForDestroy(AuthKeysList &&keys) {
Expects(isKeysDestroyer());
for (auto &key : keys) {
auto dcId = key->dcId();
const auto dcId = key->dcId();
auto shiftedDcId = MTP::destroyKeyNextDcId(dcId);
{
@ -706,12 +712,8 @@ void Instance::Private::addKeysForDestroy(AuthKeysList &&keys) {
_keysForWrite[shiftedDcId] = key;
}
auto dc = std::make_shared<internal::Dcenter>(_instance, dcId, std::move(key));
_dcenters.emplace(shiftedDcId, std::move(dc));
auto session = std::make_unique<internal::Session>(_instance, shiftedDcId);
auto it = _sessions.emplace(shiftedDcId, std::move(session)).first;
it->second->start();
addDc(shiftedDcId, std::move(key));
startSession(shiftedDcId);
}
}
@ -728,18 +730,18 @@ QString Instance::Private::systemVersion() const {
}
void Instance::Private::unpaused() {
for (auto &session : _sessions) {
session.second->unpaused();
for (const auto &[shiftedDcId, session] : _sessions) {
session->unpaused();
}
}
void Instance::Private::queueQuittingConnection(
std::unique_ptr<internal::Connection> &&connection) {
std::unique_ptr<Connection> &&connection) {
_quittingConnections.insert(std::move(connection));
}
void Instance::Private::connectionFinished(internal::Connection *connection) {
auto it = _quittingConnections.find(connection);
void Instance::Private::connectionFinished(Connection *connection) {
const auto it = _quittingConnections.find(connection);
if (it != _quittingConnections.end()) {
_quittingConnections.erase(it);
}
@ -1430,23 +1432,47 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e
return false;
}
not_null<internal::Session*> Instance::Private::getSession(
not_null<Session*> Instance::Private::getSession(
ShiftedDcId shiftedDcId) {
if (!shiftedDcId) {
Assert(_mainSession != nullptr);
return _mainSession;
}
if (!BareDcId(shiftedDcId)) {
} else if (!BareDcId(shiftedDcId)) {
Assert(_mainSession != nullptr);
shiftedDcId += BareDcId(_mainSession->getDcWithShift());
}
auto it = _sessions.find(shiftedDcId);
if (it == _sessions.cend()) {
it = _sessions.emplace(shiftedDcId, std::make_unique<internal::Session>(_instance, shiftedDcId)).first;
it->second->start();
if (const auto session = findSession(shiftedDcId)) {
return session;
}
return it->second.get();
return startSession(shiftedDcId);
}
Session *Instance::Private::findSession(ShiftedDcId shiftedDcId) {
const auto i = _sessions.find(shiftedDcId);
return (i != _sessions.end()) ? i->second.get() : nullptr;
}
not_null<Session*> Instance::Private::startSession(ShiftedDcId shiftedDcId) {
Expects(BareDcId(shiftedDcId) != 0);
const auto result = _sessions.emplace(
shiftedDcId,
std::make_unique<Session>(_instance, shiftedDcId)
).first->second.get();
result->start();
return result;
}
Session *Instance::Private::removeSessionToKilled(ShiftedDcId shiftedDcId) {
const auto i = _sessions.find(shiftedDcId);
if (i == _sessions.cend()) {
return nullptr;
}
i->second->kill();
_killedSessions.push_back(std::move(i->second));
_sessions.erase(i);
return _killedSessions.back().get();
}
void Instance::Private::scheduleKeyDestroy(ShiftedDcId shiftedDcId) {
@ -1488,7 +1514,7 @@ void Instance::Private::performKeyDestroy(ShiftedDcId shiftedDcId) {
void Instance::Private::completedKeyDestroy(ShiftedDcId shiftedDcId) {
Expects(isKeysDestroyer());
_dcenters.erase(shiftedDcId);
removeDc(shiftedDcId);
{
QWriteLocker lock(&_keysForWriteLock);
_keysForWrite.erase(shiftedDcId);
@ -1502,7 +1528,7 @@ void Instance::Private::completedKeyDestroy(ShiftedDcId shiftedDcId) {
void Instance::Private::checkMainDcKey() {
const auto id = mainDcId();
const auto shiftedDcId = ShiftDcId(id, kCheckKeyDcShift);
if (_sessions.find(shiftedDcId) != _sessions.end()) {
if (findSession(shiftedDcId)) {
return;
}
const auto key = [&] {
@ -1527,9 +1553,8 @@ void Instance::Private::keyDestroyedOnServer(DcId dcId, uint64 keyId) {
}
restart();
} else {
const auto i = _dcenters.find(dcId);
if (i != end(_dcenters)) {
i->second->destroyKey();
if (const auto dc = findDc(dcId)) {
return dc->destroyKey();
}
restart(dcId);
}
@ -1564,8 +1589,8 @@ void Instance::Private::prepareToDestroy() {
requestCancellingDiscard();
for (auto &session : base::take(_sessions)) {
session.second->kill();
for (const auto &[shiftedDcId, session] : base::take(_sessions)) {
session->kill();
}
_mainSession = nullptr;
}
@ -1632,7 +1657,7 @@ void Instance::requestCDNConfig() {
_private->requestCDNConfig();
}
void Instance::connectionFinished(internal::Connection *connection) {
void Instance::connectionFinished(Connection *connection) {
_private->connectionFinished(connection);
}
@ -1680,7 +1705,7 @@ void Instance::logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) {
_private->logout(onDone, onFail);
}
std::shared_ptr<internal::Dcenter> Instance::getDcById(ShiftedDcId shiftedDcId) {
std::shared_ptr<Dcenter> Instance::getDcById(ShiftedDcId shiftedDcId) {
return _private->getDcById(shiftedDcId);
}
@ -1713,7 +1738,7 @@ void Instance::unpaused() {
}
void Instance::queueQuittingConnection(
std::unique_ptr<internal::Connection> &&connection) {
std::unique_ptr<Connection> &&connection) {
_private->queueQuittingConnection(std::move(connection));
}

View File

@ -143,12 +143,11 @@ void SessionData::clear(Instance *instance) {
Session::Session(not_null<Instance*> instance, ShiftedDcId shiftedDcId)
: QObject()
, _instance(instance)
, _data(this)
, _shiftedDcId(shiftedDcId)
, _data(this)
, _timeouter([=] { checkRequestsByTimer(); })
, _sender([=] { needToResumeAndSend(); }) {
connect(&_timeouter, SIGNAL(timeout()), this, SLOT(checkRequestsByTimer()));
_timeouter.start(1000);
_timeouter.callEach(1000);
refreshOptions();
}
@ -291,9 +290,7 @@ void Session::needToResumeAndSend() {
}
if (!_connection) {
DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(_shiftedDcId));
createDcData();
_connection = std::make_unique<Connection>(_instance);
_connection->start(&_data, _shiftedDcId);
start();
}
if (_ping) {
_ping = false;

View File

@ -378,7 +378,6 @@ public slots:
void connectionWasInitedForDC();
void tryToReceive();
void checkRequestsByTimer();
void onConnectionStateChange(qint32 newState);
void onResetDone();
@ -388,10 +387,13 @@ public slots:
private:
void createDcData();
void checkRequestsByTimer();
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
not_null<Instance*> _instance;
const not_null<Instance*> _instance;
const ShiftedDcId _shiftedDcId = 0;
std::unique_ptr<Connection> _connection;
bool _killed = false;
@ -399,7 +401,6 @@ private:
SessionData _data;
ShiftedDcId _shiftedDcId = 0;
std::shared_ptr<Dcenter> _dc;
AuthKeyPtr _dcKeyForCheck;
@ -408,7 +409,7 @@ private:
bool _ping = false;
QTimer _timeouter;
base::Timer _timeouter;
base::Timer _sender;
};