Prepare Data::Histories for requests interdependencies.

This commit is contained in:
John Preston 2020-02-18 15:39:24 +04:00
parent 70408f0e22
commit b0e1ae3948
10 changed files with 482 additions and 88 deletions

View File

@ -335,6 +335,8 @@ PRIVATE
data/data_game.h
data/data_groups.cpp
data/data_groups.h
data/data_histories.cpp
data/data_histories.h
data/data_location.cpp
data/data_location.h
data/data_media_rotation.cpp

View File

@ -6102,7 +6102,7 @@ void ApiWrap::sendReadRequest(not_null<PeerData*> peer, MsgId upTo) {
sendReadRequest(peer, *next);
} else if (const auto history
= _session->data().historyLoaded(peer)) {
if (history->unreadCountRefreshNeeded()) {
if (!history->unreadCountKnown()) {
requestDialogEntry(history);
}
}

View File

@ -394,6 +394,9 @@ public:
//void readFeed( // #feed
// not_null<Data::Feed*> feed,
// Data::MessagePosition position);
void applyAffectedMessages(
not_null<PeerData*> peer,
const MTPmessages_AffectedMessages &result);
void sendVoiceMessage(
QByteArray result,
@ -628,9 +631,6 @@ private:
not_null<PeerData*> peer,
const MTPmessages_AffectedHistory &result);
void applyAffectedMessages(const MTPmessages_AffectedMessages &result);
void applyAffectedMessages(
not_null<PeerData*> peer,
const MTPmessages_AffectedMessages &result);
void deleteAllFromUserSend(
not_null<ChannelData*> channel,

View File

@ -0,0 +1,339 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#include "data/data_histories.h"
#include "data/data_session.h"
#include "data/data_channel.h"
#include "main/main_session.h"
#include "history/history.h"
#include "history/history_item.h"
#include "history/view/history_view_element.h"
#include "apiwrap.h"
namespace Data {
namespace {
constexpr auto kReadRequestTimeout = 3 * crl::time(1000);
constexpr auto kReadRequestSent = std::numeric_limits<crl::time>::max();
} // namespace
Histories::Histories(not_null<Session*> owner)
: _owner(owner)
, _readRequestsTimer([=] { sendReadRequests(); }) {
}
Session &Histories::owner() const {
return *_owner;
}
Main::Session &Histories::session() const {
return _owner->session();
}
History *Histories::find(PeerId peerId) {
const auto i = peerId ? _map.find(peerId) : end(_map);
return (i != end(_map)) ? i->second.get() : nullptr;
}
not_null<History*> Histories::findOrCreate(PeerId peerId) {
Expects(peerId != 0);
if (const auto result = find(peerId)) {
return result;
}
const auto [i, ok] = _map.emplace(
peerId,
std::make_unique<History>(&owner(), peerId));
return i->second.get();
}
void Histories::unloadAll() {
for (const auto &[peerId, history] : _map) {
history->clear(History::ClearType::Unload);
}
}
void Histories::clearAll() {
_map.clear();
}
void Histories::readInboxTill(
not_null<History*> history,
not_null<HistoryItem*> item) {
if (!IsServerMsgId(item->id)) {
auto view = item->mainView();
if (!view) {
return;
}
auto block = view->block();
auto blockIndex = block->indexInHistory();
auto itemIndex = view->indexInBlock();
while (blockIndex > 0 || itemIndex > 0) {
if (itemIndex > 0) {
view = block->messages[--itemIndex].get();
} else {
while (blockIndex > 0) {
block = history->blocks[--blockIndex].get();
itemIndex = block->messages.size();
if (itemIndex > 0) {
view = block->messages[--itemIndex].get();
break;
}
}
}
item = view->data();
if (IsServerMsgId(item->id)) {
break;
}
}
if (!IsServerMsgId(item->id)) {
LOG(("App Error: "
"Can't read history till unknown local message."));
return;
}
}
const auto tillId = item->id;
if (!history->readInboxTillNeedsRequest(tillId)) {
return;
}
const auto maybeState = lookup(history);
if (maybeState && maybeState->readTill >= tillId) {
return;
}
const auto stillUnread = history->countStillUnreadLocal(tillId);
if (stillUnread
&& history->unreadCountKnown()
&& *stillUnread == history->unreadCount()) {
history->setInboxReadTill(tillId);
return;
}
auto &state = _states[history];
const auto wasWaiting = (state.readTill != 0);
state.readTill = tillId;
if (!stillUnread) {
state.readWhen = 0;
sendReadRequests();
return;
} else if (!wasWaiting) {
state.readWhen = crl::now() + kReadRequestTimeout;
if (!_readRequestsTimer.isActive()) {
_readRequestsTimer.callOnce(kReadRequestTimeout);
}
}
history->setInboxReadTill(tillId);
history->setUnreadCount(*stillUnread);
history->updateChatListEntry();
}
void Histories::sendPendingReadInbox(not_null<History*> history) {
if (const auto state = lookup(history)) {
if (state->readTill && state->readWhen) {
state->readWhen = 0;
sendReadRequests();
}
}
}
void Histories::sendReadRequests() {
if (_states.empty()) {
return;
}
const auto now = crl::now();
auto next = std::optional<crl::time>();
for (auto &[history, state] : _states) {
if (state.readTill && state.readWhen <= now) {
sendReadRequest(history, state);
} else if (!next || *next > state.readWhen) {
next = state.readWhen;
}
}
if (next.has_value()) {
_readRequestsTimer.callOnce(*next - now);
}
}
void Histories::sendReadRequest(not_null<History*> history, State &state) {
const auto tillId = state.readTill;
state.readWhen = kReadRequestSent;
sendRequest(history, RequestType::ReadInbox, [=](Fn<void()> done) {
const auto finished = [=] {
const auto state = lookup(history);
Assert(state != nullptr);
if (history->unreadCountRefreshNeeded(tillId)) {
session().api().requestDialogEntry(history);
}
if (state->readWhen == kReadRequestSent) {
state->readWhen = 0;
state->readTill = 0;
}
done();
};
if (const auto channel = history->peer->asChannel()) {
return session().api().request(MTPchannels_ReadHistory(
channel->inputChannel,
MTP_int(tillId)
)).done([=](const MTPBool &result) {
finished();
}).fail([=](const RPCError &error) {
finished();
}).send();
} else {
return session().api().request(MTPmessages_ReadHistory(
history->peer->input,
MTP_int(tillId)
)).done([=](const MTPmessages_AffectedMessages &result) {
session().api().applyAffectedMessages(history->peer, result);
finished();
}).fail([=](const RPCError &error) {
finished();
}).send();
}
});
}
void Histories::checkEmptyState(not_null<History*> history) {
const auto empty = [](const State &state) {
return state.postponed.empty()
&& state.sent.empty()
&& (state.readTill == 0);
};
const auto i = _states.find(history);
if (i != end(_states) && empty(i->second)) {
_states.erase(i);
}
}
int Histories::sendRequest(
not_null<History*> history,
RequestType type,
Fn<mtpRequestId(Fn<void()> done)> generator) {
Expects(type != RequestType::None);
auto &state = _states[history];
const auto id = ++state.autoincrement;
const auto action = chooseAction(state, type);
if (action == Action::Send) {
state.sent.emplace(id, SentRequest{
generator([=] { checkPostponed(history, id); }),
type
});
if (base::take(state.thenRequestEntry)) {
session().api().requestDialogEntry(history);
}
} else if (action == Action::Postpone) {
state.postponed.emplace(
id,
PostponedRequest{ std::move(generator), type });
}
return id;
}
void Histories::checkPostponed(not_null<History*> history, int requestId) {
const auto state = lookup(history);
Assert(state != nullptr);
state->sent.remove(requestId);
if (!state->postponed.empty()) {
auto &entry = state->postponed.front();
const auto action = chooseAction(*state, entry.second.type, true);
if (action == Action::Send) {
const auto id = entry.first;
state->postponed.remove(id);
state->sent.emplace(id, SentRequest{
entry.second.generator([=] { checkPostponed(history, id); }),
entry.second.type
});
if (base::take(state->thenRequestEntry)) {
session().api().requestDialogEntry(history);
}
} else {
Assert(action == Action::Postpone);
}
}
checkEmptyState(history);
}
Histories::Action Histories::chooseAction(
State &state,
RequestType type,
bool fromPostponed) const {
switch (type) {
case RequestType::ReadInbox:
for (const auto &[_, sent] : state.sent) {
if (sent.type == RequestType::ReadInbox
|| sent.type == RequestType::DialogsEntry
|| sent.type == RequestType::Delete) {
if (!fromPostponed) {
auto &postponed = state.postponed;
for (auto i = begin(postponed); i != end(postponed);) {
if (i->second.type == RequestType::ReadInbox) {
i = postponed.erase(i);
} else {
++i;
}
}
}
return Action::Postpone;
}
}
return Action::Send;
case RequestType::DialogsEntry:
for (const auto &[_, sent] : state.sent) {
if (sent.type == RequestType::DialogsEntry) {
return Action::Skip;
}
if (sent.type == RequestType::ReadInbox
|| sent.type == RequestType::Delete) {
if (!fromPostponed) {
auto &postponed = state.postponed;
for (const auto &[_, postponed] : state.postponed) {
if (postponed.type == RequestType::DialogsEntry) {
return Action::Skip;
}
}
}
return Action::Postpone;
}
}
return Action::Send;
case RequestType::History:
for (const auto &[_, sent] : state.sent) {
if (sent.type == RequestType::Delete) {
return Action::Postpone;
}
}
return Action::Send;
case RequestType::Delete:
for (const auto &[_, sent] : state.sent) {
if (sent.type == RequestType::History
|| sent.type == RequestType::ReadInbox) {
return Action::Postpone;
}
}
for (auto i = begin(state.sent); i != end(state.sent);) {
if (i->second.type == RequestType::DialogsEntry) {
session().api().request(i->second.id).cancel();
i = state.sent.erase(i);
state.thenRequestEntry = true;
}
}
return Action::Send;
}
Unexpected("Request type in Histories::chooseAction.");
}
Histories::State *Histories::lookup(not_null<History*> history) {
const auto i = _states.find(history);
return (i != end(_states)) ? &i->second : nullptr;
}
} // namespace Data

View File

@ -0,0 +1,93 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include "base/timer.h"
class History;
class HistoryItem;
namespace Main {
class Session;
} // namespace Main
namespace Data {
class Session;
class Histories final {
public:
explicit Histories(not_null<Session*> owner);
[[nodiscard]] Session &owner() const;
[[nodiscard]] Main::Session &session() const;
[[nodiscard]] History *find(PeerId peerId);
[[nodiscard]] not_null<History*> findOrCreate(PeerId peerId);
void unloadAll();
void clearAll();
void readInboxTill(
not_null<History*> history,
not_null<HistoryItem*> item);
void sendPendingReadInbox(not_null<History*> history);
private:
enum class RequestType : uchar {
None,
DialogsEntry,
History,
ReadInbox,
Delete,
};
enum class Action : uchar {
Send,
Postpone,
Skip,
};
struct PostponedRequest {
Fn<mtpRequestId(Fn<void()> done)> generator;
RequestType type = RequestType::None;
};
struct SentRequest {
mtpRequestId id = 0;
RequestType type = RequestType::None;
};
struct State {
base::flat_map<int, PostponedRequest> postponed;
base::flat_map<int, SentRequest> sent;
crl::time readWhen = 0;
MsgId readTill = 0;
int autoincrement = 0;
bool thenRequestEntry = false;
};
void sendReadRequests();
void sendReadRequest(not_null<History*> history, State &state);
[[nodiscard]] State *lookup(not_null<History*> history);
void checkEmptyState(not_null<History*> history);
int sendRequest(
not_null<History*> history,
RequestType type,
Fn<mtpRequestId(Fn<void()> done)> generator);
void checkPostponed(not_null<History*> history, int requestId);
[[nodiscard]] Action chooseAction(
State &state,
RequestType type,
bool fromPostponed = false) const;
const not_null<Session*> _owner;
std::unordered_map<PeerId, std::unique_ptr<History>> _map;
base::flat_map<not_null<History*>, State> _states;
base::Timer _readRequestsTimer;
};
} // namespace Data

View File

@ -48,6 +48,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "data/data_cloud_themes.h"
#include "data/data_streaming.h"
#include "data/data_media_rotation.h"
#include "data/data_histories.h"
#include "base/platform/base_platform_info.h"
#include "base/unixtime.h"
#include "base/call_delayed.h"
@ -194,7 +195,8 @@ Session::Session(not_null<Main::Session*> session)
, _scheduledMessages(std::make_unique<ScheduledMessages>(this))
, _cloudThemes(std::make_unique<CloudThemes>(session))
, _streaming(std::make_unique<Streaming>(this))
, _mediaRotation(std::make_unique<MediaRotation>()) {
, _mediaRotation(std::make_unique<MediaRotation>())
, _histories(std::make_unique<Histories>(this)) {
_cache->open(Local::cacheKey());
_bigFileCache->open(Local::cacheBigFileKey());
@ -215,9 +217,7 @@ Session::Session(not_null<Main::Session*> session)
void Session::clear() {
_sendActions.clear();
for (const auto &[peerId, history] : _histories) {
history->clear(History::ClearType::Unload);
}
_histories->unloadAll();
_scheduledMessages = nullptr;
_dependentMessages.clear();
base::take(_messages);
@ -227,7 +227,7 @@ void Session::clear() {
cSetRecentInlineBots(RecentInlineBots());
cSetRecentStickers(RecentStickerPack());
App::clearMousedItems();
_histories.clear();
_histories->clearAll();
}
not_null<PeerData*> Session::peer(PeerId id) {
@ -768,20 +768,11 @@ void Session::enumerateChannels(
}
not_null<History*> Session::history(PeerId peerId) {
Expects(peerId != 0);
if (const auto result = historyLoaded(peerId)) {
return result;
}
const auto [i, ok] = _histories.emplace(
peerId,
std::make_unique<History>(this, peerId));
return i->second.get();
return _histories->findOrCreate(peerId);
}
History *Session::historyLoaded(PeerId peerId) const {
const auto i = peerId ? _histories.find(peerId) : end(_histories);
return (i != end(_histories)) ? i->second.get() : nullptr;
return _histories->find(peerId);
}
not_null<History*> Session::history(not_null<const PeerData*> peer) {

View File

@ -61,6 +61,7 @@ class ScheduledMessages;
class CloudThemes;
class Streaming;
class MediaRotation;
class Histories;
class Session final {
public:
@ -96,6 +97,9 @@ public:
[[nodiscard]] MediaRotation &mediaRotation() const {
return *_mediaRotation;
}
[[nodiscard]] Histories &histories() const {
return *_histories;
}
[[nodiscard]] MsgId nextNonHistoryEntryId() {
return ++_nonHistoryEntryId;
}
@ -968,7 +972,6 @@ private:
base::Timer _unmuteByFinishedTimer;
std::unordered_map<PeerId, std::unique_ptr<PeerData>> _peers;
std::unordered_map<PeerId, std::unique_ptr<History>> _histories;
MessageIdsList _mimeForwardIds;
@ -989,6 +992,7 @@ private:
std::unique_ptr<CloudThemes> _cloudThemes;
std::unique_ptr<Streaming> _streaming;
std::unique_ptr<MediaRotation> _mediaRotation;
std::unique_ptr<Histories> _histories;
MsgId _nonHistoryEntryId = ServerMaxMsgId;
rpl::lifetime _lifetime;

View File

@ -23,6 +23,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "data/data_channel.h"
#include "data/data_chat.h"
#include "data/data_user.h"
#include "data/data_histories.h"
#include "lang/lang_keys.h"
#include "apiwrap.h"
#include "mainwidget.h"
@ -1591,6 +1592,16 @@ void History::calculateFirstUnreadMessage() {
}
}
bool History::readInboxTillNeedsRequest(MsgId tillId) {
Expects(IsServerMsgId(tillId));
readClientSideMessages();
if (unreadMark()) {
session().api().changeDialogUnreadMark(this, false);
}
return (_inboxReadBefore.value_or(1) <= tillId);
}
void History::readClientSideMessages() {
auto unread = unreadCount();
for (const auto item : _localMessages) {
@ -1616,80 +1627,28 @@ MsgId History::readInbox() {
}
void History::readInboxTill(not_null<HistoryItem*> item) {
if (!IsServerMsgId(item->id)) {
auto view = item->mainView();
if (!view) {
return;
}
auto block = view->block();
auto blockIndex = block->indexInHistory();
auto itemIndex = view->indexInBlock();
while (blockIndex > 0 || itemIndex > 0) {
if (itemIndex > 0) {
view = block->messages[--itemIndex].get();
} else {
while (blockIndex > 0) {
block = blocks[--blockIndex].get();
itemIndex = block->messages.size();
if (itemIndex > 0) {
view = block->messages[--itemIndex].get();
break;
}
}
}
item = view->data();
if (IsServerMsgId(item->id)) {
break;
}
}
if (!IsServerMsgId(item->id)) {
LOG(("App Error: "
"Can't read history till unknown local message."));
return;
}
}
readClientSideMessages();
if (unreadMark()) {
session().api().changeDialogUnreadMark(this, false);
}
if (_inboxReadTillLocal >= item->id) {
return;
}
_inboxReadTillLocal = item->id;
const auto stillUnread = countStillUnreadLocal();
if (!stillUnread) {
session().api().readServerHistoryForce(this, _inboxReadTillLocal);
return;
}
setInboxReadTill(_inboxReadTillLocal);
if (stillUnread && _unreadCount && *stillUnread == *_unreadCount) {
return;
}
setUnreadCount(*stillUnread);
session().api().readServerHistoryForce(this, _inboxReadTillLocal);
updateChatListEntry();
owner().histories().readInboxTill(this, item);
}
bool History::unreadCountRefreshNeeded() const {
bool History::unreadCountRefreshNeeded(MsgId readTillId) const {
return !unreadCountKnown()
|| ((_inboxReadTillLocal + 1) > _inboxReadBefore.value_or(0));
|| ((readTillId + 1) > _inboxReadBefore.value_or(0));
}
std::optional<int> History::countStillUnreadLocal() const {
std::optional<int> History::countStillUnreadLocal(MsgId readTillId) const {
if (isEmpty()) {
return std::nullopt;
}
const auto till = _inboxReadTillLocal;
if (_inboxReadBefore) {
const auto before = *_inboxReadBefore;
if (minMsgId() <= before && maxMsgId() >= till) {
if (minMsgId() <= before && maxMsgId() >= readTillId) {
auto result = 0;
for (const auto &block : blocks) {
for (const auto &message : block->messages) {
const auto item = message->data();
if (item->out() || !IsServerMsgId(item->id)) {
continue;
} else if (item->id > till) {
} else if (item->id > readTillId) {
break;
} else if (item->id >= before) {
++result;
@ -1701,14 +1660,16 @@ std::optional<int> History::countStillUnreadLocal() const {
}
}
}
if (!loadedAtBottom() || minMsgId() > till) {
if (!loadedAtBottom() || minMsgId() > readTillId) {
return std::nullopt;
}
auto result = 0;
for (const auto &block : blocks) {
for (const auto &message : block->messages) {
const auto item = message->data();
if (!item->out() && IsServerMsgId(item->id) && item->id > till) {
if (!item->out()
&& IsServerMsgId(item->id)
&& item->id > readTillId) {
++result;
}
}
@ -1727,7 +1688,7 @@ void History::applyInboxReadUpdate(
session().api().requestDialogEntry(this);
session().api().requestDialogEntry(folder);
}
if (_inboxReadTillLocal <= upTo) {
if (_inboxReadBefore.value_or(1) <= upTo) {
if (!peer->isChannel() || peer->asChannel()->pts() == channelPts) {
inboxRead(upTo, stillUnread);
} else {
@ -2760,7 +2721,8 @@ void History::applyDialogFields(
} else {
clearFolder();
}
if (!skipUnreadUpdate() && maxInboxRead >= _inboxReadTillLocal) {
if (!skipUnreadUpdate()
&& maxInboxRead >= _inboxReadBefore.value_or(1)) {
setUnreadCount(unreadCount);
setInboxReadTill(maxInboxRead);
}
@ -2794,7 +2756,6 @@ void History::setInboxReadTill(MsgId upTo) {
} else {
_inboxReadBefore = upTo + 1;
}
accumulate_max(_inboxReadTillLocal, upTo);
}
void History::setOutboxReadTill(MsgId upTo) {

View File

@ -160,6 +160,7 @@ public:
MsgId readInbox();
void readInboxTill(not_null<HistoryItem*> item);
[[nodiscard]] bool readInboxTillNeedsRequest(MsgId tillId);
void applyInboxReadUpdate(
FolderId folderId,
MsgId upTo,
@ -177,7 +178,7 @@ public:
[[nodiscard]] bool unreadCountKnown() const;
// Some old unread count is known, but we read history till some place.
[[nodiscard]] bool unreadCountRefreshNeeded() const;
[[nodiscard]] bool unreadCountRefreshNeeded(MsgId readTillId) const;
void setUnreadCount(int newUnreadCount);
void setUnreadMark(bool unread);
@ -349,6 +350,10 @@ public:
HistoryItem *folderDialogItem = nullptr);
void clearFolder();
// Interface for Data::Histories.
void setInboxReadTill(MsgId upTo);
std::optional<int> countStillUnreadLocal(MsgId readTillId) const;
// Still public data.
std::deque<std::unique_ptr<HistoryBlock>> blocks;
@ -437,7 +442,6 @@ private:
TimeId adjustedChatListTimeId() const override;
void changedChatListPinHook() override;
void setInboxReadTill(MsgId upTo);
void setOutboxReadTill(MsgId upTo);
void readClientSideMessages();
@ -474,7 +478,6 @@ private:
void getNextFirstUnreadMessage();
bool nonEmptyCountMoreThan(int count) const;
std::optional<int> countUnread(MsgId upTo) const;
std::optional<int> countStillUnreadLocal() const;
// Creates if necessary a new block for adding item.
// Depending on isBuildingFrontBlock() gets front or back block.
@ -503,7 +506,6 @@ private:
std::optional<MsgId> _inboxReadBefore;
std::optional<MsgId> _outboxReadBefore;
MsgId _inboxReadTillLocal = 0;
std::optional<int> _unreadCount;
std::optional<int> _unreadMentionsCount;
base::flat_set<MsgId> _unreadMentions;

View File

@ -41,6 +41,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "data/data_user.h"
#include "data/data_scheduled_messages.h"
#include "data/data_file_origin.h"
#include "data/data_histories.h"
#include "history/history.h"
#include "history/history_item.h"
#include "history/history_message.h"
@ -1706,6 +1707,7 @@ void HistoryWidget::showHistory(
return;
}
updateSendAction(_history, SendAction::Type::Typing, -1);
session().data().histories().sendPendingReadInbox(_history);
cancelTypingAction();
}