diff --git a/src/crimson/os/alienstore/semaphore.h b/src/crimson/os/alienstore/semaphore.h new file mode 100644 index 00000000000..8cba02ab15f --- /dev/null +++ b/src/crimson/os/alienstore/semaphore.h @@ -0,0 +1,90 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab expandtab +#pragma once + +#include +#include +#include +#include +#include + +namespace crimson { + +// an implementation of std::counting_semaphore<> in C++17 using the POSIX +// semaphore. +// +// LeastMaxValue is ignored, as we don't have different backends optimized +// for different LeastMaxValues +template +class counting_semaphore { + using clock_t = std::chrono::system_clock; +public: + explicit counting_semaphore(unsigned count) noexcept { + sem_init(&sem, 0, count); + } + + counting_semaphore(const counting_semaphore&) = delete; + counting_semaphore& operator=(const counting_semaphore&) = delete; + + ~counting_semaphore() { + sem_destroy(&sem); + } + + void acquire() noexcept { + for (;;) { + int err = sem_wait(&sem); + if (err != 0) { + if (errno == EINTR) { + continue; + } else { + std::terminate(); + } + } else { + break; + } + } + } + + void release(unsigned update = 1) { + for (; update != 0; --update) { + int err = sem_post(&sem); + if (err != 0) { + std::terminate(); + } + } + } + + template + bool try_acquire_until(const std::chrono::time_point& abs_time) noexcept { + auto s = std::chrono::time_point_cast(abs_time); + auto ns = std::chrono::duration_cast(abs_time - s); + struct timespec ts = { + static_cast(s.time_since_epoch().count()), + static_cast(ns.count()) + }; + for (;;) { + if (int err = sem_timedwait(&sem, &ts); err) { + if (errno == EINTR) { + continue; + } else if (errno == ETIMEDOUT || errno == EINVAL) { + return false; + } else { + std::terminate(); + } + } else { + break; + } + } + return true; + } + + template + bool try_acquire_for(const std::chrono::duration& rel_time) { + return try_acquire_until(clock_t::now() + rel_time); + } + +private: + sem_t sem; +}; + +} diff --git a/src/crimson/os/alienstore/thread_pool.h b/src/crimson/os/alienstore/thread_pool.h index 3a99398527b..8f3069af3a5 100644 --- a/src/crimson/os/alienstore/thread_pool.h +++ b/src/crimson/os/alienstore/thread_pool.h @@ -14,6 +14,15 @@ #include #include +#if __cplusplus > 201703L +#include +namespace crimson { + using std::counting_semaphore; +} +#else +#include "semaphore.h" +#endif + namespace crimson::os { struct WorkItem { @@ -75,33 +84,32 @@ struct SubmitQueue { struct ShardedWorkQueue { public: WorkItem* pop_front(std::chrono::milliseconds& queue_max_wait) { - WorkItem* work_item = nullptr; - std::unique_lock lock{mutex}; - cond.wait_for(lock, queue_max_wait, [this, &work_item] { - return pending.pop(work_item) || is_stopping(); - }); - return work_item; + if (sem.try_acquire_for(queue_max_wait)) { + if (!is_stopping()) { + WorkItem* work_item = nullptr; + [[maybe_unused]] bool popped = pending.pop(work_item); + assert(popped); + return work_item; + } + } + return nullptr; } void stop() { - { - std::unique_lock lock{mutex}; - stopping = true; - } - cond.notify_all(); + stopping = true; + sem.release(); } void push_back(WorkItem* work_item) { [[maybe_unused]] bool pushed = pending.push(work_item); assert(pushed); - cond.notify_one(); + sem.release(); } private: bool is_stopping() const { return stopping; } - bool stopping = false; - std::mutex mutex; - std::condition_variable cond; + std::atomic stopping = false; static constexpr unsigned QUEUE_SIZE = 128; + crimson::counting_semaphore sem{0}; boost::lockfree::queue pending{QUEUE_SIZE}; };