crimson/os/alienstore: use semaphore to manage tasks in thread pool

* implement std::counting_semaphore in C++17
* use the homebrew counting_semaphore as the synchronization primitive
  to access the pending tasks, for better performance than the
  implementation using mutex and condition_variable.

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2021-06-29 20:06:49 +08:00
parent 6ca774dbc7
commit d846120186
2 changed files with 113 additions and 15 deletions

View File

@ -0,0 +1,90 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab
#pragma once
#include <semaphore.h>
#include <ctime>
#include <cerrno>
#include <exception>
#include <chrono>
namespace crimson {
// an implementation of std::counting_semaphore<> in C++17 using the POSIX
// semaphore.
//
// LeastMaxValue is ignored, as we don't have different backends optimized
// for different LeastMaxValues
template<unsigned LeastMaxValue = 64>
class counting_semaphore {
using clock_t = std::chrono::system_clock;
public:
explicit counting_semaphore(unsigned count) noexcept {
sem_init(&sem, 0, count);
}
counting_semaphore(const counting_semaphore&) = delete;
counting_semaphore& operator=(const counting_semaphore&) = delete;
~counting_semaphore() {
sem_destroy(&sem);
}
void acquire() noexcept {
for (;;) {
int err = sem_wait(&sem);
if (err != 0) {
if (errno == EINTR) {
continue;
} else {
std::terminate();
}
} else {
break;
}
}
}
void release(unsigned update = 1) {
for (; update != 0; --update) {
int err = sem_post(&sem);
if (err != 0) {
std::terminate();
}
}
}
template<typename Clock, typename Duration>
bool try_acquire_until(const std::chrono::time_point<Clock, Duration>& abs_time) noexcept {
auto s = std::chrono::time_point_cast<std::chrono::seconds>(abs_time);
auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(abs_time - s);
struct timespec ts = {
static_cast<std::time_t>(s.time_since_epoch().count()),
static_cast<long>(ns.count())
};
for (;;) {
if (int err = sem_timedwait(&sem, &ts); err) {
if (errno == EINTR) {
continue;
} else if (errno == ETIMEDOUT || errno == EINVAL) {
return false;
} else {
std::terminate();
}
} else {
break;
}
}
return true;
}
template<typename Rep, typename Period>
bool try_acquire_for(const std::chrono::duration<Rep, Period>& rel_time) {
return try_acquire_until(clock_t::now() + rel_time);
}
private:
sem_t sem;
};
}

View File

@ -14,6 +14,15 @@
#include <seastar/core/semaphore.hh>
#include <seastar/core/sharded.hh>
#if __cplusplus > 201703L
#include <semaphore>
namespace crimson {
using std::counting_semaphore;
}
#else
#include "semaphore.h"
#endif
namespace crimson::os {
struct WorkItem {
@ -75,33 +84,32 @@ struct SubmitQueue {
struct ShardedWorkQueue {
public:
WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) {
WorkItem* work_item = nullptr;
std::unique_lock lock{mutex};
cond.wait_for(lock, queue_max_wait, [this, &work_item] {
return pending.pop(work_item) || is_stopping();
});
return work_item;
if (sem.try_acquire_for(queue_max_wait)) {
if (!is_stopping()) {
WorkItem* work_item = nullptr;
[[maybe_unused]] bool popped = pending.pop(work_item);
assert(popped);
return work_item;
}
}
return nullptr;
}
void stop() {
{
std::unique_lock lock{mutex};
stopping = true;
}
cond.notify_all();
stopping = true;
sem.release();
}
void push_back(WorkItem* work_item) {
[[maybe_unused]] bool pushed = pending.push(work_item);
assert(pushed);
cond.notify_one();
sem.release();
}
private:
bool is_stopping() const {
return stopping;
}
bool stopping = false;
std::mutex mutex;
std::condition_variable cond;
std::atomic<bool> stopping = false;
static constexpr unsigned QUEUE_SIZE = 128;
crimson::counting_semaphore<QUEUE_SIZE> sem{0};
boost::lockfree::queue<WorkItem*> pending{QUEUE_SIZE};
};