librbd: serialize maintenance operations by type

Signed-off-by: Mykola Golub <mgolub@suse.com>
This commit is contained in:
Mykola Golub 2020-11-08 09:06:41 +00:00
parent 01ff153054
commit 2cdbfa868c
5 changed files with 560 additions and 411 deletions

View File

@ -6,7 +6,6 @@
#include "librbd/ImageCtx.h"
#include "librbd/ImageState.h"
#include "librbd/internal.h"
#include "librbd/Operations.h"
#include "librbd/TaskFinisher.h"
#include "librbd/Types.h"
#include "librbd/Utils.h"
@ -913,6 +912,57 @@ void ImageWatcher<I>::cancel_quiesce_requests() {
}
}
template <typename I>
bool ImageWatcher<I>::handle_operation_request(
const AsyncRequestId& async_request_id,
exclusive_lock::OperationRequestType request_type, Operation operation,
std::function<void(ProgressContext &prog_ctx, Context*)> execute,
C_NotifyAck *ack_ctx) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r = 0;
if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
bool complete;
if (async_request_id) {
r = prepare_async_request(async_request_id, &new_request, &ctx,
&prog_ctx);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
prog_ctx = &m_no_op_prog_ctx;
complete = false;
}
if (r == 0 && new_request) {
ctx = new LambdaContext(
[this, operation, ctx](int r) {
m_image_ctx.operations->finish_op(operation, r);
ctx->complete(r);
});
ctx = new LambdaContext(
[this, execute, prog_ctx, ctx](int r) {
if (r < 0) {
ctx->complete(r);
return;
}
std::shared_lock l{m_image_ctx.owner_lock};
execute(*prog_ctx, ctx);
});
m_image_ctx.operations->start_op(operation, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const HeaderUpdatePayload &payload,
C_NotifyAck *ack_ctx) {
@ -1055,416 +1105,232 @@ bool ImageWatcher<I>::handle_payload(const AsyncCompletePayload &payload,
template <typename I>
bool ImageWatcher<I>::handle_payload(const FlattenPayload &payload,
C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " remote flatten request: "
<< payload.async_request_id << dendl;
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote flatten request: "
<< payload.async_request_id << dendl;
m_image_ctx.operations->execute_flatten(*prog_ctx, ctx);
}
encode(ResponseMessage(r), ack_ctx->out);
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_FLATTEN, std::bind(&Operations<I>::execute_flatten,
m_image_ctx.operations,
std::placeholders::_1,
std::placeholders::_2),
ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const ResizePayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
<< payload.async_request_id << " "
<< payload.size << " "
<< payload.allow_shrink << dendl;
m_image_ctx.operations->execute_resize(payload.size, payload.allow_shrink, *prog_ctx, ctx, 0);
}
ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
<< payload.async_request_id << " "
<< payload.size << " "
<< payload.allow_shrink << dendl;
encode(ResponseMessage(r), ack_ctx->out);
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_RESIZE, std::bind(&Operations<I>::execute_resize,
m_image_ctx.operations, payload.size,
payload.allow_shrink, std::placeholders::_1,
std::placeholders::_2, 0), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SnapCreatePayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
<< payload.async_request_id << " "
<< payload.snap_namespace << " "
<< payload.snap_name << " "
<< payload.flags << dendl;
// rbd-mirror needs to accept forced promotion orphan snap create requests
auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
&payload.snap_namespace);
if (mirror_ns != nullptr && mirror_ns->is_orphan()) {
request_type = exclusive_lock::OPERATION_REQUEST_TYPE_FORCE_PROMOTION;
}
auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
prog_ctx = &m_no_op_prog_ctx;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_create request: "
<< payload.async_request_id << " "
<< payload.snap_namespace << " "
<< payload.snap_name << " "
<< payload.flags << dendl;
m_image_ctx.operations->execute_snap_create(payload.snap_namespace,
payload.snap_name,
ctx, 0, payload.flags,
*prog_ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
// rbd-mirror needs to accept forced promotion orphan snap create requests
auto mirror_ns = boost::get<cls::rbd::MirrorSnapshotNamespace>(
&payload.snap_namespace);
if (mirror_ns != nullptr && mirror_ns->is_orphan()) {
request_type = exclusive_lock::OPERATION_REQUEST_TYPE_FORCE_PROMOTION;
}
return true;
return handle_operation_request(
payload.async_request_id, request_type,
OPERATION_SNAP_CREATE, std::bind(&Operations<I>::execute_snap_create,
m_image_ctx.operations,
payload.snap_namespace,
payload.snap_name, std::placeholders::_2,
0, payload.flags, std::placeholders::_1),
ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SnapRenamePayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
<< payload.snap_id << " to "
<< payload.snap_name << dendl;
ldout(m_image_ctx.cct, 10) << this << " remote snap_rename request: "
<< payload.async_request_id << " "
<< payload.snap_id << " to "
<< payload.snap_name << dendl;
m_image_ctx.operations->execute_snap_rename(payload.snap_id,
payload.snap_name, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_SNAP_RENAME, std::bind(&Operations<I>::execute_snap_rename,
m_image_ctx.operations, payload.snap_id,
payload.snap_name,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SnapRemovePayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
if (cls::rbd::get_snap_namespace_type(payload.snap_namespace) ==
cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
request_type = exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE;
}
int r;
if (m_image_ctx.exclusive_lock->accept_request(request_type, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
<< payload.snap_name << dendl;
ldout(m_image_ctx.cct, 10) << this << " remote snap_remove request: "
<< payload.snap_name << dendl;
m_image_ctx.operations->execute_snap_remove(payload.snap_namespace,
payload.snap_name, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
auto request_type = exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL;
if (cls::rbd::get_snap_namespace_type(payload.snap_namespace) ==
cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
request_type = exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE;
}
return true;
return handle_operation_request(
payload.async_request_id, request_type, OPERATION_SNAP_REMOVE,
std::bind(&Operations<I>::execute_snap_remove, m_image_ctx.operations,
payload.snap_namespace, payload.snap_name,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SnapProtectPayload& payload,
C_NotifyAck *ack_ctx) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
<< payload.snap_name << dendl;
ldout(m_image_ctx.cct, 10) << this << " remote snap_protect request: "
<< payload.async_request_id << " "
<< payload.snap_name << dendl;
m_image_ctx.operations->execute_snap_protect(payload.snap_namespace,
payload.snap_name, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_SNAP_PROTECT, std::bind(&Operations<I>::execute_snap_protect,
m_image_ctx.operations,
payload.snap_namespace,
payload.snap_name,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SnapUnprotectPayload& payload,
C_NotifyAck *ack_ctx) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
<< payload.snap_name << dendl;
ldout(m_image_ctx.cct, 10) << this << " remote snap_unprotect request: "
<< payload.async_request_id << " "
<< payload.snap_name << dendl;
m_image_ctx.operations->execute_snap_unprotect(payload.snap_namespace,
payload.snap_name, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_SNAP_UNPROTECT, std::bind(&Operations<I>::execute_snap_unprotect,
m_image_ctx.operations,
payload.snap_namespace,
payload.snap_name,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const RebuildObjectMapPayload& payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this
<< " remote rebuild object map request: "
<< payload.async_request_id << dendl;
m_image_ctx.operations->execute_rebuild_object_map(*prog_ctx, ctx);
}
ldout(m_image_ctx.cct, 10) << this << " remote rebuild object map request: "
<< payload.async_request_id << dendl;
encode(ResponseMessage(r), ack_ctx->out);
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_REBUILD_OBJECT_MAP,
std::bind(&Operations<I>::execute_rebuild_object_map,
m_image_ctx.operations, std::placeholders::_1,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const RenamePayload& payload,
C_NotifyAck *ack_ctx) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
<< payload.image_name << dendl;
ldout(m_image_ctx.cct, 10) << this << " remote rename request: "
<< payload.async_request_id << " "
<< payload.image_name << dendl;
m_image_ctx.operations->execute_rename(payload.image_name, ctx);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_RENAME, std::bind(&Operations<I>::execute_rename,
m_image_ctx.operations, payload.image_name,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const UpdateFeaturesPayload& payload,
C_NotifyAck *ack_ctx) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
<< payload.features << " "
<< (payload.enabled ? "enabled" : "disabled")
<< dendl;
ldout(m_image_ctx.cct, 10) << this << " remote update_features request: "
<< payload.async_request_id << " "
<< payload.features << " "
<< (payload.enabled ? "enabled" : "disabled")
<< dendl;
m_image_ctx.operations->execute_update_features(payload.features,
payload.enabled, ctx,
0);
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_UPDATE_FEATURES,
std::bind(&Operations<I>::execute_update_features, m_image_ctx.operations,
payload.features, payload.enabled, std::placeholders::_2, 0),
ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const MigratePayload &payload,
C_NotifyAck *ack_ctx) {
ldout(m_image_ctx.cct, 10) << this << " remote migrate request: "
<< payload.async_request_id << dendl;
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote migrate request: "
<< payload.async_request_id << dendl;
m_image_ctx.operations->execute_migrate(*prog_ctx, ctx);
}
encode(ResponseMessage(r), ack_ctx->out);
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_MIGRATE, std::bind(&Operations<I>::execute_migrate,
m_image_ctx.operations,
std::placeholders::_1,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const SparsifyPayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
ProgressContext *prog_ctx;
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, &prog_ctx);
if (r == 0 && new_request) {
ldout(m_image_ctx.cct, 10) << this << " remote sparsify request: "
<< payload.async_request_id << dendl;
m_image_ctx.operations->execute_sparsify(payload.sparse_size, *prog_ctx,
ctx);
}
ldout(m_image_ctx.cct, 10) << this << " remote sparsify request: "
<< payload.async_request_id << dendl;
encode(ResponseMessage(r), ack_ctx->out);
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
return handle_operation_request(
payload.async_request_id, exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_SPARSIFY, std::bind(&Operations<I>::execute_sparsify,
m_image_ctx.operations,
payload.sparse_size, std::placeholders::_1,
std::placeholders::_2), ack_ctx);
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const MetadataUpdatePayload &payload,
C_NotifyAck *ack_ctx) {
if (payload.value) {
ldout(m_image_ctx.cct, 10) << this << " remote metadata_set request: "
<< payload.async_request_id << " "
<< "key=" << payload.key << ", value="
<< *payload.value << dendl;
return handle_operation_request(
payload.async_request_id,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_METADATA_UPDATE,
std::bind(&Operations<I>::execute_metadata_set,
m_image_ctx.operations, payload.key, *payload.value,
std::placeholders::_2),
ack_ctx);
} else {
ldout(m_image_ctx.cct, 10) << this << " remote metadata_remove request: "
<< payload.async_request_id << " "
<< "key=" << payload.key << dendl;
return handle_operation_request(
payload.async_request_id,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
OPERATION_METADATA_UPDATE,
std::bind(&Operations<I>::execute_metadata_remove,
m_image_ctx.operations, payload.key, std::placeholders::_2),
ack_ctx);
}
return true;
}
template <typename I>
@ -1493,50 +1359,6 @@ bool ImageWatcher<I>::handle_payload(const UnquiescePayload &payload,
return true;
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const MetadataUpdatePayload &payload,
C_NotifyAck *ack_ctx) {
std::shared_lock l{m_image_ctx.owner_lock};
if (m_image_ctx.exclusive_lock != nullptr) {
int r;
if (m_image_ctx.exclusive_lock->accept_request(
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, &r)) {
bool new_request;
Context *ctx;
bool complete;
if (payload.async_request_id) {
r = prepare_async_request(payload.async_request_id, &new_request,
&ctx, nullptr);
encode(ResponseMessage(r), ack_ctx->out);
complete = true;
} else {
new_request = true;
ctx = new C_ResponseMessage(ack_ctx);
complete = false;
}
if (r == 0 && new_request) {
if (payload.value) {
ldout(m_image_ctx.cct, 10) << this << " remote metadata_set request: "
<< "key=" << payload.key << ", value="
<< *payload.value << dendl;
m_image_ctx.operations->execute_metadata_set(payload.key,
*payload.value, ctx);
} else {
ldout(m_image_ctx.cct, 10) << this << " remote metadata_remove request: "
<< "key=" << payload.key << dendl;
m_image_ctx.operations->execute_metadata_remove(payload.key, ctx);
}
}
return complete;
} else if (r < 0) {
encode(ResponseMessage(r), ack_ctx->out);
}
}
return true;
}
template <typename I>
bool ImageWatcher<I>::handle_payload(const UnknownPayload &payload,
C_NotifyAck *ack_ctx) {

View File

@ -9,9 +9,12 @@
#include "common/ceph_mutex.h"
#include "include/Context.h"
#include "include/rbd/librbd.hpp"
#include "librbd/Operations.h"
#include "librbd/Watcher.h"
#include "librbd/WatchNotifyTypes.h"
#include "librbd/exclusive_lock/Policy.h"
#include "librbd/internal.h"
#include <functional>
#include <set>
#include <string>
#include <utility>
@ -241,6 +244,12 @@ private:
size_t attempts, ProgressContext &prog_ctx,
Context *on_finish);
bool handle_operation_request(
const watch_notify::AsyncRequestId& async_request_id,
exclusive_lock::OperationRequestType request_type, Operation operation,
std::function<void(ProgressContext &prog_ctx, Context*)> execute,
C_NotifyAck *ack_ctx);
bool handle_payload(const watch_notify::HeaderUpdatePayload& payload,
C_NotifyAck *ctx);
bool handle_payload(const watch_notify::AcquiredLockPayload& payload,

View File

@ -51,6 +51,60 @@ using namespace boost::placeholders;
namespace {
std::ostream &operator<<(std::ostream &out, const Operation &op) {
switch (op) {
case OPERATION_CHECK_OBJECT_MAP:
out << "check object map";
break;
case OPERATION_FLATTEN:
out << "flatten";
break;
case OPERATION_METADATA_UPDATE:
out << "metadata update";
break;
case OPERATION_MIGRATE:
out << "migrate";
break;
case OPERATION_REBUILD_OBJECT_MAP:
out << "rebuild object map";
break;
case OPERATION_RENAME:
out << "rename";
break;
case OPERATION_RESIZE:
out << "resize";
break;
case OPERATION_SNAP_CREATE:
out << "snap create";
break;
case OPERATION_SNAP_PROTECT:
out << "snap protect";
break;
case OPERATION_SNAP_REMOVE:
out << "snap remove";
break;
case OPERATION_SNAP_RENAME:
out << "snap rename";
break;
case OPERATION_SNAP_ROLLBACK:
out << "snap rollback";
break;
case OPERATION_SNAP_UNPROTECT:
out << "snap unprotect";
break;
case OPERATION_SPARSIFY:
out << "sparsify";
break;
case OPERATION_UPDATE_FEATURES:
out << "update features";
break;
default:
ceph_abort();
break;
}
return out;
}
template <typename I>
struct C_NotifyUpdate : public Context {
I &image_ctx;
@ -125,7 +179,7 @@ struct C_InvokeAsyncRequest : public Context {
*/
I &image_ctx;
std::string name;
Operation operation;
exclusive_lock::OperationRequestType request_type;
bool permit_snapshot;
boost::function<void(Context*)> local;
@ -134,14 +188,14 @@ struct C_InvokeAsyncRequest : public Context {
Context *on_finish;
bool request_lock = false;
C_InvokeAsyncRequest(I &image_ctx, const std::string& name,
C_InvokeAsyncRequest(I &image_ctx, Operation operation,
exclusive_lock::OperationRequestType request_type,
bool permit_snapshot,
const boost::function<void(Context*)>& local,
const boost::function<void(Context*)>& remote,
const std::set<int> &filter_error_codes,
Context *on_finish)
: image_ctx(image_ctx), name(name), request_type(request_type),
: image_ctx(image_ctx), operation(operation), request_type(request_type),
permit_snapshot(permit_snapshot), local(local), remote(remote),
filter_error_codes(filter_error_codes), on_finish(on_finish) {
}
@ -270,7 +324,8 @@ struct C_InvokeAsyncRequest : public Context {
ldout(cct, 20) << __func__ << ": r=" << r << dendl;
if (r == -EOPNOTSUPP) {
ldout(cct, 5) << name << " not supported by current lock owner" << dendl;
ldout(cct, 5) << operation << " not supported by current lock owner"
<< dendl;
request_lock = true;
send_refresh_image();
return;
@ -281,12 +336,26 @@ struct C_InvokeAsyncRequest : public Context {
return;
}
ldout(cct, 5) << name << " timed out notifying lock owner" << dendl;
ldout(cct, 5) << operation << " timed out notifying lock owner" << dendl;
send_refresh_image();
}
void send_local_request() {
ceph_assert(ceph_mutex_is_locked(image_ctx.owner_lock));
auto ctx = new LambdaContext(
[this](int r) {
if (r == -ERESTART) {
image_ctx.operations->finish_op(operation, r);
send_refresh_image();
return;
}
execute_local_request();
});
image_ctx.operations->start_op(operation, ctx);
}
void execute_local_request() {
std::shared_lock owner_locker{image_ctx.owner_lock};
CephContext *cct = image_ctx.cct;
ldout(cct, 20) << __func__ << dendl;
@ -302,6 +371,8 @@ struct C_InvokeAsyncRequest : public Context {
CephContext *cct = image_ctx.cct;
ldout(cct, 20) << __func__ << ": r=" << r << dendl;
image_ctx.operations->finish_op(operation, r);
if (r == -ERESTART) {
send_refresh_image();
return;
@ -333,7 +404,61 @@ bool needs_invalidate(I& image_ctx, uint64_t object_no,
template <typename I>
Operations<I>::Operations(I &image_ctx)
: m_image_ctx(image_ctx), m_async_request_seq(0) {
: m_image_ctx(image_ctx), m_async_request_seq(0),
m_queue_lock(ceph::make_mutex(
util::unique_lock_name("librbd::Operations::m_queue_lock",
this))) {
}
template <typename I>
void Operations<I>::start_op(Operation op, Context *ctx) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": " << op << " " << ctx << dendl;
ctx = util::create_async_context_callback(
m_image_ctx, new LambdaContext(
[this, op, ctx](int r) {
if (r == 0) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
std::shared_lock image_locker{m_image_ctx.image_lock};
if (m_image_ctx.exclusive_lock != nullptr &&
(!m_image_ctx.exclusive_lock->is_lock_owner())) {
ldout(m_image_ctx.cct, 20) << "lock owner lost, restarting" << dendl;
r = -ERESTART;
}
}
ldout(m_image_ctx.cct, 20) << "start " << op << " " << ctx << dendl;
ctx->complete(r);
}));
std::unique_lock locker{m_queue_lock};
if (!m_in_flight_ops.insert(op).second) {
ldout(cct, 20) << __func__ << ": " << op << " in flight" << dendl;
m_queued_ops[op].push_back(ctx);
return;
}
ctx->complete(0);
}
template <typename I>
void Operations<I>::finish_op(Operation op, int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": " << op << " r=" << r << dendl;
std::unique_lock locker{m_queue_lock};
auto &queue = m_queued_ops[op];
if (queue.empty()) {
m_in_flight_ops.erase(op);
return;
}
auto ctx = queue.front();
queue.pop_front();
// propagate -ERESTART through all the queue
ctx->complete(r == -ERESTART ? r : 0);
}
template <typename I>
@ -359,7 +484,7 @@ int Operations<I>::flatten(ProgressContext &prog_ctx) {
}
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("flatten",
r = invoke_async_request(OPERATION_FLATTEN,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_flatten, this,
@ -433,7 +558,7 @@ int Operations<I>::rebuild_object_map(ProgressContext &prog_ctx) {
}
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("rebuild object map",
r = invoke_async_request(OPERATION_REBUILD_OBJECT_MAP,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, true,
boost::bind(&Operations<I>::execute_rebuild_object_map,
this, boost::ref(prog_ctx), _1),
@ -484,7 +609,7 @@ int Operations<I>::check_object_map(ProgressContext &prog_ctx) {
return r;
}
r = invoke_async_request("check object map",
r = invoke_async_request(OPERATION_CHECK_OBJECT_MAP,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, true,
boost::bind(&Operations<I>::check_object_map, this,
boost::ref(prog_ctx), _1),
@ -539,7 +664,7 @@ int Operations<I>::rename(const char *dstname) {
if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("rename",
r = invoke_async_request(OPERATION_RENAME,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
true,
boost::bind(&Operations<I>::execute_rename, this,
@ -638,7 +763,7 @@ int Operations<I>::resize(uint64_t size, bool allow_shrink, ProgressContext& pro
}
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("resize",
r = invoke_async_request(OPERATION_RESIZE,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_resize, this,
@ -734,8 +859,8 @@ void Operations<I>::snap_create(const cls::rbd::SnapshotNamespace &snap_namespac
uint64_t request_id = ++m_async_request_seq;
C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
m_image_ctx, "snap_create", exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
true,
m_image_ctx, OPERATION_SNAP_CREATE,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL, true,
boost::bind(&Operations<I>::execute_snap_create, this, snap_namespace, snap_name,
_1, 0, flags, boost::ref(prog_ctx)),
boost::bind(&ImageWatcher<I>::notify_snap_create, m_image_ctx.image_watcher,
@ -819,7 +944,22 @@ int Operations<I>::snap_rollback(const cls::rbd::SnapshotNamespace& snap_namespa
return r;
}
execute_snap_rollback(snap_namespace, snap_name, prog_ctx, &cond_ctx);
Context *ctx = new LambdaContext(
[this, ctx=&cond_ctx](int r) {
m_image_ctx.operations->finish_op(OPERATION_SNAP_ROLLBACK, r);
ctx->complete(r);
});
ctx = new LambdaContext(
[this, snap_namespace, snap_name, &prog_ctx, ctx](int r) {
if (r < 0) {
ctx->complete(r);
return;
}
std::shared_lock l{m_image_ctx.owner_lock};
execute_snap_rollback(snap_namespace, snap_name, prog_ctx, ctx);
});
m_image_ctx.operations->start_op(OPERATION_SNAP_ROLLBACK, ctx);
}
r = cond_ctx.wait();
@ -923,7 +1063,7 @@ void Operations<I>::snap_remove(const cls::rbd::SnapshotNamespace& snap_namespac
request_type = exclusive_lock::OPERATION_REQUEST_TYPE_TRASH_SNAP_REMOVE;
}
C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(
m_image_ctx, "snap_remove", request_type, true,
m_image_ctx, OPERATION_SNAP_REMOVE, request_type, true,
boost::bind(&Operations<I>::execute_snap_remove, this, snap_namespace,
snap_name, _1),
boost::bind(&ImageWatcher<I>::notify_snap_remove,
@ -1017,7 +1157,7 @@ int Operations<I>::snap_rename(const char *srcname, const char *dstname) {
if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("snap_rename",
r = invoke_async_request(OPERATION_SNAP_RENAME,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
true,
boost::bind(&Operations<I>::execute_snap_rename,
@ -1119,7 +1259,7 @@ int Operations<I>::snap_protect(const cls::rbd::SnapshotNamespace& snap_namespac
if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("snap_protect",
r = invoke_async_request(OPERATION_SNAP_PROTECT,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
true,
boost::bind(&Operations<I>::execute_snap_protect,
@ -1217,7 +1357,7 @@ int Operations<I>::snap_unprotect(const cls::rbd::SnapshotNamespace& snap_namesp
if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("snap_unprotect",
r = invoke_async_request(OPERATION_SNAP_UNPROTECT,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
true,
boost::bind(&Operations<I>::execute_snap_unprotect,
@ -1418,7 +1558,7 @@ int Operations<I>::update_features(uint64_t features, bool enabled) {
r = cond_ctx.wait();
} else {
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("update_features",
r = invoke_async_request(OPERATION_UPDATE_FEATURES,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_update_features,
@ -1493,7 +1633,7 @@ int Operations<I>::metadata_set(const std::string &key,
}
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("metadata_set",
r = invoke_async_request(OPERATION_METADATA_UPDATE,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_metadata_set,
@ -1553,7 +1693,7 @@ int Operations<I>::metadata_remove(const std::string &key) {
return r;
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("metadata_remove",
r = invoke_async_request(OPERATION_METADATA_UPDATE,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_metadata_remove,
@ -1618,7 +1758,7 @@ int Operations<I>::migrate(ProgressContext &prog_ctx) {
}
uint64_t request_id = ++m_async_request_seq;
r = invoke_async_request("migrate",
r = invoke_async_request(OPERATION_MIGRATE,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_migrate, this,
@ -1684,7 +1824,7 @@ int Operations<I>::sparsify(size_t sparse_size, ProgressContext &prog_ctx) {
}
uint64_t request_id = ++m_async_request_seq;
int r = invoke_async_request("sparsify",
int r = invoke_async_request(OPERATION_SPARSIFY,
exclusive_lock::OPERATION_REQUEST_TYPE_GENERAL,
false,
boost::bind(&Operations<I>::execute_sparsify,
@ -1777,11 +1917,11 @@ int Operations<I>::prepare_image_update(
template <typename I>
int Operations<I>::invoke_async_request(
const std::string& name, exclusive_lock::OperationRequestType request_type,
Operation op, exclusive_lock::OperationRequestType request_type,
bool permit_snapshot, const boost::function<void(Context*)>& local_request,
const boost::function<void(Context*)>& remote_request) {
C_SaferCond ctx;
C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(m_image_ctx, name,
C_InvokeAsyncRequest<I> *req = new C_InvokeAsyncRequest<I>(m_image_ctx, op,
request_type,
permit_snapshot,
local_request,

View File

@ -10,6 +10,9 @@
#include "librbd/operation/ObjectMapIterate.h"
#include <atomic>
#include <string>
#include <list>
#include <map>
#include <set>
#include <boost/function.hpp>
class Context;
@ -19,6 +22,24 @@ namespace librbd {
class ImageCtx;
class ProgressContext;
enum Operation {
OPERATION_CHECK_OBJECT_MAP,
OPERATION_FLATTEN,
OPERATION_METADATA_UPDATE,
OPERATION_MIGRATE,
OPERATION_REBUILD_OBJECT_MAP,
OPERATION_RENAME,
OPERATION_RESIZE,
OPERATION_SNAP_CREATE,
OPERATION_SNAP_PROTECT,
OPERATION_SNAP_REMOVE,
OPERATION_SNAP_RENAME,
OPERATION_SNAP_ROLLBACK,
OPERATION_SNAP_UNPROTECT,
OPERATION_SPARSIFY,
OPERATION_UPDATE_FEATURES,
};
template <typename ImageCtxT = ImageCtx>
class Operations {
public:
@ -28,6 +49,9 @@ public:
return ++m_async_request_seq;
}
void start_op(enum Operation op, Context *ctx);
void finish_op(enum Operation op, int r);
int flatten(ProgressContext &prog_ctx);
void execute_flatten(ProgressContext &prog_ctx, Context *on_finish);
@ -121,7 +145,11 @@ private:
ImageCtxT &m_image_ctx;
std::atomic<uint64_t> m_async_request_seq;
int invoke_async_request(const std::string& name,
mutable ceph::mutex m_queue_lock;
std::set<Operation> m_in_flight_ops;
std::map<Operation, std::list<Context *>> m_queued_ops;
int invoke_async_request(Operation op,
exclusive_lock::OperationRequestType request_type,
bool permit_snapshot,
const boost::function<void(Context*)>& local,

View File

@ -8781,6 +8781,156 @@ TEST_F(TestLibRBD, WriteZeroesThickProvision) {
ASSERT_EQ(0, image.close());
}
TEST_F(TestLibRBD, ConcurentOperations)
{
REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
librbd::RBD rbd;
librados::IoCtx ioctx;
ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
std::string name = get_temp_image_name();
int order = 0;
uint64_t size = 2 << 20;
ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
// Test creating/removing many snapshots simultaneously
std::vector<librbd::Image> images(10);
std::vector<librbd::RBD::AioCompletion *> comps;
for (auto &image : images) {
auto comp = new librbd::RBD::AioCompletion(NULL, NULL);
ASSERT_EQ(0, rbd.aio_open(ioctx, image, name.c_str(), NULL, comp));
comps.push_back(comp);
}
for (auto &comp : comps) {
ASSERT_EQ(0, comp->wait_for_complete());
ASSERT_EQ(1, comp->is_complete());
ASSERT_EQ(0, comp->get_return_value());
comp->release();
}
comps.clear();
std::vector<std::thread> threads;
int i = 0;
for (auto &image : images) {
std::string snap_name = "snap" + stringify(i++);
threads.emplace_back([&image, snap_name]() {
int r = image.snap_create(snap_name.c_str());
ceph_assert(r == 0);
});
}
for (auto &t : threads) {
t.join();
}
threads.clear();
i = 0;
for (auto &image : images) {
std::string snap_name = "snap" + stringify(i++);
threads.emplace_back([&image, snap_name](){
int r = image.snap_remove(snap_name.c_str());
ceph_assert(r == 0);
});
}
for (auto &t : threads) {
t.join();
}
threads.clear();
for (auto &image : images) {
auto comp = new librbd::RBD::AioCompletion(NULL, NULL);
ASSERT_EQ(0, image.aio_close(comp));
comps.push_back(comp);
}
for (auto &comp : comps) {
ASSERT_EQ(0, comp->wait_for_complete());
ASSERT_EQ(1, comp->is_complete());
ASSERT_EQ(0, comp->get_return_value());
comp->release();
}
comps.clear();
// Test shutdown
{
librbd::Image image1, image2, image3;
ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
ASSERT_EQ(0, rbd.open(ioctx, image3, name.c_str(), NULL));
ASSERT_EQ(0, image1.lock_acquire(RBD_LOCK_MODE_EXCLUSIVE));
struct Watcher : public librbd::QuiesceWatchCtx {
size_t count = 0;
ceph::mutex lock = ceph::make_mutex("lock");
ceph::condition_variable cv;
void handle_quiesce() override {
std::unique_lock locker(lock);
count++;
cv.notify_one();
}
void handle_unquiesce() override {
}
bool wait_for_quiesce(size_t c) {
std::unique_lock locker(lock);
return cv.wait_for(locker, seconds(60),
[this, c]() { return count >= c; });
}
} watcher;
uint64_t handle;
ASSERT_EQ(0, image2.quiesce_watch(&watcher, &handle));
std::thread create_snap1([&image1]() {
int r = image1.snap_create("snap1");
ceph_assert(r == 0);
r = image1.close();
ceph_assert(r == 0);
});
ASSERT_TRUE(watcher.wait_for_quiesce(1));
std::thread create_snap2([&image2]() {
int r = image2.snap_create("snap2");
ceph_assert(r == 0);
});
std::thread create_snap3([&image3]() {
int r = image3.snap_create("snap3");
ceph_assert(r == 0);
});
image2.quiesce_complete(handle, 0);
ASSERT_TRUE(watcher.wait_for_quiesce(2));
image2.quiesce_complete(handle, 0);
create_snap1.join();
ASSERT_TRUE(watcher.wait_for_quiesce(3));
image2.quiesce_complete(handle, 0);
create_snap2.join();
create_snap3.join();
ASSERT_EQ(0, image2.quiesce_unwatch(handle));
ASSERT_EQ(0, image2.snap_remove("snap1"));
ASSERT_EQ(0, image2.snap_remove("snap2"));
ASSERT_EQ(0, image2.snap_remove("snap3"));
}
ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
ioctx.close();
}
// poorman's ceph_assert()
namespace ceph {
void __ceph_assert_fail(const char *assertion, const char *file, int line,