From a3bf24d71e0d9d65bb369e567dcaf15d7e58ba04 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 5 Sep 2019 14:24:30 -0500 Subject: [PATCH] common/admin_socket: add ability to process MCommand via asok queue Signed-off-by: Sage Weil --- src/common/admin_socket.cc | 37 +++++++++++++++++++++++++++++++++++++ src/common/admin_socket.h | 9 +++++++++ 2 files changed, 46 insertions(+) diff --git a/src/common/admin_socket.cc b/src/common/admin_socket.cc index cea62e5f0e3..ef22b3c2615 100644 --- a/src/common/admin_socket.cc +++ b/src/common/admin_socket.cc @@ -22,6 +22,8 @@ #include "common/Thread.h" #include "common/version.h" +#include "messages/MCommand.h" +#include "messages/MCommandReply.h" // re-include our assert to clobber the system one; fix dout: #include "include/ceph_assert.h" @@ -246,6 +248,7 @@ void AdminSocket::entry() noexcept // read off one byte char buf; ::read(m_wakeup_rd_fd, &buf, 1); + do_tell_queue(); } if (m_shutdown) { // Parent wants us to shut down @@ -364,6 +367,25 @@ bool AdminSocket::do_accept() return rval; } +void AdminSocket::do_tell_queue() +{ + ldout(m_cct,10) << __func__ << dendl; + std::list> q; + { + std::lock_guard l(tell_lock); + q.swap(tell_queue); + } + for (auto& m : q) { + bufferlist outbl; + bool success = execute_command(m->cmd, outbl); + int r = success ? 0 : -1; // FIXME! + auto reply = new MCommandReply(r, ""); + reply->set_tid(m->get_tid()); + reply->set_data(outbl); + m->get_connection()->send_message(reply); + } +} + int AdminSocket::execute_command(const std::vector& cmdvec, ceph::bufferlist& out) { @@ -433,6 +455,13 @@ int AdminSocket::execute_command(const std::vector& cmdvec, return true; } +void AdminSocket::queue_tell_command(ref_t m) +{ + ldout(m_cct,10) << __func__ << " " << *m << dendl; + std::lock_guard l(tell_lock); + tell_queue.push_back(std::move(m)); + wakeup(); +} bool AdminSocket::validate(const std::string& command, @@ -668,3 +697,11 @@ void AdminSocket::shutdown() remove_cleanup_file(m_path); m_path.clear(); } + +void AdminSocket::wakeup() +{ + // Send a byte to the wakeup pipe that the thread is listening to + char buf[1] = { 0x0 }; + int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf)); + (void)r; +} diff --git a/src/common/admin_socket.h b/src/common/admin_socket.h index 2e10fd07472..5e7d56b3f2f 100644 --- a/src/common/admin_socket.h +++ b/src/common/admin_socket.h @@ -22,10 +22,12 @@ #include #include "include/buffer.h" +#include "common/ref.h" #include "common/cmdparse.h" class AdminSocket; class CephContext; +class MCommand; using namespace std::literals; @@ -97,9 +99,12 @@ public: int execute_command(const std::vector& cmd, ceph::bufferlist& out); + void queue_tell_command(ref_t m); + private: void shutdown(); + void wakeup(); std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr); std::string destroy_wakeup_pipe(); @@ -108,6 +113,7 @@ private: std::thread th; void entry() noexcept; bool do_accept(); + void do_tell_queue(); bool validate(const std::string& command, const cmdmap_t& cmdmap, ceph::buffer::list& out) const; @@ -126,6 +132,9 @@ private: std::unique_ptr help_hook; std::unique_ptr getdescs_hook; + std::mutex tell_lock; + std::list> tell_queue; + struct hook_info { AdminSocketHook* hook; std::string desc;