mirror of
https://github.com/ceph/ceph
synced 2025-02-24 03:27:10 +00:00
common: SharedMutex uses ref-counted implementation
add a reference-counted SharedMutexImpl so that lock guards can outlive the SharedMutex itself. this is required because the lock guards are passed with async completions, and there is no guarantee that the executor will process those completions before the SharedMutex destructs. this case is exercised by the async_destruct unit test Fixes: http://tracker.ceph.com/issues/24124 Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
36d3a8f984
commit
9f0f8f3c62
185
src/common/async/detail/shared_lock.h
Normal file
185
src/common/async/detail/shared_lock.h
Normal file
@ -0,0 +1,185 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2018 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace std {
|
||||
|
||||
// specialize unique_lock and shared_lock for SharedMutex to operate on
|
||||
// SharedMutexImpl instead, because the locks may outlive the SharedMutex itself
|
||||
|
||||
template <typename Executor>
|
||||
class unique_lock<ceph::async::SharedMutex<Executor>> {
|
||||
public:
|
||||
using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
|
||||
|
||||
unique_lock() = default;
|
||||
explicit unique_lock(ceph::async::SharedMutex<Executor>& m)
|
||||
: impl(m.impl), locked(true)
|
||||
{
|
||||
impl->lock();
|
||||
}
|
||||
unique_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
|
||||
: impl(m.impl)
|
||||
{}
|
||||
unique_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
|
||||
: impl(m.impl), locked(impl->try_lock())
|
||||
{}
|
||||
unique_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
|
||||
: impl(m.impl), locked(true)
|
||||
{}
|
||||
~unique_lock() {
|
||||
if (impl && locked)
|
||||
impl->unlock();
|
||||
}
|
||||
|
||||
unique_lock(unique_lock&& other) noexcept
|
||||
: impl(std::move(other.impl)),
|
||||
locked(other.locked) {
|
||||
other.locked = false;
|
||||
}
|
||||
unique_lock& operator=(unique_lock&& other) noexcept {
|
||||
if (impl && locked) {
|
||||
impl->unlock();
|
||||
}
|
||||
impl = std::move(other.impl);
|
||||
locked = other.locked;
|
||||
other.locked = false;
|
||||
return *this;
|
||||
}
|
||||
void swap(unique_lock& other) noexcept {
|
||||
using std::swap;
|
||||
swap(impl, other.impl);
|
||||
swap(locked, other.locked);
|
||||
}
|
||||
|
||||
mutex_type mutex() const noexcept { return impl; }
|
||||
bool owns_lock() const noexcept { return impl && locked; }
|
||||
explicit operator bool() const noexcept { return impl && locked; }
|
||||
|
||||
mutex_type release() {
|
||||
auto result = std::move(impl);
|
||||
locked = false;
|
||||
return result;
|
||||
}
|
||||
|
||||
void lock() {
|
||||
if (!impl)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
if (locked)
|
||||
throw system_error(make_error_code(errc::resource_deadlock_would_occur));
|
||||
impl->lock();
|
||||
locked = true;
|
||||
}
|
||||
bool try_lock() {
|
||||
if (!impl)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
if (locked)
|
||||
throw system_error(make_error_code(errc::resource_deadlock_would_occur));
|
||||
return locked = impl->try_lock();
|
||||
}
|
||||
void unlock() {
|
||||
if (!impl || !locked)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
impl->unlock();
|
||||
locked = false;
|
||||
}
|
||||
private:
|
||||
mutex_type impl;
|
||||
bool locked{false};
|
||||
};
|
||||
|
||||
template <typename Executor>
|
||||
class shared_lock<ceph::async::SharedMutex<Executor>> {
|
||||
public:
|
||||
using mutex_type = boost::intrusive_ptr<ceph::async::detail::SharedMutexImpl>;
|
||||
|
||||
shared_lock() = default;
|
||||
explicit shared_lock(ceph::async::SharedMutex<Executor>& m)
|
||||
: impl(m.impl), locked(true)
|
||||
{
|
||||
impl->lock_shared();
|
||||
}
|
||||
shared_lock(ceph::async::SharedMutex<Executor>& m, defer_lock_t t) noexcept
|
||||
: impl(m.impl)
|
||||
{}
|
||||
shared_lock(ceph::async::SharedMutex<Executor>& m, try_to_lock_t t)
|
||||
: impl(m.impl), locked(impl->try_lock_shared())
|
||||
{}
|
||||
shared_lock(ceph::async::SharedMutex<Executor>& m, adopt_lock_t t) noexcept
|
||||
: impl(m.impl), locked(true)
|
||||
{}
|
||||
|
||||
~shared_lock() {
|
||||
if (impl && locked)
|
||||
impl->unlock_shared();
|
||||
}
|
||||
|
||||
shared_lock(shared_lock&& other) noexcept
|
||||
: impl(std::move(other.impl)),
|
||||
locked(other.locked) {
|
||||
other.locked = false;
|
||||
}
|
||||
shared_lock& operator=(shared_lock&& other) noexcept {
|
||||
if (impl && locked) {
|
||||
impl->unlock_shared();
|
||||
}
|
||||
impl = std::move(other.impl);
|
||||
locked = other.locked;
|
||||
other.locked = false;
|
||||
return *this;
|
||||
}
|
||||
void swap(shared_lock& other) noexcept {
|
||||
using std::swap;
|
||||
swap(impl, other.impl);
|
||||
swap(locked, other.locked);
|
||||
}
|
||||
|
||||
mutex_type mutex() const noexcept { return impl; }
|
||||
bool owns_lock() const noexcept { return impl && locked; }
|
||||
explicit operator bool() const noexcept { return impl && locked; }
|
||||
|
||||
mutex_type release() {
|
||||
auto result = std::move(impl);
|
||||
locked = false;
|
||||
return result;
|
||||
}
|
||||
|
||||
void lock() {
|
||||
if (!impl)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
if (locked)
|
||||
throw system_error(make_error_code(errc::resource_deadlock_would_occur));
|
||||
impl->lock_shared();
|
||||
locked = true;
|
||||
}
|
||||
bool try_lock() {
|
||||
if (!impl)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
if (locked)
|
||||
throw system_error(make_error_code(errc::resource_deadlock_would_occur));
|
||||
return locked = impl->try_lock_shared();
|
||||
}
|
||||
void unlock() {
|
||||
if (!impl || !locked)
|
||||
throw system_error(make_error_code(errc::operation_not_permitted));
|
||||
impl->unlock_shared();
|
||||
locked = false;
|
||||
}
|
||||
private:
|
||||
mutex_type impl;
|
||||
bool locked{false};
|
||||
};
|
||||
|
||||
} // namespace std
|
323
src/common/async/detail/shared_mutex.h
Normal file
323
src/common/async/detail/shared_mutex.h
Normal file
@ -0,0 +1,323 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2018 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <shared_mutex> // for std::shared_lock
|
||||
|
||||
#include <boost/smart_ptr/intrusive_ref_counter.hpp>
|
||||
#include <boost/intrusive_ptr.hpp>
|
||||
#include <boost/intrusive/list.hpp>
|
||||
|
||||
#include "common/async/completion.h"
|
||||
|
||||
namespace ceph::async::detail {
|
||||
|
||||
struct LockRequest : public boost::intrusive::list_base_hook<> {
|
||||
virtual ~LockRequest() {}
|
||||
virtual void complete(boost::system::error_code ec) = 0;
|
||||
virtual void destroy() = 0;
|
||||
};
|
||||
|
||||
class SharedMutexImpl : public boost::intrusive_ref_counter<SharedMutexImpl> {
|
||||
public:
|
||||
~SharedMutexImpl();
|
||||
|
||||
template <typename Mutex, typename CompletionToken>
|
||||
auto async_lock(Mutex& mtx, CompletionToken&& token);
|
||||
void lock();
|
||||
void lock(boost::system::error_code& ec);
|
||||
bool try_lock();
|
||||
void unlock();
|
||||
template <typename Mutex, typename CompletionToken>
|
||||
auto async_lock_shared(Mutex& mtx, CompletionToken&& token);
|
||||
void lock_shared();
|
||||
void lock_shared(boost::system::error_code& ec);
|
||||
bool try_lock_shared();
|
||||
void unlock_shared();
|
||||
void cancel();
|
||||
|
||||
private:
|
||||
using RequestList = boost::intrusive::list<LockRequest>;
|
||||
|
||||
RequestList shared_queue; //< requests waiting on a shared lock
|
||||
RequestList exclusive_queue; //< requests waiting on an exclusive lock
|
||||
|
||||
/// lock state encodes the number of shared lockers, or 'max' for exclusive
|
||||
using LockState = uint16_t;
|
||||
static constexpr LockState Unlocked = 0;
|
||||
static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
|
||||
static constexpr LockState MaxShared = Exclusive - 1;
|
||||
LockState state = Unlocked; //< current lock state
|
||||
|
||||
std::mutex mutex; //< protects lock state and wait queues
|
||||
|
||||
void complete(RequestList&& requests, boost::system::error_code ec);
|
||||
};
|
||||
|
||||
// sync requests live on the stack and wait on a condition variable
|
||||
class SyncRequest : public LockRequest {
|
||||
std::condition_variable cond;
|
||||
std::optional<boost::system::error_code> ec;
|
||||
public:
|
||||
boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
|
||||
// return the error code once its been set
|
||||
cond.wait(lock, [this] { return ec; });
|
||||
return *ec;
|
||||
}
|
||||
void complete(boost::system::error_code ec) override {
|
||||
this->ec = ec;
|
||||
cond.notify_one();
|
||||
}
|
||||
void destroy() override {
|
||||
// nothing, SyncRequests live on the stack
|
||||
}
|
||||
};
|
||||
|
||||
// async requests use async::Completion to invoke a handler on its executor
|
||||
template <typename Mutex, template <typename> typename Lock>
|
||||
class AsyncRequest : public LockRequest {
|
||||
Mutex& mutex; //< mutex argument for lock guard
|
||||
public:
|
||||
explicit AsyncRequest(Mutex& mutex) : mutex(mutex) {}
|
||||
|
||||
using Signature = void(boost::system::error_code, Lock<Mutex>);
|
||||
using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
|
||||
|
||||
void complete(boost::system::error_code ec) override {
|
||||
auto r = static_cast<LockCompletion*>(this);
|
||||
// pass ownership of ourselves to post(). on error, pass an empty lock
|
||||
post(std::unique_ptr<LockCompletion>{r}, ec,
|
||||
ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
|
||||
}
|
||||
void destroy() override {
|
||||
delete static_cast<LockCompletion*>(this);
|
||||
}
|
||||
};
|
||||
|
||||
inline SharedMutexImpl::~SharedMutexImpl()
|
||||
{
|
||||
assert(state == Unlocked);
|
||||
assert(shared_queue.empty());
|
||||
assert(exclusive_queue.empty());
|
||||
}
|
||||
|
||||
template <typename Mutex, typename CompletionToken>
|
||||
auto SharedMutexImpl::async_lock(Mutex& mtx, CompletionToken&& token)
|
||||
{
|
||||
using Request = AsyncRequest<Mutex, std::unique_lock>;
|
||||
using Signature = typename Request::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
auto ex1 = mtx.get_executor();
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
boost::system::error_code ec;
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
|
||||
// post a successful completion
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), ec,
|
||||
std::unique_lock{mtx, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
// create a request and add it to the exclusive list
|
||||
using LockCompletion = typename Request::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), mtx);
|
||||
exclusive_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
}
|
||||
|
||||
inline void SharedMutexImpl::lock()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
}
|
||||
|
||||
void SharedMutexImpl::lock(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
exclusive_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
inline bool SharedMutexImpl::try_lock()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void SharedMutexImpl::unlock()
|
||||
{
|
||||
RequestList granted;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state == Exclusive);
|
||||
|
||||
if (!exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
granted.push_back(request);
|
||||
} else {
|
||||
// grant shared locks, if any
|
||||
state = shared_queue.size();
|
||||
if (state > MaxShared) {
|
||||
state = MaxShared;
|
||||
auto end = std::next(shared_queue.begin(), MaxShared);
|
||||
granted.splice(granted.end(), shared_queue,
|
||||
shared_queue.begin(), end, MaxShared);
|
||||
} else {
|
||||
granted.splice(granted.end(), shared_queue);
|
||||
}
|
||||
}
|
||||
}
|
||||
complete(std::move(granted), boost::system::error_code{});
|
||||
}
|
||||
|
||||
template <typename Mutex, typename CompletionToken>
|
||||
auto SharedMutexImpl::async_lock_shared(Mutex& mtx, CompletionToken&& token)
|
||||
{
|
||||
using Request = AsyncRequest<Mutex, std::shared_lock>;
|
||||
using Signature = typename Request::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
auto ex1 = mtx.get_executor();
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
boost::system::error_code ec;
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), ec,
|
||||
std::shared_lock{mtx, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
using LockCompletion = typename Request::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), mtx);
|
||||
shared_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
}
|
||||
|
||||
inline void SharedMutexImpl::lock_shared()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock_shared(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
}
|
||||
|
||||
void SharedMutexImpl::lock_shared(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
shared_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
inline bool SharedMutexImpl::try_lock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
inline void SharedMutexImpl::unlock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state != Unlocked && state <= MaxShared);
|
||||
|
||||
if (state == 1 && !exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
state = Exclusive;
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else if (state == MaxShared && !shared_queue.empty() &&
|
||||
exclusive_queue.empty()) {
|
||||
// grant next shared lock
|
||||
auto& request = shared_queue.front();
|
||||
shared_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else {
|
||||
state--;
|
||||
}
|
||||
}
|
||||
|
||||
inline void SharedMutexImpl::cancel()
|
||||
{
|
||||
RequestList canceled;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
canceled.splice(canceled.end(), shared_queue);
|
||||
canceled.splice(canceled.end(), exclusive_queue);
|
||||
}
|
||||
complete(std::move(canceled), boost::asio::error::operation_aborted);
|
||||
}
|
||||
|
||||
void SharedMutexImpl::complete(RequestList&& requests,
|
||||
boost::system::error_code ec)
|
||||
{
|
||||
while (!requests.empty()) {
|
||||
auto& request = requests.front();
|
||||
requests.pop_front();
|
||||
try {
|
||||
request.complete(ec);
|
||||
} catch (...) {
|
||||
// clean up any remaining completions and rethrow
|
||||
requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ceph::async::detail
|
@ -12,16 +12,9 @@
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef CEPH_ASYNC_SHARED_MUTEX_H
|
||||
#define CEPH_ASYNC_SHARED_MUTEX_H
|
||||
#pragma once
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <shared_mutex> // for std::shared_lock
|
||||
|
||||
#include <boost/intrusive/list.hpp>
|
||||
|
||||
#include "common/async/completion.h"
|
||||
#include "common/async/detail/shared_mutex.h"
|
||||
|
||||
namespace ceph::async {
|
||||
|
||||
@ -62,13 +55,13 @@ namespace ceph::async {
|
||||
template <typename Executor>
|
||||
class SharedMutex {
|
||||
public:
|
||||
SharedMutex(const Executor& ex1);
|
||||
explicit SharedMutex(const Executor& ex);
|
||||
|
||||
/// on destruction, all pending lock requests are canceled
|
||||
~SharedMutex();
|
||||
|
||||
using executor_type = Executor;
|
||||
executor_type get_executor() const noexcept { return ex1; }
|
||||
executor_type get_executor() const noexcept { return ex; }
|
||||
|
||||
/// initiate an asynchronous request for an exclusive lock. when the lock is
|
||||
/// granted, the completion handler is invoked with a successful error code
|
||||
@ -121,92 +114,26 @@ class SharedMutex {
|
||||
void cancel();
|
||||
|
||||
private:
|
||||
Executor ex1; //< default callback executor
|
||||
Executor ex; //< default callback executor
|
||||
boost::intrusive_ptr<detail::SharedMutexImpl> impl;
|
||||
|
||||
struct LockRequest : public boost::intrusive::list_base_hook<> {
|
||||
virtual ~LockRequest() {}
|
||||
virtual void complete(boost::system::error_code ec) = 0;
|
||||
virtual void destroy() = 0;
|
||||
};
|
||||
using RequestList = boost::intrusive::list<LockRequest>;
|
||||
|
||||
RequestList shared_queue; //< requests waiting on a shared lock
|
||||
RequestList exclusive_queue; //< requests waiting on an exclusive lock
|
||||
|
||||
/// lock state encodes the number of shared lockers, or 'max' for exclusive
|
||||
using LockState = uint16_t;
|
||||
static constexpr LockState Unlocked = 0;
|
||||
static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
|
||||
static constexpr LockState MaxShared = Exclusive - 1;
|
||||
LockState state = Unlocked; //< current lock state
|
||||
|
||||
std::mutex mutex; //< protects lock state and wait queues
|
||||
|
||||
// sync requests live on the stack and wait on a condition variable
|
||||
class SyncRequest;
|
||||
|
||||
// async requests use async::Completion to invoke a handler on its executor
|
||||
template <template <typename Mutex> typename Lock>
|
||||
class AsyncRequest;
|
||||
|
||||
using AsyncExclusiveRequest = AsyncRequest<std::unique_lock>;
|
||||
using AsyncSharedRequest = AsyncRequest<std::shared_lock>;
|
||||
|
||||
void complete(RequestList&& requests, boost::system::error_code ec);
|
||||
};
|
||||
|
||||
template <typename Executor>
|
||||
class SharedMutex<Executor>::SyncRequest : public LockRequest {
|
||||
std::condition_variable cond;
|
||||
std::optional<boost::system::error_code> ec;
|
||||
public:
|
||||
boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
|
||||
// return the error code once its been set
|
||||
cond.wait(lock, [this] { return ec; });
|
||||
return *ec;
|
||||
}
|
||||
void complete(boost::system::error_code ec) override {
|
||||
this->ec = ec;
|
||||
cond.notify_one();
|
||||
}
|
||||
void destroy() override {
|
||||
// nothing, SyncRequests live on the stack
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Executor>
|
||||
template <template <typename Mutex> typename Lock>
|
||||
class SharedMutex<Executor>::AsyncRequest : public LockRequest {
|
||||
SharedMutex& mutex; //< mutex argument for lock guard
|
||||
public:
|
||||
AsyncRequest(SharedMutex& mutex) : mutex(mutex) {}
|
||||
|
||||
using Signature = void(boost::system::error_code, Lock<SharedMutex>);
|
||||
using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
|
||||
|
||||
void complete(boost::system::error_code ec) override {
|
||||
auto r = static_cast<LockCompletion*>(this);
|
||||
// pass ownership of ourselves to post(). on error, pass an empty lock
|
||||
post(std::unique_ptr<LockCompletion>{r}, ec,
|
||||
ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
|
||||
}
|
||||
void destroy() override {
|
||||
delete static_cast<LockCompletion*>(this);
|
||||
}
|
||||
// allow lock guards to access impl
|
||||
friend class std::unique_lock<SharedMutex>;
|
||||
friend class std::shared_lock<SharedMutex>;
|
||||
};
|
||||
|
||||
|
||||
template <typename Executor>
|
||||
inline SharedMutex<Executor>::SharedMutex(const Executor& ex1)
|
||||
: ex1(ex1)
|
||||
SharedMutex<Executor>::SharedMutex(const Executor& ex)
|
||||
: ex(ex), impl(new detail::SharedMutexImpl)
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline SharedMutex<Executor>::~SharedMutex()
|
||||
SharedMutex<Executor>::~SharedMutex()
|
||||
{
|
||||
try {
|
||||
cancel();
|
||||
impl->cancel();
|
||||
} catch (const std::exception&) {
|
||||
// swallow any exceptions, the destructor can't throw
|
||||
}
|
||||
@ -216,213 +143,70 @@ template <typename Executor>
|
||||
template <typename CompletionToken>
|
||||
auto SharedMutex<Executor>::async_lock(CompletionToken&& token)
|
||||
{
|
||||
using Signature = typename AsyncExclusiveRequest::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
|
||||
// post the completion
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), boost::system::error_code{},
|
||||
std::unique_lock{*this, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
// create a request and add it to the exclusive list
|
||||
using LockCompletion = typename AsyncExclusiveRequest::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), *this);
|
||||
exclusive_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
return impl->async_lock(*this, std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::lock()
|
||||
void SharedMutex<Executor>::lock()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
impl->lock();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::lock(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
exclusive_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
impl->lock(ec);
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline bool SharedMutex<Executor>::try_lock()
|
||||
bool SharedMutex<Executor>::try_lock()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return impl->try_lock();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::unlock()
|
||||
{
|
||||
RequestList granted;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state == Exclusive);
|
||||
|
||||
if (!exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
granted.push_back(request);
|
||||
} else {
|
||||
// grant shared locks, if any
|
||||
state = shared_queue.size();
|
||||
if (state > MaxShared) {
|
||||
state = MaxShared;
|
||||
auto end = std::next(shared_queue.begin(), MaxShared);
|
||||
granted.splice(granted.end(), shared_queue,
|
||||
shared_queue.begin(), end, MaxShared);
|
||||
} else {
|
||||
granted.splice(granted.end(), shared_queue);
|
||||
}
|
||||
}
|
||||
}
|
||||
complete(std::move(granted), boost::system::error_code{});
|
||||
impl->unlock();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
template <typename CompletionToken>
|
||||
auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token)
|
||||
{
|
||||
using Signature = typename AsyncSharedRequest::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), boost::system::error_code{},
|
||||
std::shared_lock{*this, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
using LockCompletion = typename AsyncSharedRequest::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), *this);
|
||||
shared_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
return impl->async_lock_shared(*this, std::forward<CompletionToken>(token));
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::lock_shared()
|
||||
void SharedMutex<Executor>::lock_shared()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock_shared(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
impl->lock_shared();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
shared_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
impl->lock_shared(ec);
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline bool SharedMutex<Executor>::try_lock_shared()
|
||||
bool SharedMutex<Executor>::try_lock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
return impl->try_lock_shared();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::unlock_shared()
|
||||
void SharedMutex<Executor>::unlock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state != Unlocked && state <= MaxShared);
|
||||
|
||||
if (state == 1 && !exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
state = Exclusive;
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else if (state == MaxShared && !shared_queue.empty() &&
|
||||
exclusive_queue.empty()) {
|
||||
// grant next shared lock
|
||||
auto& request = shared_queue.front();
|
||||
shared_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else {
|
||||
state--;
|
||||
}
|
||||
impl->unlock_shared();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::cancel()
|
||||
void SharedMutex<Executor>::cancel()
|
||||
{
|
||||
RequestList canceled;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
canceled.splice(canceled.end(), shared_queue);
|
||||
canceled.splice(canceled.end(), exclusive_queue);
|
||||
}
|
||||
complete(std::move(canceled), boost::asio::error::operation_aborted);
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::complete(RequestList&& requests,
|
||||
boost::system::error_code ec)
|
||||
{
|
||||
while (!requests.empty()) {
|
||||
auto& request = requests.front();
|
||||
requests.pop_front();
|
||||
try {
|
||||
request.complete(ec);
|
||||
} catch (...) {
|
||||
// clean up any remaining completions and rethrow
|
||||
requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
|
||||
throw;
|
||||
}
|
||||
}
|
||||
impl->cancel();
|
||||
}
|
||||
|
||||
} // namespace ceph::async
|
||||
|
||||
#endif // CEPH_ASYNC_SHARED_MUTEX_H
|
||||
#include "common/async/detail/shared_lock.h"
|
||||
|
Loading…
Reference in New Issue
Block a user