mirror of
https://github.com/ceph/ceph
synced 2025-03-05 15:58:41 +00:00
common: add an async SharedMutex
Signed-off-by: Casey Bodley <cbodley@redhat.com>
This commit is contained in:
parent
e7d98d73e5
commit
36d3a8f984
428
src/common/async/shared_mutex.h
Normal file
428
src/common/async/shared_mutex.h
Normal file
@ -0,0 +1,428 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2018 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef CEPH_ASYNC_SHARED_MUTEX_H
|
||||
#define CEPH_ASYNC_SHARED_MUTEX_H
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <shared_mutex> // for std::shared_lock
|
||||
|
||||
#include <boost/intrusive/list.hpp>
|
||||
|
||||
#include "common/async/completion.h"
|
||||
|
||||
namespace ceph::async {
|
||||
|
||||
/**
|
||||
* An asynchronous shared mutex for use with boost::asio.
|
||||
*
|
||||
* A shared mutex class with asynchronous lock operations that complete on a
|
||||
* boost::asio executor. The class also has synchronous interfaces that meet
|
||||
* most of the standard library's requirements for the SharedMutex concept,
|
||||
* which makes it compatible with lock_guard, unique_lock, and shared_lock.
|
||||
*
|
||||
* All lock requests can fail with operation_aborted on cancel() or destruction.
|
||||
* The non-error_code overloads of lock() and lock_shared() will throw this
|
||||
* error as an exception of type boost::system::system_error.
|
||||
*
|
||||
* Exclusive locks are prioritized over shared locks. Locks of the same type
|
||||
* are granted in fifo order. The implementation defines a limit on the number
|
||||
* of shared locks to 65534 at a time.
|
||||
*
|
||||
* Example use:
|
||||
*
|
||||
* boost::asio::io_context context;
|
||||
* SharedMutex mutex{context.get_executor()};
|
||||
*
|
||||
* mutex.async_lock([&] (boost::system::error_code ec, auto lock) {
|
||||
* if (!ec) {
|
||||
* // mutate shared state ...
|
||||
* }
|
||||
* });
|
||||
* mutex.async_lock_shared([&] (boost::system::error_code ec, auto lock) {
|
||||
* if (!ec) {
|
||||
* // read shared state ...
|
||||
* }
|
||||
* });
|
||||
*
|
||||
* context.run();
|
||||
*/
|
||||
template <typename Executor>
|
||||
class SharedMutex {
|
||||
public:
|
||||
SharedMutex(const Executor& ex1);
|
||||
|
||||
/// on destruction, all pending lock requests are canceled
|
||||
~SharedMutex();
|
||||
|
||||
using executor_type = Executor;
|
||||
executor_type get_executor() const noexcept { return ex1; }
|
||||
|
||||
/// initiate an asynchronous request for an exclusive lock. when the lock is
|
||||
/// granted, the completion handler is invoked with a successful error code
|
||||
/// and a std::unique_lock that owns this mutex.
|
||||
/// Signature = void(boost::system::error_code, std::unique_lock)
|
||||
template <typename CompletionToken>
|
||||
auto async_lock(CompletionToken&& token);
|
||||
|
||||
/// wait synchronously for an exclusive lock. if an error occurs before the
|
||||
/// lock is granted, that error is thrown as an exception
|
||||
void lock();
|
||||
|
||||
/// wait synchronously for an exclusive lock. if an error occurs before the
|
||||
/// lock is granted, that error is assigned to 'ec'
|
||||
void lock(boost::system::error_code& ec);
|
||||
|
||||
/// try to acquire an exclusive lock. if the lock is not immediately
|
||||
/// available, returns false
|
||||
bool try_lock();
|
||||
|
||||
/// releases an exclusive lock. not required to be called from the same thread
|
||||
/// that initiated the lock
|
||||
void unlock();
|
||||
|
||||
/// initiate an asynchronous request for a shared lock. when the lock is
|
||||
/// granted, the completion handler is invoked with a successful error code
|
||||
/// and a std::shared_lock that owns this mutex.
|
||||
/// Signature = void(boost::system::error_code, std::shared_lock)
|
||||
template <typename CompletionToken>
|
||||
auto async_lock_shared(CompletionToken&& token);
|
||||
|
||||
/// wait synchronously for a shared lock. if an error occurs before the
|
||||
/// lock is granted, that error is thrown as an exception
|
||||
void lock_shared();
|
||||
|
||||
/// wait synchronously for a shared lock. if an error occurs before the lock
|
||||
/// is granted, that error is assigned to 'ec'
|
||||
void lock_shared(boost::system::error_code& ec);
|
||||
|
||||
/// try to acquire a shared lock. if the lock is not immediately available,
|
||||
/// returns false
|
||||
bool try_lock_shared();
|
||||
|
||||
/// releases a shared lock. not required to be called from the same thread
|
||||
/// that initiated the lock
|
||||
void unlock_shared();
|
||||
|
||||
/// cancel any pending requests for exclusive or shared locks with an
|
||||
/// operation_aborted error
|
||||
void cancel();
|
||||
|
||||
private:
|
||||
Executor ex1; //< default callback executor
|
||||
|
||||
struct LockRequest : public boost::intrusive::list_base_hook<> {
|
||||
virtual ~LockRequest() {}
|
||||
virtual void complete(boost::system::error_code ec) = 0;
|
||||
virtual void destroy() = 0;
|
||||
};
|
||||
using RequestList = boost::intrusive::list<LockRequest>;
|
||||
|
||||
RequestList shared_queue; //< requests waiting on a shared lock
|
||||
RequestList exclusive_queue; //< requests waiting on an exclusive lock
|
||||
|
||||
/// lock state encodes the number of shared lockers, or 'max' for exclusive
|
||||
using LockState = uint16_t;
|
||||
static constexpr LockState Unlocked = 0;
|
||||
static constexpr LockState Exclusive = std::numeric_limits<LockState>::max();
|
||||
static constexpr LockState MaxShared = Exclusive - 1;
|
||||
LockState state = Unlocked; //< current lock state
|
||||
|
||||
std::mutex mutex; //< protects lock state and wait queues
|
||||
|
||||
// sync requests live on the stack and wait on a condition variable
|
||||
class SyncRequest;
|
||||
|
||||
// async requests use async::Completion to invoke a handler on its executor
|
||||
template <template <typename Mutex> typename Lock>
|
||||
class AsyncRequest;
|
||||
|
||||
using AsyncExclusiveRequest = AsyncRequest<std::unique_lock>;
|
||||
using AsyncSharedRequest = AsyncRequest<std::shared_lock>;
|
||||
|
||||
void complete(RequestList&& requests, boost::system::error_code ec);
|
||||
};
|
||||
|
||||
template <typename Executor>
|
||||
class SharedMutex<Executor>::SyncRequest : public LockRequest {
|
||||
std::condition_variable cond;
|
||||
std::optional<boost::system::error_code> ec;
|
||||
public:
|
||||
boost::system::error_code wait(std::unique_lock<std::mutex>& lock) {
|
||||
// return the error code once its been set
|
||||
cond.wait(lock, [this] { return ec; });
|
||||
return *ec;
|
||||
}
|
||||
void complete(boost::system::error_code ec) override {
|
||||
this->ec = ec;
|
||||
cond.notify_one();
|
||||
}
|
||||
void destroy() override {
|
||||
// nothing, SyncRequests live on the stack
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Executor>
|
||||
template <template <typename Mutex> typename Lock>
|
||||
class SharedMutex<Executor>::AsyncRequest : public LockRequest {
|
||||
SharedMutex& mutex; //< mutex argument for lock guard
|
||||
public:
|
||||
AsyncRequest(SharedMutex& mutex) : mutex(mutex) {}
|
||||
|
||||
using Signature = void(boost::system::error_code, Lock<SharedMutex>);
|
||||
using LockCompletion = Completion<Signature, AsBase<AsyncRequest>>;
|
||||
|
||||
void complete(boost::system::error_code ec) override {
|
||||
auto r = static_cast<LockCompletion*>(this);
|
||||
// pass ownership of ourselves to post(). on error, pass an empty lock
|
||||
post(std::unique_ptr<LockCompletion>{r}, ec,
|
||||
ec ? Lock{mutex, std::defer_lock} : Lock{mutex, std::adopt_lock});
|
||||
}
|
||||
void destroy() override {
|
||||
delete static_cast<LockCompletion*>(this);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
template <typename Executor>
|
||||
inline SharedMutex<Executor>::SharedMutex(const Executor& ex1)
|
||||
: ex1(ex1)
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline SharedMutex<Executor>::~SharedMutex()
|
||||
{
|
||||
try {
|
||||
cancel();
|
||||
} catch (const std::exception&) {
|
||||
// swallow any exceptions, the destructor can't throw
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
template <typename CompletionToken>
|
||||
auto SharedMutex<Executor>::async_lock(CompletionToken&& token)
|
||||
{
|
||||
using Signature = typename AsyncExclusiveRequest::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
|
||||
// post the completion
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), boost::system::error_code{},
|
||||
std::unique_lock{*this, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
// create a request and add it to the exclusive list
|
||||
using LockCompletion = typename AsyncExclusiveRequest::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), *this);
|
||||
exclusive_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::lock()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::lock(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
exclusive_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline bool SharedMutex<Executor>::try_lock()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (state == Unlocked) {
|
||||
state = Exclusive;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::unlock()
|
||||
{
|
||||
RequestList granted;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state == Exclusive);
|
||||
|
||||
if (!exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
granted.push_back(request);
|
||||
} else {
|
||||
// grant shared locks, if any
|
||||
state = shared_queue.size();
|
||||
if (state > MaxShared) {
|
||||
state = MaxShared;
|
||||
auto end = std::next(shared_queue.begin(), MaxShared);
|
||||
granted.splice(granted.end(), shared_queue,
|
||||
shared_queue.begin(), end, MaxShared);
|
||||
} else {
|
||||
granted.splice(granted.end(), shared_queue);
|
||||
}
|
||||
}
|
||||
}
|
||||
complete(std::move(granted), boost::system::error_code{});
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
template <typename CompletionToken>
|
||||
auto SharedMutex<Executor>::async_lock_shared(CompletionToken&& token)
|
||||
{
|
||||
using Signature = typename AsyncSharedRequest::Signature;
|
||||
boost::asio::async_completion<CompletionToken, Signature> init(token);
|
||||
auto& handler = init.completion_handler;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
|
||||
auto ex2 = boost::asio::get_associated_executor(handler, ex1);
|
||||
auto alloc2 = boost::asio::get_associated_allocator(handler);
|
||||
auto b = bind_handler(std::move(handler), boost::system::error_code{},
|
||||
std::shared_lock{*this, std::adopt_lock});
|
||||
ex2.post(forward_handler(std::move(b)), alloc2);
|
||||
} else {
|
||||
using LockCompletion = typename AsyncSharedRequest::LockCompletion;
|
||||
auto request = LockCompletion::create(ex1, std::move(handler), *this);
|
||||
shared_queue.push_back(*request.release());
|
||||
}
|
||||
}
|
||||
return init.result.get();
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::lock_shared()
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
lock_shared(ec);
|
||||
if (ec) {
|
||||
throw boost::system::system_error(ec);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::lock_shared(boost::system::error_code& ec)
|
||||
{
|
||||
std::unique_lock lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
ec.clear();
|
||||
} else {
|
||||
SyncRequest request;
|
||||
shared_queue.push_back(request);
|
||||
ec = request.wait(lock);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline bool SharedMutex<Executor>::try_lock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (exclusive_queue.empty() && state < MaxShared) {
|
||||
state++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::unlock_shared()
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
assert(state != Unlocked && state <= MaxShared);
|
||||
|
||||
if (state == 1 && !exclusive_queue.empty()) {
|
||||
// grant next exclusive lock
|
||||
state = Exclusive;
|
||||
auto& request = exclusive_queue.front();
|
||||
exclusive_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else if (state == MaxShared && !shared_queue.empty() &&
|
||||
exclusive_queue.empty()) {
|
||||
// grant next shared lock
|
||||
auto& request = shared_queue.front();
|
||||
shared_queue.pop_front();
|
||||
request.complete(boost::system::error_code{});
|
||||
} else {
|
||||
state--;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
inline void SharedMutex<Executor>::cancel()
|
||||
{
|
||||
RequestList canceled;
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
canceled.splice(canceled.end(), shared_queue);
|
||||
canceled.splice(canceled.end(), exclusive_queue);
|
||||
}
|
||||
complete(std::move(canceled), boost::asio::error::operation_aborted);
|
||||
}
|
||||
|
||||
template <typename Executor>
|
||||
void SharedMutex<Executor>::complete(RequestList&& requests,
|
||||
boost::system::error_code ec)
|
||||
{
|
||||
while (!requests.empty()) {
|
||||
auto& request = requests.front();
|
||||
requests.pop_front();
|
||||
try {
|
||||
request.complete(ec);
|
||||
} catch (...) {
|
||||
// clean up any remaining completions and rethrow
|
||||
requests.clear_and_dispose([] (LockRequest *r) { r->destroy(); });
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ceph::async
|
||||
|
||||
#endif // CEPH_ASYNC_SHARED_MUTEX_H
|
@ -294,3 +294,7 @@ add_ceph_unittest(unittest_hobject)
|
||||
add_executable(unittest_async_completion test_async_completion.cc)
|
||||
add_ceph_unittest(unittest_async_completion)
|
||||
target_link_libraries(unittest_async_completion Boost::system)
|
||||
|
||||
add_executable(unittest_async_shared_mutex test_async_shared_mutex.cc)
|
||||
add_ceph_unittest(unittest_async_shared_mutex)
|
||||
target_link_libraries(unittest_async_shared_mutex Boost::system)
|
||||
|
428
src/test/common/test_async_shared_mutex.cc
Normal file
428
src/test/common/test_async_shared_mutex.cc
Normal file
@ -0,0 +1,428 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2018 Red Hat
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#include "common/async/shared_mutex.h"
|
||||
#include <optional>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
namespace ceph::async {
|
||||
|
||||
using executor_type = boost::asio::io_context::executor_type;
|
||||
using unique_lock = std::unique_lock<SharedMutex<executor_type>>;
|
||||
using shared_lock = std::shared_lock<SharedMutex<executor_type>>;
|
||||
|
||||
// return a lambda that captures its error code and lock
|
||||
auto capture(std::optional<boost::system::error_code>& ec, unique_lock& lock)
|
||||
{
|
||||
return [&] (boost::system::error_code e, unique_lock l) {
|
||||
ec = e;
|
||||
lock = std::move(l);
|
||||
};
|
||||
}
|
||||
auto capture(std::optional<boost::system::error_code>& ec, shared_lock& lock)
|
||||
{
|
||||
return [&] (boost::system::error_code e, shared_lock l) {
|
||||
ec = e;
|
||||
lock = std::move(l);
|
||||
};
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_exclusive)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2, ec3;
|
||||
unique_lock lock1, lock2, lock3;
|
||||
|
||||
// request three exclusive locks
|
||||
mutex.async_lock(capture(ec1, lock1));
|
||||
mutex.async_lock(capture(ec2, lock2));
|
||||
mutex.async_lock(capture(ec3, lock3));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped()); // second lock still pending
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
lock1.unlock();
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
lock2.unlock();
|
||||
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec3);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec3);
|
||||
ASSERT_TRUE(lock3);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_shared)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2;
|
||||
shared_lock lock1, lock2;
|
||||
|
||||
// request two shared locks
|
||||
mutex.async_lock_shared(capture(ec1, lock1));
|
||||
mutex.async_lock_shared(capture(ec2, lock2));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_exclusive_while_shared)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2;
|
||||
shared_lock lock1;
|
||||
unique_lock lock2;
|
||||
|
||||
// request a shared and exclusive lock
|
||||
mutex.async_lock_shared(capture(ec1, lock1));
|
||||
mutex.async_lock(capture(ec2, lock2));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped()); // second lock still pending
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
lock1.unlock();
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_shared_while_exclusive)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2;
|
||||
unique_lock lock1;
|
||||
shared_lock lock2;
|
||||
|
||||
// request an exclusive and shared lock
|
||||
mutex.async_lock(capture(ec1, lock1));
|
||||
mutex.async_lock_shared(capture(ec2, lock2));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped()); // second lock still pending
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
lock1.unlock();
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_prioritize_exclusive)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2, ec3;
|
||||
shared_lock lock1, lock3;
|
||||
unique_lock lock2;
|
||||
|
||||
// acquire a shared lock, then request an exclusive and another shared lock
|
||||
mutex.async_lock_shared(capture(ec1, lock1));
|
||||
mutex.async_lock(capture(ec2, lock2));
|
||||
mutex.async_lock_shared(capture(ec3, lock3));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
// exclusive waiter blocks the second shared lock
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
lock1.unlock();
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
EXPECT_FALSE(ec3);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_cancel)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2, ec3, ec4;
|
||||
unique_lock lock1, lock2;
|
||||
shared_lock lock3, lock4;
|
||||
|
||||
// request 2 exclusive and shared locks
|
||||
mutex.async_lock(capture(ec1, lock1));
|
||||
mutex.async_lock(capture(ec2, lock2));
|
||||
mutex.async_lock_shared(capture(ec3, lock3));
|
||||
mutex.async_lock_shared(capture(ec4, lock4));
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
EXPECT_FALSE(ec4);
|
||||
|
||||
context.poll();
|
||||
EXPECT_FALSE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
EXPECT_FALSE(ec4);
|
||||
|
||||
mutex.cancel();
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
EXPECT_FALSE(ec4);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
|
||||
EXPECT_FALSE(lock2);
|
||||
ASSERT_TRUE(ec3);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec3);
|
||||
EXPECT_FALSE(lock3);
|
||||
ASSERT_TRUE(ec4);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec4);
|
||||
EXPECT_FALSE(lock4);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, async_destruct)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2, ec3, ec4;
|
||||
unique_lock lock1, lock2;
|
||||
shared_lock lock3, lock4;
|
||||
|
||||
{
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
// request 2 exclusive and shared locks
|
||||
mutex.async_lock(capture(ec1, lock1));
|
||||
mutex.async_lock(capture(ec2, lock2));
|
||||
mutex.async_lock_shared(capture(ec3, lock3));
|
||||
mutex.async_lock_shared(capture(ec4, lock4));
|
||||
}
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll()
|
||||
EXPECT_FALSE(ec2);
|
||||
EXPECT_FALSE(ec3);
|
||||
EXPECT_FALSE(ec4);
|
||||
|
||||
context.poll();
|
||||
EXPECT_TRUE(context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec2);
|
||||
EXPECT_FALSE(lock2);
|
||||
ASSERT_TRUE(ec3);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec3);
|
||||
EXPECT_FALSE(lock3);
|
||||
ASSERT_TRUE(ec4);
|
||||
EXPECT_EQ(boost::asio::error::operation_aborted, *ec4);
|
||||
EXPECT_FALSE(lock4);
|
||||
}
|
||||
|
||||
// return a capture() lambda that's bound to the given executor
|
||||
template <typename Executor, typename ...Args>
|
||||
auto capture_ex(const Executor& ex, Args&& ...args)
|
||||
{
|
||||
return boost::asio::bind_executor(ex, capture(std::forward<Args>(args)...));
|
||||
}
|
||||
|
||||
TEST(SharedMutex, cross_executor)
|
||||
{
|
||||
boost::asio::io_context mutex_context;
|
||||
SharedMutex mutex(mutex_context.get_executor());
|
||||
|
||||
boost::asio::io_context callback_context;
|
||||
auto ex2 = callback_context.get_executor();
|
||||
|
||||
std::optional<boost::system::error_code> ec1, ec2;
|
||||
unique_lock lock1, lock2;
|
||||
|
||||
// request two exclusive locks
|
||||
mutex.async_lock(capture_ex(ex2, ec1, lock1));
|
||||
mutex.async_lock(capture_ex(ex2, ec2, lock2));
|
||||
|
||||
EXPECT_FALSE(ec1);
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
mutex_context.poll();
|
||||
EXPECT_FALSE(mutex_context.stopped()); // maintains work on both executors
|
||||
|
||||
EXPECT_FALSE(ec1); // no callbacks until poll() on callback_context
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
callback_context.poll();
|
||||
EXPECT_FALSE(callback_context.stopped()); // second lock still pending
|
||||
|
||||
ASSERT_TRUE(ec1);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec1);
|
||||
ASSERT_TRUE(lock1);
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
lock1.unlock();
|
||||
|
||||
mutex_context.poll();
|
||||
EXPECT_TRUE(mutex_context.stopped());
|
||||
|
||||
EXPECT_FALSE(ec2);
|
||||
|
||||
callback_context.poll();
|
||||
EXPECT_TRUE(callback_context.stopped());
|
||||
|
||||
ASSERT_TRUE(ec2);
|
||||
EXPECT_EQ(boost::system::errc::success, *ec2);
|
||||
ASSERT_TRUE(lock2);
|
||||
}
|
||||
|
||||
TEST(SharedMutex, try_exclusive)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
ASSERT_FALSE(mutex.try_lock()); // fail during exclusive
|
||||
}
|
||||
{
|
||||
std::shared_lock lock{mutex};
|
||||
ASSERT_FALSE(mutex.try_lock()); // fail during shared
|
||||
}
|
||||
ASSERT_TRUE(mutex.try_lock());
|
||||
mutex.unlock();
|
||||
}
|
||||
|
||||
TEST(SharedMutex, try_shared)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
ASSERT_FALSE(mutex.try_lock_shared()); // fail during exclusive
|
||||
}
|
||||
{
|
||||
std::shared_lock lock{mutex};
|
||||
ASSERT_TRUE(mutex.try_lock_shared()); // succeed during shared
|
||||
mutex.unlock_shared();
|
||||
}
|
||||
ASSERT_TRUE(mutex.try_lock_shared());
|
||||
mutex.unlock_shared();
|
||||
}
|
||||
|
||||
TEST(SharedMutex, cancel)
|
||||
{
|
||||
boost::asio::io_context context;
|
||||
SharedMutex mutex(context.get_executor());
|
||||
|
||||
std::lock_guard l{mutex}; // exclusive lock blocks others
|
||||
|
||||
// make synchronous lock calls in other threads
|
||||
auto f1 = std::async(std::launch::async, [&] { mutex.lock(); });
|
||||
auto f2 = std::async(std::launch::async, [&] { mutex.lock_shared(); });
|
||||
|
||||
// this will race with spawned threads. just keep canceling until the
|
||||
// futures are ready
|
||||
const auto t = std::chrono::milliseconds(1);
|
||||
do { mutex.cancel(); } while (f1.wait_for(t) != std::future_status::ready);
|
||||
do { mutex.cancel(); } while (f2.wait_for(t) != std::future_status::ready);
|
||||
|
||||
EXPECT_THROW(f1.get(), boost::system::system_error);
|
||||
EXPECT_THROW(f2.get(), boost::system::system_error);
|
||||
}
|
||||
|
||||
} // namespace ceph::async
|
Loading…
Reference in New Issue
Block a user