crimson/tools/store_nbd: add graceful shutdown support

we could have more sophisticate mechinary for interrupting fio job,
but so far it is able to stop itself if it idle by handling ctrl-C.

Signed-off-by: Kefu Chai <kchai@redhat.com>
This commit is contained in:
Kefu Chai 2021-06-16 21:51:57 +08:00
parent 3faea72b70
commit 0689b128e1

View File

@ -35,7 +35,10 @@
#include <linux/nbd.h>
#include <linux/fs.h>
#include <seastar/apps/lib/stop_signal.hh>
#include <seastar/core/byteorder.hh>
#include <seastar/util/defer.hh>
#include <seastar/core/gate.hh>
#include <seastar/core/rwlock.hh>
#include "crimson/common/log.h"
@ -215,6 +218,8 @@ struct RequestWriter {
class NBDHandler {
BlockDriver &backend;
std::string uds_path;
std::optional<seastar::server_socket> socket;
seastar::gate gate;
public:
struct config_t {
std::string uds_path;
@ -242,6 +247,7 @@ public:
{}
seastar::future<> run();
seastar::future<> stop();
};
int main(int argc, char** argv)
@ -285,7 +291,10 @@ int main(int argc, char** argv)
}
std::vector<const char*> args(argv, argv + argc);
seastar::app_template app;
seastar::app_template::config app_cfg;
app_cfg.name = "crimson-store-nbd";
app_cfg.auto_handle_sigint_sigterm = false;
seastar::app_template app(std::move(app_cfg));
std::vector<char*> av{argv[0]};
std::transform(begin(unrecognized_options),
@ -294,39 +303,36 @@ int main(int argc, char** argv)
[](auto& s) {
return const_cast<char*>(s.c_str());
});
return app.run(av.size(), av.data(), [&] {
if (debug) {
seastar::global_logger_registry().set_all_loggers_level(
seastar::log_level::debug
);
}
return seastar::async([&] {
seastar_apps_lib::stop_signal should_stop;
crimson::common::sharded_conf()
.start(EntityName{}, string_view{"ceph"}).get();
auto stop_conf = seastar::defer([] {
crimson::common::sharded_conf().stop().get();
});
SeastarRunner sc;
sc.init(av.size(), av.data());
if (debug) {
seastar::global_logger_registry().set_all_loggers_level(
seastar::log_level::debug
);
}
sc.run([=] {
return crimson::common::sharded_conf(
).start(EntityName{}, string_view{"ceph"}
).then([=] {
auto backend = get_backend(backend_config);
return seastar::do_with(
NBDHandler(*backend, nbd_config),
std::move(backend),
[](auto &nbd, auto &backend) {
return backend->mount(
).then([&] {
logger().debug("Running nbd server...");
return nbd.run();
}).then([&] {
return backend->close();
});
});
}).then([=] {
return crimson::common::sharded_conf().stop();
NBDHandler nbd(*backend, nbd_config);
backend->mount().get();
auto close_backend = seastar::defer([&] {
backend->close().get();
});
logger().debug("Running nbd server...");
(void)nbd.run();
auto stop_nbd = seastar::defer([&] {
nbd.stop().get();
});
should_stop.wait().get();
return 0;
});
});
sc.stop();
}
class nbd_oldstyle_negotiation_t {
@ -417,38 +423,47 @@ seastar::future<> handle_commands(
seastar::future<> NBDHandler::run()
{
logger().debug("About to listen on {}", uds_path);
return seastar::do_with(
seastar::engine().listen(
socket = seastar::engine().listen(
seastar::socket_address{
seastar::unix_domain_addr{uds_path}}),
[=](auto &socket) {
return seastar::keep_doing(
[this, &socket] {
return socket.accept().then([this](auto acc) {
logger().debug("Accepted");
return seastar::do_with(
std::move(acc.connection),
[this](auto &conn) {
return seastar::do_with(
conn.input(),
RequestWriter{conn.output()},
[&, this](auto &input, auto &output) {
return send_negotiation(
backend.get_size(),
output.stream
).then([&, this] {
return handle_commands(backend, input, output);
}).finally([&] {
return input.close();
}).finally([&] {
return output.close();
}).handle_exception([](auto e) {
logger().error("NBDHandler::run saw exception {}", e);
return seastar::now();
});
});
});
});
});
});
seastar::unix_domain_addr{uds_path}});
return seastar::keep_doing([this] {
return seastar::try_with_gate(gate, [this] {
return socket->accept().then([this](auto acc) {
logger().debug("Accepted");
return seastar::do_with(
std::move(acc.connection),
[this](auto &conn) {
return seastar::do_with(
conn.input(),
RequestWriter{conn.output()},
[&, this](auto &input, auto &output) {
return send_negotiation(
backend.get_size(),
output.stream
).then([&, this] {
return handle_commands(backend, input, output);
}).finally([&] {
return input.close();
}).finally([&] {
return output.close();
}).handle_exception([](auto e) {
logger().error("NBDHandler::run saw exception {}", e);
return seastar::now();
});
});
});
});
}).handle_exception_type([](const seastar::gate_closed_exception&) {});
});
}
seastar::future<> NBDHandler::stop()
{
seastar::future<> done = gate.close();
if (socket) {
socket->abort_accept();
socket.reset();
}
return done;
}