412 lines
11 KiB
C++
412 lines
11 KiB
C++
/*
|
|
This file is part of Telegram Desktop,
|
|
the official desktop version of Telegram messaging app, see https://telegram.org
|
|
|
|
Telegram Desktop is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
It is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
In addition, as a special exception, the copyright holders give permission
|
|
to link the code of portions of this program with the OpenSSL library.
|
|
|
|
Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE
|
|
Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org
|
|
*/
|
|
#include "core/task_queue.h"
|
|
|
|
namespace base {
|
|
namespace {
|
|
|
|
auto MainThreadId = QThread::currentThreadId();
|
|
const auto MaxThreadsCount = qMax(QThread::idealThreadCount(), 2);
|
|
|
|
template <typename Lambda>
|
|
class Thread : public QThread {
|
|
public:
|
|
Thread(Lambda code) : _code(std::move(code)) {
|
|
}
|
|
void run() override {
|
|
_code();
|
|
}
|
|
|
|
private:
|
|
Lambda _code;
|
|
|
|
};
|
|
|
|
template <typename Lambda>
|
|
object_ptr<Thread<Lambda>> MakeThread(Lambda code) {
|
|
return object_ptr<Thread<Lambda>>(std::move(code));
|
|
}
|
|
|
|
} // namespace
|
|
|
|
class TaskQueue::TaskQueueList {
|
|
public:
|
|
TaskQueueList();
|
|
|
|
void Register(TaskQueue *queue);
|
|
void Unregister(TaskQueue *queue);
|
|
bool IsInList(TaskQueue *queue) const;
|
|
void Clear();
|
|
bool Empty(int list_index_) const;
|
|
TaskQueue *TakeFirst(int list_index_);
|
|
|
|
private:
|
|
void Insert(TaskQueue *queue, int list_index_);
|
|
void Remove(TaskQueue *queue, int list_index_);
|
|
|
|
TaskQueue *Tail() { return &tail_; }
|
|
const TaskQueue *Tail() const { return &tail_; }
|
|
|
|
TaskQueue tail_ = { Type::Special, Priority::Normal };
|
|
TaskQueue *(lists_[kQueuesListsCount]);
|
|
|
|
};
|
|
|
|
class TaskQueue::TaskThreadPool {
|
|
struct Private {
|
|
};
|
|
|
|
public:
|
|
TaskThreadPool(const Private &) { }
|
|
static const QSharedPointer<TaskThreadPool> &Instance();
|
|
|
|
void AddQueueTask(TaskQueue *queue, Task &&task);
|
|
void RemoveQueue(TaskQueue *queue);
|
|
|
|
~TaskThreadPool();
|
|
|
|
private:
|
|
|
|
void ThreadFunction();
|
|
|
|
std::vector<object_ptr<QThread>> threads_;
|
|
QMutex queues_mutex_;
|
|
|
|
// queues_mutex_ must be locked when working with the list.
|
|
TaskQueueList queue_list_;
|
|
|
|
QWaitCondition thread_condition_;
|
|
bool stopped_ = false;
|
|
int tasks_in_process_ = 0;
|
|
int background_tasks_in_process_ = 0;
|
|
|
|
};
|
|
|
|
TaskQueue::TaskQueueList::TaskQueueList() {
|
|
for (auto &list : lists_) {
|
|
list = &tail_;
|
|
}
|
|
}
|
|
|
|
void TaskQueue::TaskQueueList::Register(TaskQueue *queue) {
|
|
t_assert(!queue->SerialTaskInProcess());
|
|
|
|
Insert(queue, kAllQueuesList);
|
|
if (queue->priority_ == Priority::Normal) {
|
|
Insert(queue, kOnlyNormalQueuesList);
|
|
}
|
|
}
|
|
|
|
void TaskQueue::TaskQueueList::Unregister(TaskQueue *queue) {
|
|
Remove(queue, kAllQueuesList);
|
|
if (queue->priority_ == Priority::Normal) {
|
|
Remove(queue, kOnlyNormalQueuesList);
|
|
}
|
|
}
|
|
|
|
void TaskQueue::TaskQueueList::Insert(TaskQueue *queue, int list_index_) {
|
|
t_assert(list_index_ < kQueuesListsCount);
|
|
|
|
auto tail = Tail();
|
|
if (lists_[list_index_] == tail) {
|
|
lists_[list_index_] = queue;
|
|
}
|
|
|
|
auto &list_entry = queue->list_entries_[list_index_];
|
|
t_assert(list_entry.after == nullptr);
|
|
if ((list_entry.before = tail->list_entries_[list_index_].before)) {
|
|
list_entry.before->list_entries_[list_index_].after = queue;
|
|
}
|
|
list_entry.after = tail;
|
|
tail->list_entries_[list_index_].before = queue;
|
|
}
|
|
|
|
void TaskQueue::TaskQueueList::Remove(TaskQueue *queue, int list_index_) {
|
|
t_assert(list_index_ < kQueuesListsCount);
|
|
|
|
auto &list_entry = queue->list_entries_[list_index_];
|
|
t_assert(list_entry.after != nullptr);
|
|
if (lists_[list_index_] == queue) {
|
|
lists_[list_index_] = list_entry.after;
|
|
} else {
|
|
t_assert(list_entry.before != nullptr);
|
|
list_entry.before->list_entries_[list_index_].after = list_entry.after;
|
|
}
|
|
list_entry.after->list_entries_[list_index_].before = list_entry.before;
|
|
list_entry.before = list_entry.after = nullptr;
|
|
}
|
|
|
|
bool TaskQueue::TaskQueueList::IsInList(TaskQueue *queue) const {
|
|
if (queue->list_entries_[kAllQueuesList].after) {
|
|
return true;
|
|
}
|
|
t_assert(queue->list_entries_[kOnlyNormalQueuesList].after == nullptr);
|
|
return false;
|
|
}
|
|
|
|
void TaskQueue::TaskQueueList::Clear() {
|
|
auto tail = Tail();
|
|
for (int i = 0; i < kQueuesListsCount; ++i) {
|
|
for (auto j = lists_[i], next = j; j != tail; j = next) {
|
|
auto &list_entry = j->list_entries_[i];
|
|
next = list_entry.after;
|
|
list_entry.before = list_entry.after = nullptr;
|
|
}
|
|
lists_[i] = tail;
|
|
}
|
|
}
|
|
|
|
bool TaskQueue::TaskQueueList::Empty(int list_index_) const {
|
|
t_assert(list_index_ < kQueuesListsCount);
|
|
|
|
auto list = lists_[list_index_];
|
|
t_assert(list != nullptr);
|
|
return (list->list_entries_[list_index_].after == nullptr);
|
|
}
|
|
|
|
TaskQueue *TaskQueue::TaskQueueList::TakeFirst(int list_index_) {
|
|
t_assert(!Empty(list_index_));
|
|
|
|
auto queue = lists_[list_index_];
|
|
Unregister(queue);
|
|
// log_msgs.push_back("Unregistered from list in TakeFirst");
|
|
return queue;
|
|
}
|
|
|
|
void TaskQueue::TaskThreadPool::AddQueueTask(TaskQueue *queue, Task &&task) {
|
|
QMutexLocker lock(&queues_mutex_);
|
|
|
|
queue->tasks_.push_back(std::move(task));
|
|
auto list_was_empty = queue_list_.Empty(kAllQueuesList);
|
|
auto threads_count = threads_.size();
|
|
auto all_threads_processing = (threads_count == tasks_in_process_);
|
|
auto some_threads_are_vacant = !all_threads_processing && list_was_empty;
|
|
auto will_create_thread = !some_threads_are_vacant && (threads_count < MaxThreadsCount);
|
|
|
|
if (!queue->SerialTaskInProcess()) {
|
|
if (!queue_list_.IsInList(queue)) {
|
|
queue_list_.Register(queue);
|
|
}
|
|
}
|
|
if (will_create_thread) {
|
|
threads_.push_back(MakeThread([this]() {
|
|
ThreadFunction();
|
|
}));
|
|
threads_.back()->start();
|
|
} else if (some_threads_are_vacant) {
|
|
t_assert(threads_count > tasks_in_process_);
|
|
thread_condition_.wakeOne();
|
|
}
|
|
}
|
|
|
|
void TaskQueue::TaskThreadPool::RemoveQueue(TaskQueue *queue) {
|
|
QMutexLocker lock(&queues_mutex_);
|
|
if (queue_list_.IsInList(queue)) {
|
|
queue_list_.Unregister(queue);
|
|
}
|
|
if (queue->destroyed_flag_) {
|
|
*queue->destroyed_flag_ = true;
|
|
}
|
|
}
|
|
|
|
TaskQueue::TaskThreadPool::~TaskThreadPool() {
|
|
{
|
|
QMutexLocker lock(&queues_mutex_);
|
|
queue_list_.Clear();
|
|
stopped_ = true;
|
|
}
|
|
thread_condition_.wakeAll();
|
|
for (auto &thread : threads_) {
|
|
thread->wait();
|
|
}
|
|
}
|
|
|
|
const QSharedPointer<TaskQueue::TaskThreadPool> &TaskQueue::TaskThreadPool::Instance() { // static
|
|
static auto Pool = MakeShared<TaskThreadPool>(Private());
|
|
return Pool;
|
|
}
|
|
|
|
void TaskQueue::TaskThreadPool::ThreadFunction() {
|
|
// Flag marking that the previous processed task was
|
|
// with a Background priority. We count all the background
|
|
// tasks being processed.
|
|
bool background_task = false;
|
|
|
|
// Saved serial queue pointer. When we process a serial
|
|
// queue task we don't return the queue to the list until
|
|
// the task is processed and we return it on the next cycle.
|
|
TaskQueue *serial_queue = nullptr;
|
|
bool serial_queue_destroyed = false;
|
|
bool task_was_processed = false;
|
|
while (true) {
|
|
Task task;
|
|
{
|
|
QMutexLocker lock(&queues_mutex_);
|
|
|
|
// Finish the previous task processing.
|
|
if (task_was_processed) {
|
|
--tasks_in_process_;
|
|
}
|
|
if (background_task) {
|
|
--background_tasks_in_process_;
|
|
background_task = false;
|
|
}
|
|
if (serial_queue) {
|
|
if (!serial_queue_destroyed) {
|
|
serial_queue->destroyed_flag_ = nullptr;
|
|
if (!serial_queue->tasks_.empty()) {
|
|
queue_list_.Register(serial_queue);
|
|
}
|
|
}
|
|
serial_queue = nullptr;
|
|
serial_queue_destroyed = false;
|
|
}
|
|
|
|
// Wait for a task to appear in the queues list.
|
|
while (queue_list_.Empty(kAllQueuesList)) {
|
|
if (stopped_) {
|
|
return;
|
|
}
|
|
thread_condition_.wait(&queues_mutex_);
|
|
}
|
|
|
|
// Select a task we will be processing.
|
|
auto processing_background = (background_tasks_in_process_ > 0);
|
|
auto take_only_normal = processing_background && !queue_list_.Empty(kOnlyNormalQueuesList);
|
|
auto take_from_list_ = take_only_normal ? kOnlyNormalQueuesList : kAllQueuesList;
|
|
auto queue = queue_list_.TakeFirst(take_from_list_);
|
|
|
|
t_assert(!queue->tasks_.empty());
|
|
|
|
task = std::move(queue->tasks_.front());
|
|
queue->tasks_.pop_front();
|
|
|
|
if (queue->type_ == Type::Serial) {
|
|
// Serial queues are returned in the list for processing
|
|
// only after the task is finished.
|
|
serial_queue = queue;
|
|
t_assert(serial_queue->destroyed_flag_ == nullptr);
|
|
serial_queue->destroyed_flag_ = &serial_queue_destroyed;
|
|
} else if (!queue->tasks_.empty()) {
|
|
queue_list_.Register(queue);
|
|
}
|
|
|
|
++tasks_in_process_;
|
|
task_was_processed = true;
|
|
if (queue->priority_ == Priority::Background) {
|
|
++background_tasks_in_process_;
|
|
background_task = true;
|
|
}
|
|
}
|
|
|
|
task();
|
|
}
|
|
}
|
|
|
|
TaskQueue::TaskQueue(Type type, Priority priority)
|
|
: type_(type)
|
|
, priority_(priority) {
|
|
if (type_ != Type::Main && type_ != Type::Special) {
|
|
weak_thread_pool_ = TaskThreadPool::Instance();
|
|
}
|
|
}
|
|
|
|
TaskQueue::~TaskQueue() {
|
|
if (type_ != Type::Main && type_ != Type::Special) {
|
|
if (auto thread_pool = weak_thread_pool_.toStrongRef()) {
|
|
thread_pool->RemoveQueue(this);
|
|
}
|
|
}
|
|
}
|
|
|
|
void TaskQueue::Put(Task &&task) {
|
|
if (type_ == Type::Main) {
|
|
QMutexLocker lock(&tasks_mutex_);
|
|
tasks_.push_back(std::move(task));
|
|
|
|
Sandbox::MainThreadTaskAdded();
|
|
} else {
|
|
t_assert(type_ != Type::Special);
|
|
TaskThreadPool::Instance()->AddQueueTask(this, std::move(task));
|
|
}
|
|
}
|
|
|
|
void TaskQueue::ProcessMainTasks() { // static
|
|
t_assert(QThread::currentThreadId() == MainThreadId);
|
|
|
|
while (ProcessOneMainTask()) {
|
|
}
|
|
}
|
|
|
|
void TaskQueue::ProcessMainTasks(TimeMs max_time_spent) { // static
|
|
t_assert(QThread::currentThreadId() == MainThreadId);
|
|
|
|
auto start_time = getms();
|
|
while (ProcessOneMainTask()) {
|
|
if (getms() >= start_time + max_time_spent) {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
bool TaskQueue::ProcessOneMainTask() { // static
|
|
Task task;
|
|
{
|
|
QMutexLocker lock(&Main().tasks_mutex_);
|
|
auto &tasks = Main().tasks_;
|
|
if (tasks.empty()) {
|
|
return false;
|
|
}
|
|
|
|
task = std::move(tasks.front());
|
|
tasks.pop_front();
|
|
}
|
|
|
|
task();
|
|
return true;
|
|
}
|
|
|
|
bool TaskQueue::IsMyThread() const {
|
|
if (type_ == Type::Main) {
|
|
return (QThread::currentThreadId() == MainThreadId);
|
|
}
|
|
t_assert(type_ != Type::Special);
|
|
return false;
|
|
}
|
|
|
|
// Default queues.
|
|
TaskQueue &TaskQueue::Main() { // static
|
|
static TaskQueue MainQueue { Type::Main, Priority::Normal };
|
|
return MainQueue;
|
|
}
|
|
|
|
TaskQueue &TaskQueue::Normal() { // static
|
|
static TaskQueue NormalQueue { Type::Concurrent, Priority::Normal };
|
|
return NormalQueue;
|
|
}
|
|
|
|
TaskQueue &TaskQueue::Background() { // static
|
|
static TaskQueue BackgroundQueue { Type::Concurrent, Priority::Background };
|
|
return BackgroundQueue;
|
|
}
|
|
|
|
} // namespace base
|