diff --git a/Telegram/CMakeLists.txt b/Telegram/CMakeLists.txt index 2ed1ee430c..98c142c958 100644 --- a/Telegram/CMakeLists.txt +++ b/Telegram/CMakeLists.txt @@ -335,6 +335,8 @@ PRIVATE data/data_game.h data/data_groups.cpp data/data_groups.h + data/data_histories.cpp + data/data_histories.h data/data_location.cpp data/data_location.h data/data_media_rotation.cpp diff --git a/Telegram/SourceFiles/apiwrap.cpp b/Telegram/SourceFiles/apiwrap.cpp index dbf8b238b7..8604890703 100644 --- a/Telegram/SourceFiles/apiwrap.cpp +++ b/Telegram/SourceFiles/apiwrap.cpp @@ -6102,7 +6102,7 @@ void ApiWrap::sendReadRequest(not_null peer, MsgId upTo) { sendReadRequest(peer, *next); } else if (const auto history = _session->data().historyLoaded(peer)) { - if (history->unreadCountRefreshNeeded()) { + if (!history->unreadCountKnown()) { requestDialogEntry(history); } } diff --git a/Telegram/SourceFiles/apiwrap.h b/Telegram/SourceFiles/apiwrap.h index 276dc5ed2f..137169659d 100644 --- a/Telegram/SourceFiles/apiwrap.h +++ b/Telegram/SourceFiles/apiwrap.h @@ -394,6 +394,9 @@ public: //void readFeed( // #feed // not_null feed, // Data::MessagePosition position); + void applyAffectedMessages( + not_null peer, + const MTPmessages_AffectedMessages &result); void sendVoiceMessage( QByteArray result, @@ -628,9 +631,6 @@ private: not_null peer, const MTPmessages_AffectedHistory &result); void applyAffectedMessages(const MTPmessages_AffectedMessages &result); - void applyAffectedMessages( - not_null peer, - const MTPmessages_AffectedMessages &result); void deleteAllFromUserSend( not_null channel, diff --git a/Telegram/SourceFiles/data/data_histories.cpp b/Telegram/SourceFiles/data/data_histories.cpp new file mode 100644 index 0000000000..fa30a0743b --- /dev/null +++ b/Telegram/SourceFiles/data/data_histories.cpp @@ -0,0 +1,339 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "data/data_histories.h" + +#include "data/data_session.h" +#include "data/data_channel.h" +#include "main/main_session.h" +#include "history/history.h" +#include "history/history_item.h" +#include "history/view/history_view_element.h" +#include "apiwrap.h" + +namespace Data { +namespace { + +constexpr auto kReadRequestTimeout = 3 * crl::time(1000); +constexpr auto kReadRequestSent = std::numeric_limits::max(); + +} // namespace + +Histories::Histories(not_null owner) +: _owner(owner) +, _readRequestsTimer([=] { sendReadRequests(); }) { +} + +Session &Histories::owner() const { + return *_owner; +} + +Main::Session &Histories::session() const { + return _owner->session(); +} + +History *Histories::find(PeerId peerId) { + const auto i = peerId ? _map.find(peerId) : end(_map); + return (i != end(_map)) ? i->second.get() : nullptr; +} + +not_null Histories::findOrCreate(PeerId peerId) { + Expects(peerId != 0); + + if (const auto result = find(peerId)) { + return result; + } + const auto [i, ok] = _map.emplace( + peerId, + std::make_unique(&owner(), peerId)); + return i->second.get(); +} + +void Histories::unloadAll() { + for (const auto &[peerId, history] : _map) { + history->clear(History::ClearType::Unload); + } +} + +void Histories::clearAll() { + _map.clear(); +} + +void Histories::readInboxTill( + not_null history, + not_null item) { + if (!IsServerMsgId(item->id)) { + auto view = item->mainView(); + if (!view) { + return; + } + auto block = view->block(); + auto blockIndex = block->indexInHistory(); + auto itemIndex = view->indexInBlock(); + while (blockIndex > 0 || itemIndex > 0) { + if (itemIndex > 0) { + view = block->messages[--itemIndex].get(); + } else { + while (blockIndex > 0) { + block = history->blocks[--blockIndex].get(); + itemIndex = block->messages.size(); + if (itemIndex > 0) { + view = block->messages[--itemIndex].get(); + break; + } + } + } + item = view->data(); + if (IsServerMsgId(item->id)) { + break; + } + } + if (!IsServerMsgId(item->id)) { + LOG(("App Error: " + "Can't read history till unknown local message.")); + return; + } + } + const auto tillId = item->id; + if (!history->readInboxTillNeedsRequest(tillId)) { + return; + } + const auto maybeState = lookup(history); + if (maybeState && maybeState->readTill >= tillId) { + return; + } + const auto stillUnread = history->countStillUnreadLocal(tillId); + if (stillUnread + && history->unreadCountKnown() + && *stillUnread == history->unreadCount()) { + history->setInboxReadTill(tillId); + return; + } + auto &state = _states[history]; + const auto wasWaiting = (state.readTill != 0); + state.readTill = tillId; + if (!stillUnread) { + state.readWhen = 0; + sendReadRequests(); + return; + } else if (!wasWaiting) { + state.readWhen = crl::now() + kReadRequestTimeout; + if (!_readRequestsTimer.isActive()) { + _readRequestsTimer.callOnce(kReadRequestTimeout); + } + } + history->setInboxReadTill(tillId); + history->setUnreadCount(*stillUnread); + history->updateChatListEntry(); +} + +void Histories::sendPendingReadInbox(not_null history) { + if (const auto state = lookup(history)) { + if (state->readTill && state->readWhen) { + state->readWhen = 0; + sendReadRequests(); + } + } +} + +void Histories::sendReadRequests() { + if (_states.empty()) { + return; + } + const auto now = crl::now(); + auto next = std::optional(); + for (auto &[history, state] : _states) { + if (state.readTill && state.readWhen <= now) { + sendReadRequest(history, state); + } else if (!next || *next > state.readWhen) { + next = state.readWhen; + } + } + if (next.has_value()) { + _readRequestsTimer.callOnce(*next - now); + } +} + +void Histories::sendReadRequest(not_null history, State &state) { + const auto tillId = state.readTill; + state.readWhen = kReadRequestSent; + sendRequest(history, RequestType::ReadInbox, [=](Fn done) { + const auto finished = [=] { + const auto state = lookup(history); + Assert(state != nullptr); + if (history->unreadCountRefreshNeeded(tillId)) { + session().api().requestDialogEntry(history); + } + if (state->readWhen == kReadRequestSent) { + state->readWhen = 0; + state->readTill = 0; + } + done(); + }; + if (const auto channel = history->peer->asChannel()) { + return session().api().request(MTPchannels_ReadHistory( + channel->inputChannel, + MTP_int(tillId) + )).done([=](const MTPBool &result) { + finished(); + }).fail([=](const RPCError &error) { + finished(); + }).send(); + } else { + return session().api().request(MTPmessages_ReadHistory( + history->peer->input, + MTP_int(tillId) + )).done([=](const MTPmessages_AffectedMessages &result) { + session().api().applyAffectedMessages(history->peer, result); + finished(); + }).fail([=](const RPCError &error) { + finished(); + }).send(); + } + }); +} + +void Histories::checkEmptyState(not_null history) { + const auto empty = [](const State &state) { + return state.postponed.empty() + && state.sent.empty() + && (state.readTill == 0); + }; + const auto i = _states.find(history); + if (i != end(_states) && empty(i->second)) { + _states.erase(i); + } +} + +int Histories::sendRequest( + not_null history, + RequestType type, + Fn done)> generator) { + Expects(type != RequestType::None); + + auto &state = _states[history]; + const auto id = ++state.autoincrement; + const auto action = chooseAction(state, type); + if (action == Action::Send) { + state.sent.emplace(id, SentRequest{ + generator([=] { checkPostponed(history, id); }), + type + }); + if (base::take(state.thenRequestEntry)) { + session().api().requestDialogEntry(history); + } + } else if (action == Action::Postpone) { + state.postponed.emplace( + id, + PostponedRequest{ std::move(generator), type }); + } + return id; +} + +void Histories::checkPostponed(not_null history, int requestId) { + const auto state = lookup(history); + Assert(state != nullptr); + + state->sent.remove(requestId); + if (!state->postponed.empty()) { + auto &entry = state->postponed.front(); + const auto action = chooseAction(*state, entry.second.type, true); + if (action == Action::Send) { + const auto id = entry.first; + state->postponed.remove(id); + state->sent.emplace(id, SentRequest{ + entry.second.generator([=] { checkPostponed(history, id); }), + entry.second.type + }); + if (base::take(state->thenRequestEntry)) { + session().api().requestDialogEntry(history); + } + } else { + Assert(action == Action::Postpone); + } + } + checkEmptyState(history); +} + +Histories::Action Histories::chooseAction( + State &state, + RequestType type, + bool fromPostponed) const { + switch (type) { + case RequestType::ReadInbox: + for (const auto &[_, sent] : state.sent) { + if (sent.type == RequestType::ReadInbox + || sent.type == RequestType::DialogsEntry + || sent.type == RequestType::Delete) { + if (!fromPostponed) { + auto &postponed = state.postponed; + for (auto i = begin(postponed); i != end(postponed);) { + if (i->second.type == RequestType::ReadInbox) { + i = postponed.erase(i); + } else { + ++i; + } + } + } + return Action::Postpone; + } + } + return Action::Send; + + case RequestType::DialogsEntry: + for (const auto &[_, sent] : state.sent) { + if (sent.type == RequestType::DialogsEntry) { + return Action::Skip; + } + if (sent.type == RequestType::ReadInbox + || sent.type == RequestType::Delete) { + if (!fromPostponed) { + auto &postponed = state.postponed; + for (const auto &[_, postponed] : state.postponed) { + if (postponed.type == RequestType::DialogsEntry) { + return Action::Skip; + } + } + } + return Action::Postpone; + } + } + return Action::Send; + + case RequestType::History: + for (const auto &[_, sent] : state.sent) { + if (sent.type == RequestType::Delete) { + return Action::Postpone; + } + } + return Action::Send; + + case RequestType::Delete: + for (const auto &[_, sent] : state.sent) { + if (sent.type == RequestType::History + || sent.type == RequestType::ReadInbox) { + return Action::Postpone; + } + } + for (auto i = begin(state.sent); i != end(state.sent);) { + if (i->second.type == RequestType::DialogsEntry) { + session().api().request(i->second.id).cancel(); + i = state.sent.erase(i); + state.thenRequestEntry = true; + } + } + return Action::Send; + } + Unexpected("Request type in Histories::chooseAction."); +} + +Histories::State *Histories::lookup(not_null history) { + const auto i = _states.find(history); + return (i != end(_states)) ? &i->second : nullptr; +} + +} // namespace Data diff --git a/Telegram/SourceFiles/data/data_histories.h b/Telegram/SourceFiles/data/data_histories.h new file mode 100644 index 0000000000..de62ef09d5 --- /dev/null +++ b/Telegram/SourceFiles/data/data_histories.h @@ -0,0 +1,93 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "base/timer.h" + +class History; +class HistoryItem; + +namespace Main { +class Session; +} // namespace Main + +namespace Data { + +class Session; + +class Histories final { +public: + explicit Histories(not_null owner); + + [[nodiscard]] Session &owner() const; + [[nodiscard]] Main::Session &session() const; + + [[nodiscard]] History *find(PeerId peerId); + [[nodiscard]] not_null findOrCreate(PeerId peerId); + + void unloadAll(); + void clearAll(); + + void readInboxTill( + not_null history, + not_null item); + void sendPendingReadInbox(not_null history); + +private: + enum class RequestType : uchar { + None, + DialogsEntry, + History, + ReadInbox, + Delete, + }; + enum class Action : uchar { + Send, + Postpone, + Skip, + }; + struct PostponedRequest { + Fn done)> generator; + RequestType type = RequestType::None; + }; + struct SentRequest { + mtpRequestId id = 0; + RequestType type = RequestType::None; + }; + struct State { + base::flat_map postponed; + base::flat_map sent; + crl::time readWhen = 0; + MsgId readTill = 0; + int autoincrement = 0; + bool thenRequestEntry = false; + }; + + void sendReadRequests(); + void sendReadRequest(not_null history, State &state); + [[nodiscard]] State *lookup(not_null history); + void checkEmptyState(not_null history); + int sendRequest( + not_null history, + RequestType type, + Fn done)> generator); + void checkPostponed(not_null history, int requestId); + [[nodiscard]] Action chooseAction( + State &state, + RequestType type, + bool fromPostponed = false) const; + + const not_null _owner; + + std::unordered_map> _map; + base::flat_map, State> _states; + base::Timer _readRequestsTimer; + +}; + +} // namespace Data diff --git a/Telegram/SourceFiles/data/data_session.cpp b/Telegram/SourceFiles/data/data_session.cpp index 6638aa8702..36f0540759 100644 --- a/Telegram/SourceFiles/data/data_session.cpp +++ b/Telegram/SourceFiles/data/data_session.cpp @@ -48,6 +48,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "data/data_cloud_themes.h" #include "data/data_streaming.h" #include "data/data_media_rotation.h" +#include "data/data_histories.h" #include "base/platform/base_platform_info.h" #include "base/unixtime.h" #include "base/call_delayed.h" @@ -194,7 +195,8 @@ Session::Session(not_null session) , _scheduledMessages(std::make_unique(this)) , _cloudThemes(std::make_unique(session)) , _streaming(std::make_unique(this)) -, _mediaRotation(std::make_unique()) { +, _mediaRotation(std::make_unique()) +, _histories(std::make_unique(this)) { _cache->open(Local::cacheKey()); _bigFileCache->open(Local::cacheBigFileKey()); @@ -215,9 +217,7 @@ Session::Session(not_null session) void Session::clear() { _sendActions.clear(); - for (const auto &[peerId, history] : _histories) { - history->clear(History::ClearType::Unload); - } + _histories->unloadAll(); _scheduledMessages = nullptr; _dependentMessages.clear(); base::take(_messages); @@ -227,7 +227,7 @@ void Session::clear() { cSetRecentInlineBots(RecentInlineBots()); cSetRecentStickers(RecentStickerPack()); App::clearMousedItems(); - _histories.clear(); + _histories->clearAll(); } not_null Session::peer(PeerId id) { @@ -768,20 +768,11 @@ void Session::enumerateChannels( } not_null Session::history(PeerId peerId) { - Expects(peerId != 0); - - if (const auto result = historyLoaded(peerId)) { - return result; - } - const auto [i, ok] = _histories.emplace( - peerId, - std::make_unique(this, peerId)); - return i->second.get(); + return _histories->findOrCreate(peerId); } History *Session::historyLoaded(PeerId peerId) const { - const auto i = peerId ? _histories.find(peerId) : end(_histories); - return (i != end(_histories)) ? i->second.get() : nullptr; + return _histories->find(peerId); } not_null Session::history(not_null peer) { diff --git a/Telegram/SourceFiles/data/data_session.h b/Telegram/SourceFiles/data/data_session.h index 9b24d5294d..aa2774292b 100644 --- a/Telegram/SourceFiles/data/data_session.h +++ b/Telegram/SourceFiles/data/data_session.h @@ -61,6 +61,7 @@ class ScheduledMessages; class CloudThemes; class Streaming; class MediaRotation; +class Histories; class Session final { public: @@ -96,6 +97,9 @@ public: [[nodiscard]] MediaRotation &mediaRotation() const { return *_mediaRotation; } + [[nodiscard]] Histories &histories() const { + return *_histories; + } [[nodiscard]] MsgId nextNonHistoryEntryId() { return ++_nonHistoryEntryId; } @@ -968,7 +972,6 @@ private: base::Timer _unmuteByFinishedTimer; std::unordered_map> _peers; - std::unordered_map> _histories; MessageIdsList _mimeForwardIds; @@ -989,6 +992,7 @@ private: std::unique_ptr _cloudThemes; std::unique_ptr _streaming; std::unique_ptr _mediaRotation; + std::unique_ptr _histories; MsgId _nonHistoryEntryId = ServerMaxMsgId; rpl::lifetime _lifetime; diff --git a/Telegram/SourceFiles/history/history.cpp b/Telegram/SourceFiles/history/history.cpp index fe2a8d8ed7..325bae8f5f 100644 --- a/Telegram/SourceFiles/history/history.cpp +++ b/Telegram/SourceFiles/history/history.cpp @@ -23,6 +23,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "data/data_channel.h" #include "data/data_chat.h" #include "data/data_user.h" +#include "data/data_histories.h" #include "lang/lang_keys.h" #include "apiwrap.h" #include "mainwidget.h" @@ -1591,6 +1592,16 @@ void History::calculateFirstUnreadMessage() { } } +bool History::readInboxTillNeedsRequest(MsgId tillId) { + Expects(IsServerMsgId(tillId)); + + readClientSideMessages(); + if (unreadMark()) { + session().api().changeDialogUnreadMark(this, false); + } + return (_inboxReadBefore.value_or(1) <= tillId); +} + void History::readClientSideMessages() { auto unread = unreadCount(); for (const auto item : _localMessages) { @@ -1616,80 +1627,28 @@ MsgId History::readInbox() { } void History::readInboxTill(not_null item) { - if (!IsServerMsgId(item->id)) { - auto view = item->mainView(); - if (!view) { - return; - } - auto block = view->block(); - auto blockIndex = block->indexInHistory(); - auto itemIndex = view->indexInBlock(); - while (blockIndex > 0 || itemIndex > 0) { - if (itemIndex > 0) { - view = block->messages[--itemIndex].get(); - } else { - while (blockIndex > 0) { - block = blocks[--blockIndex].get(); - itemIndex = block->messages.size(); - if (itemIndex > 0) { - view = block->messages[--itemIndex].get(); - break; - } - } - } - item = view->data(); - if (IsServerMsgId(item->id)) { - break; - } - } - if (!IsServerMsgId(item->id)) { - LOG(("App Error: " - "Can't read history till unknown local message.")); - return; - } - } - readClientSideMessages(); - if (unreadMark()) { - session().api().changeDialogUnreadMark(this, false); - } - if (_inboxReadTillLocal >= item->id) { - return; - } - _inboxReadTillLocal = item->id; - const auto stillUnread = countStillUnreadLocal(); - if (!stillUnread) { - session().api().readServerHistoryForce(this, _inboxReadTillLocal); - return; - } - setInboxReadTill(_inboxReadTillLocal); - if (stillUnread && _unreadCount && *stillUnread == *_unreadCount) { - return; - } - setUnreadCount(*stillUnread); - session().api().readServerHistoryForce(this, _inboxReadTillLocal); - updateChatListEntry(); + owner().histories().readInboxTill(this, item); } -bool History::unreadCountRefreshNeeded() const { +bool History::unreadCountRefreshNeeded(MsgId readTillId) const { return !unreadCountKnown() - || ((_inboxReadTillLocal + 1) > _inboxReadBefore.value_or(0)); + || ((readTillId + 1) > _inboxReadBefore.value_or(0)); } -std::optional History::countStillUnreadLocal() const { +std::optional History::countStillUnreadLocal(MsgId readTillId) const { if (isEmpty()) { return std::nullopt; } - const auto till = _inboxReadTillLocal; if (_inboxReadBefore) { const auto before = *_inboxReadBefore; - if (minMsgId() <= before && maxMsgId() >= till) { + if (minMsgId() <= before && maxMsgId() >= readTillId) { auto result = 0; for (const auto &block : blocks) { for (const auto &message : block->messages) { const auto item = message->data(); if (item->out() || !IsServerMsgId(item->id)) { continue; - } else if (item->id > till) { + } else if (item->id > readTillId) { break; } else if (item->id >= before) { ++result; @@ -1701,14 +1660,16 @@ std::optional History::countStillUnreadLocal() const { } } } - if (!loadedAtBottom() || minMsgId() > till) { + if (!loadedAtBottom() || minMsgId() > readTillId) { return std::nullopt; } auto result = 0; for (const auto &block : blocks) { for (const auto &message : block->messages) { const auto item = message->data(); - if (!item->out() && IsServerMsgId(item->id) && item->id > till) { + if (!item->out() + && IsServerMsgId(item->id) + && item->id > readTillId) { ++result; } } @@ -1727,7 +1688,7 @@ void History::applyInboxReadUpdate( session().api().requestDialogEntry(this); session().api().requestDialogEntry(folder); } - if (_inboxReadTillLocal <= upTo) { + if (_inboxReadBefore.value_or(1) <= upTo) { if (!peer->isChannel() || peer->asChannel()->pts() == channelPts) { inboxRead(upTo, stillUnread); } else { @@ -2760,7 +2721,8 @@ void History::applyDialogFields( } else { clearFolder(); } - if (!skipUnreadUpdate() && maxInboxRead >= _inboxReadTillLocal) { + if (!skipUnreadUpdate() + && maxInboxRead >= _inboxReadBefore.value_or(1)) { setUnreadCount(unreadCount); setInboxReadTill(maxInboxRead); } @@ -2794,7 +2756,6 @@ void History::setInboxReadTill(MsgId upTo) { } else { _inboxReadBefore = upTo + 1; } - accumulate_max(_inboxReadTillLocal, upTo); } void History::setOutboxReadTill(MsgId upTo) { diff --git a/Telegram/SourceFiles/history/history.h b/Telegram/SourceFiles/history/history.h index 83d8a9f157..9b57e4d62c 100644 --- a/Telegram/SourceFiles/history/history.h +++ b/Telegram/SourceFiles/history/history.h @@ -160,6 +160,7 @@ public: MsgId readInbox(); void readInboxTill(not_null item); + [[nodiscard]] bool readInboxTillNeedsRequest(MsgId tillId); void applyInboxReadUpdate( FolderId folderId, MsgId upTo, @@ -177,7 +178,7 @@ public: [[nodiscard]] bool unreadCountKnown() const; // Some old unread count is known, but we read history till some place. - [[nodiscard]] bool unreadCountRefreshNeeded() const; + [[nodiscard]] bool unreadCountRefreshNeeded(MsgId readTillId) const; void setUnreadCount(int newUnreadCount); void setUnreadMark(bool unread); @@ -349,6 +350,10 @@ public: HistoryItem *folderDialogItem = nullptr); void clearFolder(); + // Interface for Data::Histories. + void setInboxReadTill(MsgId upTo); + std::optional countStillUnreadLocal(MsgId readTillId) const; + // Still public data. std::deque> blocks; @@ -437,7 +442,6 @@ private: TimeId adjustedChatListTimeId() const override; void changedChatListPinHook() override; - void setInboxReadTill(MsgId upTo); void setOutboxReadTill(MsgId upTo); void readClientSideMessages(); @@ -474,7 +478,6 @@ private: void getNextFirstUnreadMessage(); bool nonEmptyCountMoreThan(int count) const; std::optional countUnread(MsgId upTo) const; - std::optional countStillUnreadLocal() const; // Creates if necessary a new block for adding item. // Depending on isBuildingFrontBlock() gets front or back block. @@ -503,7 +506,6 @@ private: std::optional _inboxReadBefore; std::optional _outboxReadBefore; - MsgId _inboxReadTillLocal = 0; std::optional _unreadCount; std::optional _unreadMentionsCount; base::flat_set _unreadMentions; diff --git a/Telegram/SourceFiles/history/history_widget.cpp b/Telegram/SourceFiles/history/history_widget.cpp index e7892de327..ea4701bd36 100644 --- a/Telegram/SourceFiles/history/history_widget.cpp +++ b/Telegram/SourceFiles/history/history_widget.cpp @@ -41,6 +41,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "data/data_user.h" #include "data/data_scheduled_messages.h" #include "data/data_file_origin.h" +#include "data/data_histories.h" #include "history/history.h" #include "history/history_item.h" #include "history/history_message.h" @@ -1706,6 +1707,7 @@ void HistoryWidget::showHistory( return; } updateSendAction(_history, SendAction::Type::Typing, -1); + session().data().histories().sendPendingReadInbox(_history); cancelTypingAction(); }