tdesktop/Telegram/SourceFiles/mtproto/mtproto_concurrent_sender.cpp

201 lines
5.0 KiB
C++
Raw Normal View History

2018-06-02 14:29:21 +00:00
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
2019-12-02 13:10:19 +00:00
#include "mtproto/mtproto_concurrent_sender.h"
2018-06-02 14:29:21 +00:00
#include "mtproto/mtp_instance.h"
2021-03-12 12:48:00 +00:00
#include "mtproto/mtproto_response.h"
2019-11-13 08:31:12 +00:00
#include "mtproto/facade.h"
2018-06-02 14:29:21 +00:00
namespace MTP {
class ConcurrentSender::HandlerMaker final {
2018-06-02 14:29:21 +00:00
public:
2021-03-12 12:48:00 +00:00
static DoneHandler MakeDone(
2018-06-02 14:29:21 +00:00
not_null<ConcurrentSender*> sender,
Fn<void(FnMut<void()>)> runner);
2021-03-12 12:48:00 +00:00
static FailHandler MakeFail(
2018-06-02 14:29:21 +00:00
not_null<ConcurrentSender*> sender,
Fn<void(FnMut<void()>)> runner,
2018-06-02 14:29:21 +00:00
FailSkipPolicy skipPolicy);
};
2021-03-12 12:48:00 +00:00
// Builds the success handler for a request sent by `sender`.
//
// The returned handler keeps only a weak reference to the sender. On each
// response it hands a task to `runner`; when that task runs and the sender
// still exists, the response is passed to senderRequestDone(). The handler
// itself always returns true.
DoneHandler ConcurrentSender::HandlerMaker::MakeDone(
		not_null<ConcurrentSender*> sender,
		Fn<void(FnMut<void()>)> runner) {
	return [
		weak = base::make_weak(sender),
		runner = std::move(runner)
	](const Response &response) mutable {
		// The task owns copies of the weak pointer and of the response,
		// so it may safely run after this handler has returned.
		auto deliver = [weak, response]() mutable {
			const auto strong = weak.get();
			if (!strong) {
				// The sender is gone, nobody is waiting for the result.
				return;
			}
			strong->senderRequestDone(
				response.requestId,
				bytes::make_span(response.reply));
		};
		runner(std::move(deliver));
		return true;
	};
}
2021-03-12 12:48:00 +00:00
// Builds the failure handler for a request sent by `sender`.
//
// Depending on `skipPolicy` the handler returns false (without notifying
// the sender) for some errors:
//  - FailSkipPolicy::Simple: every error IsDefaultHandledError() accepts;
//  - FailSkipPolicy::HandleFlood: the same, except errors for which
//    IsFloodError() is true;
//  - any other policy: nothing is skipped.
// For every other error a task is handed to `runner`; when it runs and the
// sender still exists, senderRequestFail() is called, and the handler
// returns true.
FailHandler ConcurrentSender::HandlerMaker::MakeFail(
		not_null<ConcurrentSender*> sender,
		Fn<void(FnMut<void()>)> runner,
		FailSkipPolicy skipPolicy) {
	return [
		weak = base::make_weak(sender),
		runner = std::move(runner),
		skipPolicy
	](const Error &error, const Response &response) mutable {
		auto skip = false;
		switch (skipPolicy) {
		case FailSkipPolicy::Simple:
			skip = IsDefaultHandledError(error);
			break;
		case FailSkipPolicy::HandleFlood:
			skip = IsDefaultHandledError(error) && !IsFloodError(error);
			break;
		default:
			break;
		}
		if (skip) {
			return false;
		}

		// The task owns copies of everything it needs, so it may safely
		// run after this handler has returned.
		auto report = [
			weak,
			error,
			requestId = response.requestId
		]() mutable {
			const auto strong = weak.get();
			if (!strong) {
				return;
			}
			strong->senderRequestFail(requestId, error);
		};
		runner(std::move(report));
		return true;
	};
}
// Passes a callback to crl::on_main() that invokes `method` with the
// Instance, provided the QPointer to it is still non-null when the
// callback runs. If the Instance is gone by then, `method` is dropped
// without being called.
template <typename Method>
auto ConcurrentSender::with_instance(Method &&method)
-> std::enable_if_t<is_callable_v<Method, not_null<Instance*>>> {
	crl::on_main([
		weak = _weak,
		method = std::forward<Method>(method)
	]() mutable {
		const auto instance = weak.data();
		if (!instance) {
			return;
		}
		std::move(method)(instance);
	});
}
// Remembers the sender the request will go through and takes ownership
// of the already serialized request.
ConcurrentSender::RequestBuilder::RequestBuilder(
	not_null<ConcurrentSender*> sender,
	details::SerializedRequest &&serialized) noexcept
: _sender(sender)
, _serialized(std::move(serialized)) {
}
// Stores the shifted DC id that send() forwards to
// Instance::sendSerialized().
void ConcurrentSender::RequestBuilder::setToDC(ShiftedDcId dcId) noexcept {
	_dcId = dcId;
}
// Stores the "can wait" time value that send() forwards to
// Instance::sendSerialized().
void ConcurrentSender::RequestBuilder::setCanWait(crl::time ms) noexcept {
	_canWait = ms;
}
// Stores the policy that send() hands to HandlerMaker::MakeFail() when it
// builds the failure handler of the request.
void ConcurrentSender::RequestBuilder::setFailSkipPolicy(
		FailSkipPolicy policy) noexcept {
	_failSkipPolicy = policy;
}
// Stores the id of another request; send() forwards it as the
// `afterRequestId` argument of Instance::sendSerialized().
// NOTE(review): presumably this orders the request after that one -
// confirm against Instance::sendSerialized().
void ConcurrentSender::RequestBuilder::setAfter(
		mtpRequestId requestId) noexcept {
	_afterRequestId = requestId;
}
// Sends the request and returns its freshly allocated id.
//
// The user handlers are registered in the sender right away under the new
// id. The actual Instance::sendSerialized() call is deferred through
// with_instance(), so everything it needs is copied or moved into the
// callback instead of being read from this builder later.
mtpRequestId ConcurrentSender::RequestBuilder::send() {
	const auto requestId = details::GetNextRequestId();
	const auto dcId = _dcId;
	const auto msCanWait = _canWait;
	const auto afterRequestId = _afterRequestId;

	_sender->senderRequestRegister(requestId, std::move(_handlers));

	auto request = std::move(_serialized);
	auto done = HandlerMaker::MakeDone(_sender, _sender->_runner);
	auto fail = HandlerMaker::MakeFail(
		_sender,
		_sender->_runner,
		_failSkipPolicy);
	_sender->with_instance([
		requestId,
		dcId,
		msCanWait,
		afterRequestId,
		request = std::move(request),
		done = std::move(done),
		fail = std::move(fail)
	](not_null<Instance*> instance) mutable {
		instance->sendSerialized(
			requestId,
			std::move(request),
			ResponseHandler{ std::move(done), std::move(fail) },
			dcId,
			msCanWait,
			afterRequestId);
	});

	return requestId;
}
// Creates a sender bound to the given Instance.
//
// `weak` is a guarded pointer to the Instance; it is checked each time a
// deferred call is about to reach the Instance. `runner` receives the
// tasks that deliver responses back to this sender and decides how to
// execute them.
//
// Both parameters are taken by value as sinks, so they are moved (not
// copied) into the members.
ConcurrentSender::ConcurrentSender(
	QPointer<Instance> weak,
	Fn<void(FnMut<void()>)> runner)
: _weak(std::move(weak))
, _runner(std::move(runner)) {
}
// Forgets every request that is still registered and asks the Instance
// to cancel them.
ConcurrentSender::~ConcurrentSender() {
	senderRequestCancelAll();
}
// Stores the handlers of a request under its id, so that a later
// senderRequestDone() / senderRequestFail() can find and invoke them.
void ConcurrentSender::senderRequestRegister(
		mtpRequestId requestId,
		Handlers &&handlers) {
	_requests.emplace(requestId, std::move(handlers));
}
// Delivers a successful response to the handlers registered for
// `requestId` and removes them from the registry.
//
// Nothing happens when no handlers are registered under that id. When
// the `done` handler returns false, the `fail` handler is invoked with a
// local RESPONSE_PARSE_FAILED error instead.
void ConcurrentSender::senderRequestDone(
		mtpRequestId requestId,
		bytes::const_span result) {
	auto handlers = _requests.take(requestId);
	if (!handlers) {
		return;
	}
	if (handlers->done(requestId, result)) {
		return;
	}
	handlers->fail(
		requestId,
		Error::Local(
			"RESPONSE_PARSE_FAILED",
			"ConcurrentSender::senderRequestDone"));
}
void ConcurrentSender::senderRequestFail(
mtpRequestId requestId,
2021-03-12 12:48:00 +00:00
const Error &error) {
2018-06-02 14:29:21 +00:00
if (auto handlers = _requests.take(requestId)) {
handlers->fail(requestId, error);
2018-06-02 14:29:21 +00:00
}
}
// Drops the handlers of the request first, so that no callback can be
// delivered for it any more, then asks the Instance to cancel it.
void ConcurrentSender::senderRequestCancel(mtpRequestId requestId) {
	senderRequestDetach(requestId);
	with_instance([requestId](not_null<Instance*> instance) {
		instance->cancel(requestId);
	});
}
// Drops the handlers of every registered request and asks the Instance
// to cancel all of them.
//
// The registry is emptied immediately (base::take), while the cancel
// calls are deferred through with_instance() with a list of the ids.
void ConcurrentSender::senderRequestCancelAll() {
	// Reserve capacity only. Constructing the vector with a size would
	// create that many zero ids in front of the real ones, and each of
	// them would then be passed to Instance::cancel().
	auto list = std::vector<mtpRequestId>();
	list.reserve(_requests.size());
	for (const auto &pair : base::take(_requests)) {
		list.push_back(pair.first);
	}
	with_instance([list = std::move(list)](not_null<Instance*> instance) {
		for (const auto requestId : list) {
			instance->cancel(requestId);
		}
	});
}
2018-06-21 00:54:59 +00:00
// Removes the handlers registered for `requestId` without invoking them
// and without cancelling the request itself.
void ConcurrentSender::senderRequestDetach(mtpRequestId requestId) {
	_requests.erase(requestId);
}
2018-06-02 14:29:21 +00:00
} // namespace MTP