Use fine grained session timers

The check of sent requests and containers is done unconditionally
every second even though the request timeout is 10 seconds and the
container timeout is 600 seconds. This commit uses fine grained timers
instead in order to avoid useless system wake-up events.

The check of sent requests is now scheduled on demand when a new
request is queued. Then the callback, while parsing queued requests,
computes the delta to the closest expiring request and automatically
schedules the next check if necessary.

Given the high value of the container timeout, its callback is called
repeatedly every 600 seconds, unless it computes a lower delta for an
expiring container using the same logic as for the requests.
This commit is contained in:
Loïc Molinari 2021-04-14 19:49:17 +02:00 committed by John Preston
parent de70df0b6b
commit 06618a5253
2 changed files with 30 additions and 9 deletions

View File

@ -39,7 +39,6 @@ constexpr auto kPingSendAfterForce = 45 * crl::time(1000);
constexpr auto kTemporaryExpiresIn = TimeId(86400); constexpr auto kTemporaryExpiresIn = TimeId(86400);
constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30); constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30);
constexpr auto kTestModeDcIdShift = 10000; constexpr auto kTestModeDcIdShift = 10000;
constexpr auto kCheckSentRequestsEach = 1 * crl::time(1000);
constexpr auto kKeyOldEnoughForDestroy = 60 * crl::time(1000); constexpr auto kKeyOldEnoughForDestroy = 60 * crl::time(1000);
constexpr auto kSentContainerLives = 600 * crl::time(1000); constexpr auto kSentContainerLives = 600 * crl::time(1000);
constexpr auto kFastRequestDuration = crl::time(500); constexpr auto kFastRequestDuration = crl::time(500);
@ -163,13 +162,14 @@ SessionPrivate::SessionPrivate(
, _waitForConnected(kMinConnectedTimeout) , _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); }) , _pingSender(thread, [=] { sendPingByTimer(); })
, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); }) , _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
, _clearOldContainersTimer(thread, [=] { clearOldContainers(); })
, _sessionData(std::move(data)) { , _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0); Expects(_shiftedDcId != 0);
moveToThread(thread); moveToThread(thread);
InvokeQueued(this, [=] { InvokeQueued(this, [=] {
_checkSentRequestsTimer.callEach(kCheckSentRequestsEach); _clearOldContainersTimer.callEach(kSentContainerLives);
connectToServer(); connectToServer();
}); });
} }
@ -246,41 +246,47 @@ int16 SessionPrivate::getProtocolDcId() const {
} }
void SessionPrivate::checkSentRequests() { void SessionPrivate::checkSentRequests() {
clearOldContainers();
const auto now = crl::now(); const auto now = crl::now();
if (_bindMsgId && _bindMessageSent + kCheckSentRequestTimeout < now) { const auto checkTime = now - kCheckSentRequestTimeout;
if (_bindMsgId && _bindMessageSent < checkTime) {
DEBUG_LOG(("MTP Info: " DEBUG_LOG(("MTP Info: "
"Request state while key is not bound, restarting.")); "Request state while key is not bound, restarting."));
restart(); restart();
_checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
return; return;
} }
auto requesting = false; auto requesting = false;
auto nextTimeout = kCheckSentRequestTimeout;
{ {
QReadLocker locker(_sessionData->haveSentMutex()); QReadLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap(); auto &haveSent = _sessionData->haveSentMap();
const auto haveSentCount = haveSent.size();
const auto checkAfter = kCheckSentRequestTimeout;
for (const auto &[msgId, request] : haveSent) { for (const auto &[msgId, request] : haveSent) {
if (request->lastSentTime + checkAfter < now) { if (request->lastSentTime <= checkTime) {
// Need to check state. // Need to check state.
request->lastSentTime = now; request->lastSentTime = now;
if (_stateRequestData.emplace(msgId).second) { if (_stateRequestData.emplace(msgId).second) {
requesting = true; requesting = true;
} }
} else {
nextTimeout = std::min(request->lastSentTime - checkTime, nextTimeout);
} }
} }
} }
if (requesting) { if (requesting) {
_sessionData->queueSendAnything(kSendStateRequestWaiting); _sessionData->queueSendAnything(kSendStateRequestWaiting);
} }
if (nextTimeout < kCheckSentRequestTimeout) {
_checkSentRequestsTimer.callOnce(nextTimeout);
}
} }
void SessionPrivate::clearOldContainers() { void SessionPrivate::clearOldContainers() {
auto resent = false; auto resent = false;
auto nextTimeout = kSentContainerLives;
const auto now = crl::now(); const auto now = crl::now();
const auto checkTime = now - kSentContainerLives;
for (auto i = _sentContainers.begin(); i != _sentContainers.end();) { for (auto i = _sentContainers.begin(); i != _sentContainers.end();) {
if (now > i->second.sent + kSentContainerLives) { if (i->second.sent <= checkTime) {
DEBUG_LOG(("MTP Info: Removing old container with resending %1, " DEBUG_LOG(("MTP Info: Removing old container with resending %1, "
"sent: %2, now: %3, current unixtime: %4" "sent: %2, now: %3, current unixtime: %4"
).arg(i->first ).arg(i->first
@ -296,12 +302,18 @@ void SessionPrivate::clearOldContainers() {
resend(innerMsgId, -1, true); resend(innerMsgId, -1, true);
} }
} else { } else {
nextTimeout = std::min(i->second.sent - checkTime, nextTimeout);
++i; ++i;
} }
} }
if (resent) { if (resent) {
_sessionData->queueNeedToResumeAndSend(); _sessionData->queueNeedToResumeAndSend();
} }
if (nextTimeout < kSentContainerLives) {
_clearOldContainersTimer.callOnce(nextTimeout);
} else if (!_clearOldContainersTimer.isActive()) {
_clearOldContainersTimer.callEach(nextTimeout);
}
} }
void SessionPrivate::destroyAllConnections() { void SessionPrivate::destroyAllConnections() {
@ -683,6 +695,8 @@ void SessionPrivate::tryToSend() {
{ {
QWriteLocker locker1(_sessionData->toSendMutex()); QWriteLocker locker1(_sessionData->toSendMutex());
auto scheduleCheckSentRequests = false;
auto toSendDummy = base::flat_map<mtpRequestId, SerializedRequest>(); auto toSendDummy = base::flat_map<mtpRequestId, SerializedRequest>();
auto &toSend = sendAll auto &toSend = sendAll
? _sessionData->toSendMap() ? _sessionData->toSendMap()
@ -748,6 +762,7 @@ void SessionPrivate::tryToSend() {
QWriteLocker locker2(_sessionData->haveSentMutex()); QWriteLocker locker2(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap(); auto &haveSent = _sessionData->haveSentMap();
haveSent.emplace(msgId, toSendRequest); haveSent.emplace(msgId, toSendRequest);
scheduleCheckSentRequests = true;
const auto wrapLayer = needsLayer && toSendRequest->needsLayer; const auto wrapLayer = needsLayer && toSendRequest->needsLayer;
if (toSendRequest->after) { if (toSendRequest->after) {
@ -871,6 +886,7 @@ void SessionPrivate::tryToSend() {
//Assert(!haveSent.contains(msgId)); //Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, request); haveSent.emplace(msgId, request);
sentIdsWrap.messages.push_back(msgId); sentIdsWrap.messages.push_back(msgId);
scheduleCheckSentRequests = true;
needAnyResponse = true; needAnyResponse = true;
} else { } else {
_ackedIds.emplace(msgId, request->requestId); _ackedIds.emplace(msgId, request->requestId);
@ -922,6 +938,10 @@ void SessionPrivate::tryToSend() {
bigMsgId, bigMsgId,
forceNewMsgId); forceNewMsgId);
_sentContainers.emplace(containerMsgId, std::move(sentIdsWrap)); _sentContainers.emplace(containerMsgId, std::move(sentIdsWrap));
if (scheduleCheckSentRequests && !_checkSentRequestsTimer.isActive()) {
_checkSentRequestsTimer.callOnce(kCheckSentRequestTimeout);
}
} }
} }
sendSecureRequest(std::move(toSendRequest), needAnyResponse); sendSecureRequest(std::move(toSendRequest), needAnyResponse);

View File

@ -218,6 +218,7 @@ private:
mtpMsgId _pingMsgId = 0; mtpMsgId _pingMsgId = 0;
base::Timer _pingSender; base::Timer _pingSender;
base::Timer _checkSentRequestsTimer; base::Timer _checkSentRequestsTimer;
base::Timer _clearOldContainersTimer;
std::shared_ptr<SessionData> _sessionData; std::shared_ptr<SessionData> _sessionData;
std::unique_ptr<SessionOptions> _options; std::unique_ptr<SessionOptions> _options;