Use limited number of threads in MTP.

This commit is contained in:
John Preston 2019-11-27 18:09:56 +03:00
parent c742d7406c
commit 64535251e8
4 changed files with 74 additions and 32 deletions

View File

@ -139,7 +139,7 @@ Connection::Connection(
moveToThread(thread);
connect(thread, &QThread::started, this, [=] {
InvokeQueued(this, [=] {
_checkSentRequestsTimer.callEach(kCheckSentRequestsEach);
connectToServer();
});

View File

@ -170,6 +170,7 @@ private:
Session *findSession(ShiftedDcId shiftedDcId);
not_null<Session*> startSession(ShiftedDcId shiftedDcId);
Session *removeSession(ShiftedDcId shiftedDcId);
[[nodiscard]] not_null<QThread*> getThreadForDc(ShiftedDcId shiftedDcId);
void applyDomainIps(
const QString &host,
@ -200,6 +201,10 @@ private:
const not_null<DcOptions*> _dcOptions;
const Instance::Mode _mode = Instance::Mode::Normal;
std::unique_ptr<QThread> _mainSessionThread;
std::unique_ptr<QThread> _otherSessionsThread;
std::vector<std::unique_ptr<QThread>> _fileSessionThreads;
QString _deviceModel;
QString _systemVersion;
@ -263,6 +268,8 @@ Instance::Private::Private(
, _instance(instance)
, _dcOptions(options)
, _mode(mode) {
const auto idealThreadPoolSize = QThread::idealThreadCount();
_fileSessionThreads.resize(2 * std::max(idealThreadPoolSize / 2, 1));
}
void Instance::Private::start(Config &&config) {
@ -1486,11 +1493,11 @@ not_null<Session*> Instance::Private::startSession(ShiftedDcId shiftedDcId) {
Expects(BareDcId(shiftedDcId) != 0);
const auto dc = getDcById(shiftedDcId);
const auto thread = getThreadForDc(shiftedDcId);
const auto result = _sessions.emplace(
shiftedDcId,
std::make_unique<Session>(_instance, shiftedDcId, dc)
std::make_unique<Session>(_instance, thread, shiftedDcId, dc)
).first->second.get();
result->start();
if (isKeysDestroyer()) {
scheduleKeyDestroy(shiftedDcId);
}
@ -1509,6 +1516,47 @@ Session *Instance::Private::removeSession(ShiftedDcId shiftedDcId) {
return _sessionsToDestroy.back().get();
}
not_null<QThread*> Instance::Private::getThreadForDc(
ShiftedDcId shiftedDcId) {
static const auto EnsureStarted = [](std::unique_ptr<QThread> &thread) {
if (!thread) {
thread = std::make_unique<QThread>();
thread->start();
}
return thread.get();
};
static const auto FindOne = [](
std::vector<std::unique_ptr<QThread>> &threads,
int index,
bool shift) {
Expects(!threads.empty());
Expects(!(threads.size() % 2));
const auto count = int(threads.size());
index %= count;
if (index >= count / 2) {
index = (count - 1) - (index - count / 2);
}
if (shift) {
index = (index + count / 2) % count;
}
return EnsureStarted(threads[index]);
};
if (shiftedDcId == BareDcId(shiftedDcId)) {
return EnsureStarted(_mainSessionThread);
} else if (isDownloadDcId(shiftedDcId)) {
const auto index = GetDcIdShift(shiftedDcId) - kBaseDownloadDcShift;
const auto composed = index + BareDcId(shiftedDcId);
return FindOne(_fileSessionThreads, composed, false);
} else if (isUploadDcId(shiftedDcId)) {
const auto index = GetDcIdShift(shiftedDcId) - kBaseUploadDcShift;
const auto composed = index + BareDcId(shiftedDcId);
return FindOne(_fileSessionThreads, composed, true);
}
return EnsureStarted(_otherSessionsThread);
}
void Instance::Private::scheduleKeyDestroy(ShiftedDcId shiftedDcId) {
Expects(isKeysDestroyer());
@ -1616,6 +1664,23 @@ void Instance::Private::prepareToDestroy() {
session->kill();
}
_mainSession = nullptr;
auto threads = std::vector<std::unique_ptr<QThread>>();
threads.push_back(base::take(_mainSessionThread));
threads.push_back(base::take(_otherSessionsThread));
for (auto &thread : base::take(_fileSessionThreads)) {
threads.push_back(std::move(thread));
}
for (const auto &thread : threads) {
if (thread) {
thread->quit();
}
}
for (const auto &thread : threads) {
if (thread) {
thread->wait();
}
}
}
Instance::Instance(not_null<DcOptions*> options, Mode mode, Config &&config)

View File

@ -146,29 +146,28 @@ void SessionData::detach() {
Session::Session(
not_null<Instance*> instance,
not_null<QThread*> thread,
ShiftedDcId shiftedDcId,
not_null<Dcenter*> dc)
: _instance(instance)
, _shiftedDcId(shiftedDcId)
, _dc(dc)
, _data(std::make_shared<SessionData>(this))
, _thread(thread)
, _sender([=] { needToResumeAndSend(); }) {
_timeouter.callEach(1000);
refreshOptions();
watchDcKeyChanges();
watchDcOptionsChanges();
start();
}
Session::~Session() {
Expects(!_connection);
Expects(!_thread);
if (_myKeyCreation != CreatingKeyType::None) {
releaseKeyCreationOnFail();
}
for (const auto &thread : _destroyingThreads) {
thread->wait();
}
}
void Session::watchDcKeyChanges() {
@ -210,27 +209,11 @@ void Session::watchDcOptionsChanges() {
void Session::start() {
killConnection();
_thread = std::make_unique<QThread>();
const auto thread = _thread.get();
connect(thread, &QThread::finished, [=] {
InvokeQueued(this, [=] {
const auto i = ranges::find(
_destroyingThreads,
thread,
&std::unique_ptr<QThread>::get);
if (i != _destroyingThreads.end()) {
_destroyingThreads.erase(i);
}
});
});
_connection = new Connection(
_instance,
thread,
_thread.get(),
_data,
_shiftedDcId);
thread->start();
}
bool Session::rpcErrorOccured(
@ -578,18 +561,13 @@ void Session::tryToReceive() {
}
void Session::killConnection() {
Expects(!_thread || _connection);
if (!_connection) {
return;
}
base::take(_connection)->deleteLater();
_destroyingThreads.push_back(base::take(_thread));
_destroyingThreads.back()->quit();
Ensures(_connection == nullptr);
Ensures(_thread == nullptr);
}
} // namespace internal

View File

@ -138,6 +138,7 @@ public:
// Main thread.
Session(
not_null<Instance*> instance,
not_null<QThread*> thread,
ShiftedDcId shiftedDcId,
not_null<Dcenter*> dc);
~Session();
@ -198,9 +199,7 @@ private:
const ShiftedDcId _shiftedDcId = 0;
const not_null<Dcenter*> _dc;
const std::shared_ptr<SessionData> _data;
std::unique_ptr<QThread> _thread;
std::vector<std::unique_ptr<QThread>> _destroyingThreads;
const not_null<QThread*> _thread;
Connection *_connection = nullptr;