Merge pull request #52540 from petrutlucian94/single_process

rbd-wnbd: use a single daemon process per host

Reviewed-by: Ilya Dryomov <idryomov@gmail.com>
This commit is contained in:
Ilya Dryomov 2024-03-02 19:53:06 +01:00 committed by GitHub
commit 3e302abb81
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 849 additions and 585 deletions

View File

@ -162,7 +162,6 @@ def execute(*args, **kwargs):
exc = CommandFailed(
command=args, returncode=result.returncode,
stdout=result.stdout, stderr=result.stderr)
LOG.error(exc)
raise exc
return result
@ -367,6 +366,7 @@ class RbdImage(object):
self.mapped = False
@Tracer.trace
@retry_decorator()
def remove(self):
if not self.removed:
LOG.info("Removing image: %s", self.name)

View File

@ -1,5 +1,7 @@
add_executable(
rbd-wnbd
rados_client_cache.cc
rbd_mapping.cc rbd_mapping_config.cc
rbd_wnbd.cc wnbd_handler.cc wnbd_wmi.cc
../../common/win32/code_page.rc)
set_target_properties(

View File

@ -0,0 +1,91 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rados_client_cache.h"
#include "common/errno.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "rbd-wnbd: "
std::shared_ptr<librados::Rados> RadosClientCache::init_client(
std::string& entity_name, std::string& cluster_name)
{
auto rados = std::make_shared<librados::Rados>();
int r = rados->init2(entity_name.c_str(), cluster_name.c_str(), 0);
if (r < 0) {
derr << "couldn't initialize rados: " << cpp_strerror(r)
<< dendl;
return std::shared_ptr<librados::Rados>();
}
r = rados->conf_read_file(nullptr);
if (r < 0) {
derr << "couldn't read conf file: " << cpp_strerror(r)
<< dendl;
return std::shared_ptr<librados::Rados>();
}
r = rados->connect();
if (r < 0) {
derr << "couldn't establish rados connection: "
<< cpp_strerror(r) << dendl;
return std::shared_ptr<librados::Rados>();
} else {
dout(1) << "successfully initialized rados connection" << dendl;
}
return rados;
}
std::shared_ptr<librados::Rados> RadosClientCache::get_client(
std::string& entity_name, std::string& cluster_name)
{
std::unique_lock l{cache_lock};
remove_expired();
std::string key = entity_name + "@" + cluster_name;
auto cached_client_weak = cache.find(key);
if (cached_client_weak != cache.end()) {
if (auto cached_client = cached_client_weak->second.lock()) {
dout(1) << "reusing cached rados client: " << key << dendl;
return cached_client;
} else {
dout(5) << "cleaning up expired rados ref: "
<< cached_client_weak->first << dendl;
cache.erase(cached_client_weak);
}
}
dout(1) << "creating new rados client: " << key << dendl;
auto client = init_client(entity_name, cluster_name);
cache.insert(std::pair{key, client});
return client;
}
void RadosClientCache::remove_expired()
{
auto i = cache.begin();
while (i != cache.end()) {
if (i->second.expired()) {
dout(5) << "removing expired rados ref: "
<< i->first << dendl;
i = cache.erase(i);
continue;
}
i++;
}
}

View File

@ -0,0 +1,39 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include "common/debug.h"
#include "common/dout.h"
#include "global/global_init.h"
#include "include/rados/librados.hpp"
// In order to re-use OSD connections, we're caching one rados client
// per cluster.
class RadosClientCache
{
private:
std::map<std::string, std::weak_ptr<librados::Rados>> cache;
ceph::mutex cache_lock = ceph::make_mutex("RadosClientCache::MapLock");
// Remove deleted objects from the map.
void remove_expired();
std::shared_ptr<librados::Rados> init_client(
std::string& entity_name, std::string& cluster_name);
public:
std::shared_ptr<librados::Rados> get_client(
std::string& entity_name, std::string& cluster_name);
};

View File

@ -0,0 +1,284 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rbd_mapping.h"
#include "common/debug.h"
#include "common/dout.h"
#include "common/errno.h"
#include "global/global_init.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "rbd-wnbd: "
#define DISK_STATUS_POLLING_INTERVAL_MS 500
int RbdMapping::init()
{
librbd::image_info_t info;
rados = client_cache.get_client(cfg.entity_name, cfg.cluster_name);
if (!rados) {
return -EINVAL;
}
int r = rados->ioctx_create(cfg.poolname.c_str(), io_ctx);
if (r < 0) {
derr << "rbd-wnbd: couldn't create IO context: " << cpp_strerror(r)
<< ". Pool name: " << cfg.poolname
<< dendl;
return r;
}
io_ctx.set_namespace(cfg.nsname);
r = rbd.open(io_ctx, image, cfg.imgname.c_str());
if (r < 0) {
derr << "rbd-wnbd: couldn't open rbd image: " << cpp_strerror(r)
<< dendl;
return r;
}
if (cfg.exclusive) {
r = image.lock_acquire(RBD_LOCK_MODE_EXCLUSIVE);
if (r < 0) {
derr << "rbd-wnbd: failed to acquire exclusive lock: " << cpp_strerror(r)
<< dendl;
return r;
}
}
if (!cfg.snapname.empty()) {
r = image.snap_set(cfg.snapname.c_str());
if (r < 0) {
derr << "rbd-wnbd: couldn't use snapshot: " << cpp_strerror(r)
<< dendl;
return r;
}
}
r = image.stat(info, sizeof(info));
if (r < 0)
return r;
initial_image_size = info.size;
CephContext* cct = reinterpret_cast<CephContext*>(io_ctx.cct());
ceph_assert(cct != nullptr);
handler = new WnbdHandler(image, cfg.devpath,
info.size / RBD_WNBD_BLKSIZE,
RBD_WNBD_BLKSIZE,
!cfg.snapname.empty() || cfg.readonly,
g_conf().get_val<bool>("rbd_cache"),
cfg.io_req_workers,
cfg.io_reply_workers,
cct->get_admin_socket());
return 0;
}
void RbdMapping::shutdown()
{
std::unique_lock l{shutdown_lock};
dout(5) << __func__ << ": removing RBD mapping: " << cfg.devpath << dendl;
int r = 0;
if (!cfg.persistent && saved_cfg_to_registry) {
dout(5) << __func__ << ": cleaning up non-persistent mapping: "
<< cfg.devpath << dendl;
r = remove_config_from_registry(&cfg);
if (r) {
derr << __func__ << ": could not clean up non-persistent mapping: "
<< cfg.devpath << ". Error: " << cpp_strerror(r) << dendl;
}
}
if (watch_ctx) {
r = image.update_unwatch(watch_handle);
if (r < 0) {
derr << __func__ << ": update_unwatch failed with error: "
<< cpp_strerror(r) << dendl;
}
delete watch_ctx;
watch_ctx = nullptr;
}
if (handler) {
handler->shutdown();
delete handler;
handler = nullptr;
}
image.close();
io_ctx.close();
}
int RbdMapping::start()
{
dout(10) << "initializing mapping" << dendl;
int r = init();
if (r < 0) {
return r;
}
dout(10) << "starting wnbd handler" << dendl;
r = handler->start();
if (r) {
return r == ERROR_ALREADY_EXISTS ? -EEXIST : -EINVAL;
}
dout(10) << "setting up watcher" << dendl;
watch_ctx = new WNBDWatchCtx(io_ctx, handler, image, initial_image_size);
r = image.update_watch(watch_ctx, &watch_handle);
if (r < 0) {
derr << __func__ << ": update_watch failed with error: "
<< cpp_strerror(r) << dendl;
return r;
}
// Wait for the mapped disk to become available.
r = wait_mapped_disk(cfg);
if (r < 0) {
return r;
}
// We're storing mapping details in the registry even for non-persistent
// mappings. This allows us to easily retrieve mapping details such
// as the rbd pool or admin socket path.
// We're cleaning up the registry entry when the non-persistent mapping
// gets disconnected or when the ceph service restarts.
r = save_config_to_registry(&cfg);
if (r < 0) {
return r;
} else {
saved_cfg_to_registry = true;
}
if (disconnect_cbk) {
monitor_thread = std::thread([this]{
int ret = this->wait();
// Allow "this" to be destroyed by the disconnect callback.
this->monitor_thread.detach();
dout(5) << "finished waiting for: " << this->cfg.devpath
<< ", ret: " << ret << dendl;
disconnect_cbk(this->cfg.devpath, ret);
});
}
return 0;
}
// Wait until the image gets disconnected.
int RbdMapping::wait()
{
if (handler) {
return handler->wait();
}
return 0;
}
RbdMapping::~RbdMapping()
{
dout(10) << __func__ << ": cleaning up rbd mapping: "
<< cfg.devpath << dendl;
shutdown();
}
// Wait for the mapped disk to become available.
int wait_mapped_disk(Config& cfg)
{
DWORD status = WnbdPollDiskNumber(
cfg.devpath.c_str(),
TRUE, // ExpectMapped
TRUE, // TryOpen
cfg.image_map_timeout * 1000,
DISK_STATUS_POLLING_INTERVAL_MS,
(PDWORD) &cfg.disk_number);
if (status) {
derr << "WNBD disk unavailable, error: "
<< win32_strerror(status) << dendl;
return -EINVAL;
}
dout(0) << "Successfully mapped image: " << cfg.devpath
<< ". Windows disk path: "
<< "\\\\.\\PhysicalDrive" + std::to_string(cfg.disk_number)
<< dendl;
return 0;
}
int RbdMappingDispatcher::create(Config& cfg)
{
if (cfg.devpath.empty()) {
derr << "missing device identifier" << dendl;
return -EINVAL;
}
if (get_mapping(cfg.devpath)) {
derr << "already mapped: " << cfg.devpath << dendl;
return -EEXIST;
}
auto rbd_mapping = std::make_shared<RbdMapping>(
cfg, client_cache,
std::bind(
&RbdMappingDispatcher::disconnect_cbk,
this,
std::placeholders::_1,
std::placeholders::_2));
int r = rbd_mapping.get()->start();
if (!r) {
std::unique_lock l{map_mutex};
mappings.insert(std::make_pair(cfg.devpath, rbd_mapping));
}
return r;
}
std::shared_ptr<RbdMapping> RbdMappingDispatcher::get_mapping(
std::string& devpath)
{
std::unique_lock l{map_mutex};
auto mapping_it = mappings.find(devpath);
if (mapping_it == mappings.end()) {
// not found
return std::shared_ptr<RbdMapping>();
} else {
return mapping_it->second;
}
}
void RbdMappingDispatcher::disconnect_cbk(std::string devpath, int ret)
{
dout(10) << "RbdMappingDispatcher: cleaning up stopped mapping" << dendl;
if (ret) {
derr << "rbd mapping wait error: " << ret
<< ", allowing cleanup to proceed"
<< dendl;
}
auto mapping = get_mapping(devpath);
if (mapping) {
// This step can be fairly time consuming, especially when
// cumulated. For this reason, we'll ensure that multiple mappings
// can be cleaned up simultaneously.
mapping->shutdown();
std::unique_lock l{map_mutex};
mappings.erase(devpath);
}
}

View File

@ -0,0 +1,119 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include "rados_client_cache.h"
#include "rbd_mapping_config.h"
#include "wnbd_handler.h"
class WNBDWatchCtx : public librbd::UpdateWatchCtx
{
private:
librados::IoCtx &io_ctx;
WnbdHandler* handler;
librbd::Image &image;
uint64_t size;
public:
WNBDWatchCtx(librados::IoCtx& io_ctx, WnbdHandler* handler,
librbd::Image& image, uint64_t size)
: io_ctx(io_ctx)
, handler(handler)
, image(image)
, size(size)
{
}
~WNBDWatchCtx() override {}
void handle_notify() override
{
uint64_t new_size;
if (image.size(&new_size) == 0 && new_size != size &&
handler->resize(new_size) == 0) {
size = new_size;
}
}
};
typedef std::function<void(std::string devpath, int ret)> disconnect_cbk_t;
class RbdMapping
{
private:
Config cfg;
// We're sharing the rados object across mappings in order to
// reuse the OSD connections.
RadosClientCache& client_cache;
std::shared_ptr<librados::Rados> rados;
librbd::RBD rbd;
librados::IoCtx io_ctx;
librbd::Image image;
uint64_t initial_image_size;
WnbdHandler* handler = nullptr;
uint64_t watch_handle;
WNBDWatchCtx* watch_ctx = nullptr;
bool saved_cfg_to_registry = false;
disconnect_cbk_t disconnect_cbk;
ceph::mutex shutdown_lock = ceph::make_mutex("RbdMapping::ShutdownLock");
std::thread monitor_thread;
int init();
public:
RbdMapping(Config& _cfg,
RadosClientCache& _client_cache)
: cfg(_cfg)
, client_cache(_client_cache)
{}
RbdMapping(Config& _cfg,
RadosClientCache& _client_cache,
disconnect_cbk_t _disconnect_cbk)
: cfg(_cfg)
, client_cache(_client_cache)
, disconnect_cbk(_disconnect_cbk)
{}
~RbdMapping();
int start();
// Wait until the image gets disconnected.
int wait();
void shutdown();
};
// Wait for the mapped disk to become available.
int wait_mapped_disk(Config& cfg);
class RbdMappingDispatcher
{
private:
RadosClientCache& client_cache;
std::map<std::string, std::shared_ptr<RbdMapping>> mappings;
ceph::mutex map_mutex = ceph::make_mutex("RbdMappingDispatcher::MapMutex");
void disconnect_cbk(std::string devpath, int ret);
public:
RbdMappingDispatcher(RadosClientCache& _client_cache)
: client_cache(_client_cache)
{}
int create(Config& cfg);
std::shared_ptr<RbdMapping> get_mapping(std::string& devpath);
};

View File

@ -0,0 +1,119 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2020 SUSE LINUX GmbH
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include "rbd_mapping_config.h"
#include "common/debug.h"
#include "common/dout.h"
#include "common/win32/registry.h"
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "rbd-wnbd: "
int construct_devpath_if_missing(Config* cfg)
{
// Windows doesn't allow us to request specific disk paths when mapping an
// image. This will just be used by rbd-wnbd and wnbd as an identifier.
if (cfg->devpath.empty()) {
if (cfg->imgname.empty()) {
derr << "Missing image name." << dendl;
return -EINVAL;
}
if (!cfg->poolname.empty()) {
cfg->devpath += cfg->poolname;
cfg->devpath += '/';
}
if (!cfg->nsname.empty()) {
cfg->devpath += cfg->nsname;
cfg->devpath += '/';
}
cfg->devpath += cfg->imgname;
if (!cfg->snapname.empty()) {
cfg->devpath += '@';
cfg->devpath += cfg->snapname;
}
}
return 0;
}
int save_config_to_registry(Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(cfg->devpath);
auto reg_key = RegistryKey(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str(), true);
if (!reg_key.hKey) {
return -EINVAL;
}
int ret_val = 0;
// Registry writes are immediately available to other processes.
// Still, we'll do a flush to ensure that the mapping can be
// recreated after a system crash.
if (reg_key.set("pid", getpid()) ||
reg_key.set("devpath", cfg->devpath) ||
reg_key.set("poolname", cfg->poolname) ||
reg_key.set("nsname", cfg->nsname) ||
reg_key.set("imgname", cfg->imgname) ||
reg_key.set("snapname", cfg->snapname) ||
reg_key.set("command_line", cfg->command_line) ||
reg_key.set("persistent", cfg->persistent) ||
reg_key.set("admin_sock_path", g_conf()->admin_socket) ||
reg_key.flush()) {
ret_val = -EINVAL;
}
return ret_val;
}
int remove_config_from_registry(Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(cfg->devpath);
return RegistryKey::remove(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str());
}
int load_mapping_config_from_registry(std::string devpath, Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(devpath);
auto reg_key = RegistryKey(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str(), false);
if (!reg_key.hKey) {
if (reg_key.missingKey)
return -ENOENT;
else
return -EINVAL;
}
reg_key.get("devpath", cfg->devpath);
reg_key.get("poolname", cfg->poolname);
reg_key.get("nsname", cfg->nsname);
reg_key.get("imgname", cfg->imgname);
reg_key.get("snapname", cfg->snapname);
reg_key.get("command_line", cfg->command_line);
reg_key.get("persistent", cfg->persistent);
reg_key.get("admin_sock_path", cfg->admin_sock_path);
return 0;
}

View File

@ -0,0 +1,78 @@
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2020 SUSE LINUX GmbH
* Copyright (C) 2023 Cloudbase Solutions
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#pragma once
#include <string>
#include <wnbd.h>
#define SERVICE_REG_KEY "SYSTEM\\CurrentControlSet\\Services\\rbd-wnbd"
#define DEFAULT_SERVICE_START_TIMEOUT 120
#define DEFAULT_IMAGE_MAP_TIMEOUT 20
#define DEFAULT_SERVICE_THREAD_COUNT 8
#define DEFAULT_SOFT_REMOVE_TIMEOUT 15
#define DEFAULT_IO_WORKER_COUNT 4
#define RBD_WNBD_BLKSIZE 512UL
struct Config {
bool exclusive = false;
bool readonly = false;
std::string cluster_name;
std::string entity_name;
std::string poolname;
std::string nsname;
std::string imgname;
std::string snapname;
std::string devpath;
std::string format;
bool pretty_format = false;
bool hard_disconnect = false;
int soft_disconnect_timeout = DEFAULT_SOFT_REMOVE_TIMEOUT;
bool hard_disconnect_fallback = true;
int service_start_timeout = DEFAULT_SERVICE_START_TIMEOUT;
int image_map_timeout = DEFAULT_IMAGE_MAP_TIMEOUT;
bool remap_failure_fatal = false;
bool adapter_monitoring_enabled = false;
// TODO: consider moving those fields to a separate structure. Those
// provide connection information without actually being configurable.
// The disk number is provided by Windows.
int disk_number = -1;
int pid = 0;
std::string serial_number;
bool active = false;
bool wnbd_mapped = false;
std::string command_line;
std::string admin_sock_path;
WnbdLogLevel wnbd_log_level = WnbdLogLevelInfo;
int io_req_workers = DEFAULT_IO_WORKER_COUNT;
int io_reply_workers = DEFAULT_IO_WORKER_COUNT;
int service_thread_count = DEFAULT_SERVICE_THREAD_COUNT;
// register the mapping, recreating it when the Ceph service starts.
bool persistent = true;
};
int construct_devpath_if_missing(Config* cfg);
int save_config_to_registry(Config* cfg);
int remove_config_from_registry(Config* cfg);
int load_mapping_config_from_registry(std::string devpath, Config* cfg);

View File

@ -29,6 +29,7 @@
#include "wnbd_handler.h"
#include "wnbd_wmi.h"
#include "rbd_wnbd.h"
#include "rados_client_cache.h"
#include <fstream>
#include <memory>
@ -72,9 +73,12 @@ using namespace std;
// Wait for wmi events up to two seconds
#define WMI_EVENT_TIMEOUT 2
static WnbdHandler* handler = nullptr;
static ceph::mutex shutdown_lock = ceph::make_mutex("RbdWnbd::ShutdownLock");
static RadosClientCache client_cache;
static RbdMappingDispatcher mapping_dispatcher(client_cache);
static RbdMapping* daemon_mapping = nullptr;
bool is_process_running(DWORD pid)
{
HANDLE process = OpenProcess(SYNCHRONIZE, FALSE, pid);
@ -307,7 +311,7 @@ int send_map_request(std::string arguments) {
&reply,
sizeof(reply),
&bytes_read,
DEFAULT_MAP_TIMEOUT_MS);
DEFAULT_IMAGE_MAP_TIMEOUT * 1000);
if (!success) {
DWORD err = GetLastError();
derr << "Could not send device map request. "
@ -326,192 +330,55 @@ int send_map_request(std::string arguments) {
return reply.status;
}
// Spawn a subprocess using the specified "rbd-wnbd" command
// arguments. A pipe is passed to the child process,
// which will allow it to communicate the mapping status
int map_device_using_suprocess(std::string arguments, int timeout_ms)
int map_device_using_same_process(std::string command_line)
{
STARTUPINFOW si;
PROCESS_INFORMATION pi;
char ch;
DWORD err = 0, status = 0;
int exit_code = 0;
std::ostringstream command_line;
std::string exe_path;
// Windows async IO context
OVERLAPPED connect_o, read_o;
HANDLE connect_event = NULL, read_event = NULL;
// Used for waiting on multiple events that are going to be initialized later.
HANDLE wait_events[2] = { INVALID_HANDLE_VALUE, INVALID_HANDLE_VALUE};
DWORD bytes_read = 0;
// We may get a command line containing an old pipe handle when
// recreating mappings, so we'll have to replace it.
std::regex pipe_pattern("([\'\"]?--pipe-name[\'\"]? +[\'\"]?[^ ]+[\'\"]?)");
dout(5) << "Creating mapping using the same process. Command line: "
<< command_line << dendl;
uuid_d uuid;
uuid.generate_random();
std::ostringstream pipe_name;
pipe_name << "\\\\.\\pipe\\rbd-wnbd-" << uuid;
// Create an unique named pipe to communicate with the child. */
HANDLE pipe_handle = CreateNamedPipe(
pipe_name.str().c_str(),
PIPE_ACCESS_INBOUND | FILE_FLAG_FIRST_PIPE_INSTANCE |
FILE_FLAG_OVERLAPPED,
PIPE_WAIT,
1, // Only accept one instance
SERVICE_PIPE_BUFFSZ,
SERVICE_PIPE_BUFFSZ,
SERVICE_PIPE_TIMEOUT_MS,
NULL);
if (pipe_handle == INVALID_HANDLE_VALUE) {
err = GetLastError();
derr << "CreateNamedPipe failed: " << win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto finally;
}
connect_event = CreateEvent(0, TRUE, FALSE, NULL);
read_event = CreateEvent(0, TRUE, FALSE, NULL);
if (!connect_event || !read_event) {
err = GetLastError();
derr << "CreateEvent failed: " << win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto finally;
}
connect_o.hEvent = connect_event;
read_o.hEvent = read_event;
status = ConnectNamedPipe(pipe_handle, &connect_o);
err = GetLastError();
if (status || err != ERROR_IO_PENDING) {
if (status)
err = status;
derr << "ConnectNamedPipe failed: " << win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto finally;
}
err = 0;
dout(5) << __func__ << ": command arguments: " << arguments << dendl;
// We'll avoid running arbitrary commands, instead using the executable
// path of this process (expected to be the full rbd-wnbd.exe path).
err = get_exe_path(exe_path);
if (err) {
exit_code = -EINVAL;
goto finally;
}
command_line << std::quoted(exe_path)
<< " " << std::regex_replace(arguments, pipe_pattern, "")
<< " --pipe-name " << pipe_name.str();
dout(5) << __func__ << ": command line: " << command_line.str() << dendl;
GetStartupInfoW(&si);
// Create a detached child
if (!CreateProcessW(
NULL, const_cast<wchar_t*>(to_wstring(command_line.str()).c_str()),
NULL, NULL, FALSE, DETACHED_PROCESS,
NULL, NULL, &si, &pi)) {
err = GetLastError();
derr << "CreateProcess failed: " << win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto finally;
int argc;
// CommandLineToArgvW only has an UTF-16 variant.
LPWSTR* argv_w = CommandLineToArgvW(
to_wstring(command_line).c_str(), &argc);
if (!argv_w) {
DWORD err = GetLastError();
derr << "Couldn't parse args, error: "
<< win32_strerror(err) << dendl;
return -EINVAL;
}
wait_events[0] = connect_event;
wait_events[1] = pi.hProcess;
status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
switch(status) {
case WAIT_OBJECT_0:
if (!GetOverlappedResult(pipe_handle, &connect_o, &bytes_read, TRUE)) {
err = GetLastError();
derr << "Couldn't establish a connection with the child process. "
<< "Error: " << win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto clean_process;
}
// We have an incoming connection.
break;
case WAIT_OBJECT_0 + 1:
// The process has exited prematurely.
goto clean_process;
case WAIT_TIMEOUT:
derr << "Timed out waiting for child process connection." << dendl;
goto clean_process;
default:
derr << "Failed waiting for child process. Status: " << status << dendl;
goto clean_process;
std::vector<const char*> args;
std::vector<std::string> argv_sv;
// We're reserving the vector size in order to avoid resizes,
// which would invalidate our char* pointers.
argv_sv.reserve(argc);
args.reserve(argc);
for (int i = 0; i < argc; i++) {
argv_sv.push_back(to_string(argv_w[i]));
args.push_back(argv_sv[i].c_str());
}
// Block and wait for child to say it is ready.
dout(5) << __func__ << ": waiting for child notification." << dendl;
if (!ReadFile(pipe_handle, &ch, 1, NULL, &read_o)) {
err = GetLastError();
if (err != ERROR_IO_PENDING) {
derr << "Receiving child process reply failed with: "
<< win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto clean_process;
}
LocalFree(argv_w);
Config cfg;
cfg.command_line = command_line;
Command parsed_cmd = None;
std::ostringstream err_msg;
int r = parse_args(args, &err_msg, &parsed_cmd, &cfg);
if (r) {
derr << "Couldn't parse args, error: " << r
<< ". Error message: " << err_msg.str() << dendl;
return -EINVAL;
}
wait_events[0] = read_event;
wait_events[1] = pi.hProcess;
// The RBD daemon is expected to write back right after opening the
// pipe. We'll use the same timeout value for now.
status = WaitForMultipleObjects(2, wait_events, FALSE, timeout_ms);
switch(status) {
case WAIT_OBJECT_0:
if (!GetOverlappedResult(pipe_handle, &read_o, &bytes_read, TRUE)) {
err = GetLastError();
derr << "Receiving child process reply failed with: "
<< win32_strerror(err) << dendl;
exit_code = -ECHILD;
goto clean_process;
}
break;
case WAIT_OBJECT_0 + 1:
// The process has exited prematurely.
goto clean_process;
case WAIT_TIMEOUT:
derr << "Timed out waiting for child process message." << dendl;
goto clean_process;
default:
derr << "Failed waiting for child process. Status: " << status << dendl;
goto clean_process;
if (parsed_cmd != Connect) {
derr << "Unexpected map command: " << parsed_cmd
<< ", expecting: " << Connect << dendl;
return -EINVAL;
}
dout(5) << __func__ << ": received child notification." << dendl;
goto finally;
if (construct_devpath_if_missing(&cfg)) {
return -EINVAL;
}
clean_process:
if (!is_process_running(pi.dwProcessId)) {
GetExitCodeProcess(pi.hProcess, (PDWORD)&exit_code);
if (!exit_code) {
// Child terminated unexpectedly.
exit_code = -ECHILD;
} else if (exit_code > 0) {
// Make sure to return a negative error code.
exit_code = -exit_code;
}
derr << "Daemon failed with: " << cpp_strerror(exit_code) << dendl;
} else {
// The process closed the pipe without notifying us or exiting.
// This is quite unlikely, but we'll terminate the process.
dout(0) << "Terminating unresponsive process." << dendl;
TerminateProcess(pi.hProcess, 1);
exit_code = -EINVAL;
}
finally:
if (exit_code)
derr << "Could not start RBD daemon." << dendl;
if (pipe_handle)
CloseHandle(pipe_handle);
if (connect_event)
CloseHandle(connect_event);
if (read_event)
CloseHandle(read_event);
return exit_code;
return mapping_dispatcher.create(cfg);
}
BOOL WINAPI console_handler_routine(DWORD dwCtrlType)
@ -520,78 +387,13 @@ BOOL WINAPI console_handler_routine(DWORD dwCtrlType)
<< ". Exiting." << dendl;
std::unique_lock l{shutdown_lock};
if (handler)
handler->shutdown();
if (daemon_mapping) {
daemon_mapping->shutdown();
}
return true;
}
int save_config_to_registry(Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(cfg->devpath);
auto reg_key = RegistryKey(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str(), true);
if (!reg_key.hKey) {
return -EINVAL;
}
int ret_val = 0;
// Registry writes are immediately available to other processes.
// Still, we'll do a flush to ensure that the mapping can be
// recreated after a system crash.
if (reg_key.set("pid", getpid()) ||
reg_key.set("devpath", cfg->devpath) ||
reg_key.set("poolname", cfg->poolname) ||
reg_key.set("nsname", cfg->nsname) ||
reg_key.set("imgname", cfg->imgname) ||
reg_key.set("snapname", cfg->snapname) ||
reg_key.set("command_line", get_cli_args()) ||
reg_key.set("persistent", cfg->persistent) ||
reg_key.set("admin_sock_path", g_conf()->admin_socket) ||
reg_key.flush()) {
ret_val = -EINVAL;
}
return ret_val;
}
int remove_config_from_registry(Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(cfg->devpath);
return RegistryKey::remove(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str());
}
int load_mapping_config_from_registry(string devpath, Config* cfg)
{
std::string strKey{ SERVICE_REG_KEY };
strKey.append("\\");
strKey.append(devpath);
auto reg_key = RegistryKey(
g_ceph_context, HKEY_LOCAL_MACHINE, strKey.c_str(), false);
if (!reg_key.hKey) {
if (reg_key.missingKey)
return -ENOENT;
else
return -EINVAL;
}
reg_key.get("devpath", cfg->devpath);
reg_key.get("poolname", cfg->poolname);
reg_key.get("nsname", cfg->nsname);
reg_key.get("imgname", cfg->imgname);
reg_key.get("snapname", cfg->snapname);
reg_key.get("command_line", cfg->command_line);
reg_key.get("persistent", cfg->persistent);
reg_key.get("admin_sock_path", cfg->admin_sock_path);
return 0;
}
int restart_registered_mappings(
int worker_count,
int total_timeout,
@ -659,7 +461,7 @@ int restart_registered_mappings(
// We'll try to map all devices and return a non-zero value
// if any of them fails.
int r = map_device_using_suprocess(cfg.command_line, time_left_ms);
int r = map_device_using_same_process(cfg.command_line);
if (r) {
err = r;
derr << "Could not create mapping: "
@ -792,8 +594,8 @@ class RBDService : public ServiceBase {
<< (char*)request->arguments << dendl;
// TODO: use the configured service map timeout.
// TODO: add ceph.conf options.
return map_device_using_suprocess(
(char*)request->arguments, DEFAULT_MAP_TIMEOUT_MS);
return map_device_using_same_process(
std::string((char*) request->arguments));
default:
dout(1) << "Received unsupported command: "
<< request->command << dendl;
@ -1006,6 +808,8 @@ exit:
} else {
dout(0) << "Ignoring image remap failure." << dendl;
}
} else {
dout(0) << "successfully restarted mappings" << dendl;
}
if (adapter_monitoring_enabled) {
@ -1042,35 +846,6 @@ exit:
}
};
class WNBDWatchCtx : public librbd::UpdateWatchCtx
{
private:
librados::IoCtx &io_ctx;
WnbdHandler* handler;
librbd::Image &image;
uint64_t size;
public:
WNBDWatchCtx(librados::IoCtx& io_ctx, WnbdHandler* handler,
librbd::Image& image, uint64_t size)
: io_ctx(io_ctx)
, handler(handler)
, image(image)
, size(size)
{ }
~WNBDWatchCtx() override {}
void handle_notify() override
{
uint64_t new_size;
if (image.size(&new_size) == 0 && new_size != size &&
handler->resize(new_size) == 0) {
size = new_size;
}
}
};
static void usage()
{
const char* usage_str =R"(
@ -1138,36 +913,6 @@ Common options:
static Command cmd = None;
int construct_devpath_if_missing(Config* cfg)
{
// Windows doesn't allow us to request specific disk paths when mapping an
// image. This will just be used by rbd-wnbd and wnbd as an identifier.
if (cfg->devpath.empty()) {
if (cfg->imgname.empty()) {
derr << "Missing image name." << dendl;
return -EINVAL;
}
if (!cfg->poolname.empty()) {
cfg->devpath += cfg->poolname;
cfg->devpath += '/';
}
if (!cfg->nsname.empty()) {
cfg->devpath += cfg->nsname;
cfg->devpath += '/';
}
cfg->devpath += cfg->imgname;
if (!cfg->snapname.empty()) {
cfg->devpath += '@';
cfg->devpath += cfg->snapname;
}
}
return 0;
}
boost::intrusive_ptr<CephContext> do_global_init(
int argc, const char *argv[], Config *cfg)
{
@ -1196,7 +941,7 @@ boost::intrusive_ptr<CephContext> do_global_init(
global_pre_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT, code_env, flags);
// Avoid cluttering the console when spawning a mapping that will run
// in the background.
if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
if (g_conf()->daemonize) {
flags |= CINIT_FLAG_NO_DAEMON_ACTIONS;
}
auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
@ -1209,208 +954,20 @@ boost::intrusive_ptr<CephContext> do_global_init(
return cct;
}
// Wait for the mapped disk to become available.
static int wait_mapped_disk(Config *cfg)
{
DWORD status = WnbdPollDiskNumber(
cfg->devpath.c_str(),
TRUE, // ExpectMapped
TRUE, // TryOpen
cfg->image_map_timeout,
DISK_STATUS_POLLING_INTERVAL_MS,
(PDWORD) &cfg->disk_number);
if (status) {
derr << "WNBD disk unavailable, error: "
<< win32_strerror(status) << dendl;
return -EINVAL;
}
dout(0) << "Successfully mapped image: " << cfg->devpath
<< ". Windows disk path: "
<< "\\\\.\\PhysicalDrive" + std::to_string(cfg->disk_number)
<< dendl;
return 0;
}
static int do_map(Config *cfg)
{
int r;
librados::Rados rados;
librbd::RBD rbd;
librados::IoCtx io_ctx;
librbd::Image image;
librbd::image_info_t info;
HANDLE parent_pipe_handle = INVALID_HANDLE_VALUE;
int err = 0;
if (g_conf()->daemonize && cfg->parent_pipe.empty()) {
r = send_map_request(get_cli_args());
if (r < 0) {
return r;
}
return wait_mapped_disk(cfg);
}
dout(0) << "Mapping RBD image: " << cfg->devpath << dendl;
r = rados.init_with_context(g_ceph_context);
if (r < 0) {
derr << "rbd-wnbd: couldn't initialize rados: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
r = rados.connect();
if (r < 0) {
derr << "rbd-wnbd: couldn't connect to rados: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
r = rados.ioctx_create(cfg->poolname.c_str(), io_ctx);
if (r < 0) {
derr << "rbd-wnbd: couldn't create IO context: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
io_ctx.set_namespace(cfg->nsname);
r = rbd.open(io_ctx, image, cfg->imgname.c_str());
if (r < 0) {
derr << "rbd-wnbd: couldn't open rbd image: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
if (cfg->exclusive) {
r = image.lock_acquire(RBD_LOCK_MODE_EXCLUSIVE);
if (r < 0) {
derr << "rbd-wnbd: failed to acquire exclusive lock: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
}
if (!cfg->snapname.empty()) {
r = image.snap_set(cfg->snapname.c_str());
if (r < 0) {
derr << "rbd-wnbd: couldn't use snapshot: " << cpp_strerror(r)
<< dendl;
goto close_ret;
}
}
r = image.stat(info, sizeof(info));
if (r < 0)
goto close_ret;
if (info.size > _UI64_MAX) {
r = -EFBIG;
derr << "rbd-wnbd: image is too large (" << byte_u_t(info.size)
<< ", max is " << byte_u_t(_UI64_MAX) << ")" << dendl;
goto close_ret;
}
// We're storing mapping details in the registry even for non-persistent
// mappings. This allows us to easily retrieve mapping details such
// as the rbd pool or admin socket path.
// We're cleaning up the registry entry when the non-persistent mapping
// gets disconnected or when the ceph service restarts.
r = save_config_to_registry(cfg);
if (r < 0)
goto close_ret;
handler = new WnbdHandler(image, cfg->devpath,
info.size / RBD_WNBD_BLKSIZE,
RBD_WNBD_BLKSIZE,
!cfg->snapname.empty() || cfg->readonly,
g_conf().get_val<bool>("rbd_cache"),
cfg->io_req_workers,
cfg->io_reply_workers);
r = handler->start();
RbdMapping rbd_mapping(*cfg, client_cache);
int r = rbd_mapping.start();
if (r) {
r = r == ERROR_ALREADY_EXISTS ? -EEXIST : -EINVAL;
goto close_ret;
return r;
}
// TODO: consider substracting the time it took to perform the
// above operations from cfg->image_map_timeout in wait_mapped_disk().
r = wait_mapped_disk(cfg);
if (r < 0) {
goto close_ret;
}
daemon_mapping = &rbd_mapping;
// We're informing the parent processes that the initialization
// was successful.
if (!cfg->parent_pipe.empty()) {
parent_pipe_handle = CreateFile(
cfg->parent_pipe.c_str(), GENERIC_WRITE, 0, NULL,
OPEN_EXISTING, 0, NULL);
if (parent_pipe_handle == INVALID_HANDLE_VALUE) {
derr << "Could not open parent pipe: " << win32_strerror(err) << dendl;
} else if (!WriteFile(parent_pipe_handle, "a", 1, NULL, NULL)) {
// TODO: consider exiting in this case. The parent didn't wait for us,
// maybe it was killed after a timeout.
err = GetLastError();
derr << "Failed to communicate with the parent: "
<< win32_strerror(err) << dendl;
} else {
dout(5) << __func__ << ": submitted parent notification." << dendl;
}
if (parent_pipe_handle != INVALID_HANDLE_VALUE)
CloseHandle(parent_pipe_handle);
global_init_postfork_finish(g_ceph_context);
}
{
uint64_t watch_handle;
WNBDWatchCtx watch_ctx(io_ctx, handler, image, info.size);
r = image.update_watch(&watch_ctx, &watch_handle);
if (r < 0) {
derr << __func__ << ": update_watch failed with error: "
<< cpp_strerror(r) << dendl;
handler->shutdown();
goto close_ret;
}
handler->wait();
r = image.update_unwatch(watch_handle);
if (r < 0)
derr << __func__ << ": update_unwatch failed with error: "
<< cpp_strerror(r) << dendl;
handler->shutdown();
}
close_ret:
// The registry record shouldn't be removed for (already) running mappings.
if (!cfg->persistent) {
dout(5) << __func__ << ": cleaning up non-persistent mapping: "
<< cfg->devpath << dendl;
r = remove_config_from_registry(cfg);
if (r) {
derr << __func__ << ": could not clean up non-persistent mapping: "
<< cfg->devpath << dendl;
}
}
std::unique_lock l{shutdown_lock};
image.close();
io_ctx.close();
rados.shutdown();
if (handler) {
delete handler;
handler = nullptr;
}
return r;
dout(0) << "Successfully mapped RBD image: " << cfg->devpath << dendl;
return rbd_mapping.wait();
}
static int do_unmap(Config *cfg, bool unregister)
@ -1602,8 +1159,8 @@ static int do_stats(std::string search_devpath)
AdminSocketClient client = AdminSocketClient(cfg.admin_sock_path);
std::string output;
std::string result = client.do_request("{\"prefix\":\"wnbd stats\"}",
&output);
std::string cmd = "{\"prefix\":\"wnbd stats " + cfg.devpath + "\"}";
std::string result = client.do_request(cmd, &output);
if (!result.empty()) {
std::cerr << "Admin socket error: " << result << std::endl;
return -EINVAL;
@ -1641,10 +1198,17 @@ static int parse_args(std::vector<const char*>& args,
}
config.parse_env(CEPH_ENTITY_TYPE_CLIENT);
config.parse_argv(args);
cfg->cluster_name = string(config->cluster);
cfg->entity_name = config->name.to_str();
cfg->poolname = config.get_val<std::string>("rbd_default_pool");
std::vector<const char*>::iterator i;
std::ostringstream err;
// The parent pipe parameter has been deprecated since we're no longer
// using separate processes per mapping (unless "-f" is passed).
// TODO: remove this parameter eventually.
std::string parent_pipe;
// TODO: consider using boost::program_options like Device.cc does.
// This should simplify argument parsing. Also, some arguments must be tied
@ -1670,12 +1234,14 @@ static int parse_args(std::vector<const char*>& args,
cfg->remap_failure_fatal = true;
} else if (ceph_argparse_flag(args, i, "--adapter-monitoring-enabled", (char *)NULL)) {
cfg->adapter_monitoring_enabled = true;
} else if (ceph_argparse_witharg(args, i, &cfg->parent_pipe, err,
} else if (ceph_argparse_witharg(args, i, &parent_pipe, err,
"--pipe-name", (char *)NULL)) {
if (!err.str().empty()) {
*err_msg << "rbd-wnbd: " << err.str();
return -EINVAL;
}
std::cerr << "WARNING: '--pipe-name' has been deprecated and is currently ignored."
<< std::endl;
} else if (ceph_argparse_witharg(args, i, (int*)&cfg->wnbd_log_level,
err, "--wnbd-log-level", (char *)NULL)) {
if (!err.str().empty()) {
@ -1817,6 +1383,7 @@ static int parse_args(std::vector<const char*>& args,
static int rbd_wnbd(int argc, const char *argv[])
{
Config cfg;
cfg.command_line = get_cli_args();
auto args = argv_to_vec(argc, argv);
// Avoid using dout before calling "do_global_init"
@ -1848,6 +1415,14 @@ static int rbd_wnbd(int argc, const char *argv[])
if (construct_devpath_if_missing(&cfg)) {
return -EINVAL;
}
if (g_conf()->daemonize) {
r = send_map_request(cfg.command_line);
if (r < 0) {
return r;
}
return wait_mapped_disk(cfg);
}
r = do_map(&cfg);
if (r < 0)
return r;

View File

@ -20,73 +20,19 @@
#include "include/compat.h"
#include "common/win32/registry.h"
#include "wnbd_handler.h"
#include "rbd_mapping_config.h"
#include "rbd_mapping.h"
#define SERVICE_REG_KEY "SYSTEM\\CurrentControlSet\\Services\\rbd-wnbd"
#define SERVICE_PIPE_NAME "\\\\.\\pipe\\rbd-wnbd"
#define SERVICE_PIPE_TIMEOUT_MS 5000
#define SERVICE_PIPE_BUFFSZ 4096
#define DEFAULT_MAP_TIMEOUT_MS 30000
#define RBD_WNBD_BLKSIZE 512UL
#define DEFAULT_SERVICE_START_TIMEOUT 120
#define DEFAULT_IMAGE_MAP_TIMEOUT 20
#define DISK_STATUS_POLLING_INTERVAL_MS 500
#define HELP_INFO 1
#define VERSION_INFO 2
#define WNBD_STATUS_ACTIVE "active"
#define WNBD_STATUS_INACTIVE "inactive"
#define DEFAULT_SERVICE_THREAD_COUNT 8
struct Config {
bool exclusive = false;
bool readonly = false;
std::string parent_pipe;
std::string poolname;
std::string nsname;
std::string imgname;
std::string snapname;
std::string devpath;
std::string format;
bool pretty_format = false;
bool hard_disconnect = false;
int soft_disconnect_timeout = DEFAULT_SOFT_REMOVE_TIMEOUT;
bool hard_disconnect_fallback = true;
int service_start_timeout = DEFAULT_SERVICE_START_TIMEOUT;
int image_map_timeout = DEFAULT_IMAGE_MAP_TIMEOUT;
bool remap_failure_fatal = false;
bool adapter_monitoring_enabled = false;
// TODO: consider moving those fields to a separate structure. Those
// provide connection information without actually being configurable.
// The disk number is provided by Windows.
int disk_number = -1;
int pid = 0;
std::string serial_number;
bool active = false;
bool wnbd_mapped = false;
std::string command_line;
std::string admin_sock_path;
WnbdLogLevel wnbd_log_level = WnbdLogLevelInfo;
int io_req_workers = DEFAULT_IO_WORKER_COUNT;
int io_reply_workers = DEFAULT_IO_WORKER_COUNT;
int service_thread_count = DEFAULT_SERVICE_THREAD_COUNT;
// register the mapping, recreating it when the Ceph service starts.
bool persistent = true;
};
enum Command {
None,
Connect,
@ -116,21 +62,16 @@ int disconnect_all_mappings(
int worker_count);
int restart_registered_mappings(
int worker_count, int total_timeout, int image_map_timeout);
int map_device_using_suprocess(std::string command_line);
int construct_devpath_if_missing(Config* cfg);
int save_config_to_registry(Config* cfg);
int remove_config_from_registry(Config* cfg);
int load_mapping_config_from_registry(std::string devpath, Config* cfg);
int map_device_using_same_process(std::string command_line);
BOOL WINAPI console_handler_routine(DWORD dwCtrlType);
static int parse_args(std::vector<const char*>& args,
std::ostream *err_msg,
Command *command, Config *cfg);
static int do_map(Config *cfg);
static int do_unmap(Config *cfg, bool unregister);
class BaseIterator {
public:
virtual ~BaseIterator() {};

View File

@ -50,18 +50,35 @@ int WnbdHandler::wait()
{
int err = 0;
if (started && wnbd_disk) {
dout(10) << __func__ << ": waiting" << dendl;
dout(10) << "waiting for WNBD mapping: " << instance_name << dendl;
err = WnbdWaitDispatcher(wnbd_disk);
if (err) {
derr << __func__ << " failed waiting for dispatcher to stop: "
<< err << dendl;
derr << __func__ << ": failed waiting for dispatcher to stop: "
<< instance_name
<< ". Error: " << err << dendl;
} else {
dout(10) << "WNBD mapping disconnected: " << instance_name << dendl;
}
}
return err;
}
WnbdAdminHook::WnbdAdminHook(WnbdHandler *handler, AdminSocket* admin_socket)
: m_handler(handler)
, m_admin_socket(admin_socket)
{
if (m_admin_socket) {
m_admin_socket->register_command(
std::string("wnbd stats ") + m_handler->instance_name,
this, "get WNBD stats");
} else {
dout(0) << "no admin socket provided, skipped registering wnbd hooks"
<< dendl;
}
}
int WnbdAdminHook::call (
std::string_view command, const cmdmap_t& cmdmap,
const bufferlist&,
@ -69,7 +86,7 @@ int WnbdAdminHook::call (
std::ostream& errss,
bufferlist& out)
{
if (command == "wnbd stats") {
if (command == "wnbd stats " + m_handler->instance_name) {
return m_handler->dump_stats(f);
}
return -ENOSYS;

View File

@ -27,8 +27,6 @@
// TODO: make this configurable.
#define RBD_WNBD_MAX_TRANSFER 2 * 1024 * 1024
#define SOFT_REMOVE_RETRY_INTERVAL 2
#define DEFAULT_SOFT_REMOVE_TIMEOUT 15
#define DEFAULT_IO_WORKER_COUNT 4
// Not defined by mingw.
#ifndef SCSI_ADSENSE_UNRECOVERED_ERROR
@ -44,15 +42,14 @@ class WnbdHandler;
class WnbdAdminHook : public AdminSocketHook {
WnbdHandler *m_handler;
AdminSocket *m_admin_socket;
public:
explicit WnbdAdminHook(WnbdHandler *handler) :
m_handler(handler) {
g_ceph_context->get_admin_socket()->register_command(
"wnbd stats", this, "get WNBD stats");
}
explicit WnbdAdminHook(WnbdHandler *handler, AdminSocket* admin_socket);
~WnbdAdminHook() override {
g_ceph_context->get_admin_socket()->unregister_commands(this);
if (m_admin_socket) {
m_admin_socket->unregister_commands(this);
}
}
int call(std::string_view command, const cmdmap_t& cmdmap,
@ -80,7 +77,8 @@ public:
uint64_t _block_count, uint32_t _block_size,
bool _readonly, bool _rbd_cache_enabled,
uint32_t _io_req_workers,
uint32_t _io_reply_workers)
uint32_t _io_reply_workers,
AdminSocket* admin_socket)
: image(_image)
, instance_name(_instance_name)
, block_count(_block_count)
@ -90,7 +88,7 @@ public:
, io_req_workers(_io_req_workers)
, io_reply_workers(_io_reply_workers)
{
admin_hook = new WnbdAdminHook(this);
admin_hook = new WnbdAdminHook(this, admin_socket);
// Instead of relying on librbd's own thread pool, we're going to use a
// separate one. This allows us to make assumptions on the threads that
// are going to send the IO replies and thus be able to cache Windows
@ -142,6 +140,7 @@ private:
void set_sense(uint8_t sense_key, uint8_t asc);
};
friend WnbdAdminHook;
friend std::ostream &operator<<(std::ostream &os, const IOContext &ctx);
void send_io_response(IOContext *ctx);