common/admin_socket: add ability to process MCommand via asok queue

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2019-09-05 14:24:30 -05:00
parent 32ad6692de
commit a3bf24d71e
2 changed files with 46 additions and 0 deletions

View File

@ -22,6 +22,8 @@
#include "common/Thread.h"
#include "common/version.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
// re-include our assert to clobber the system one; fix dout:
#include "include/ceph_assert.h"
@ -246,6 +248,7 @@ void AdminSocket::entry() noexcept
// read off one byte
char buf;
::read(m_wakeup_rd_fd, &buf, 1);
do_tell_queue();
}
if (m_shutdown) {
// Parent wants us to shut down
@ -364,6 +367,25 @@ bool AdminSocket::do_accept()
return rval;
}
void AdminSocket::do_tell_queue()
{
ldout(m_cct,10) << __func__ << dendl;
std::list<ref_t<MCommand>> q;
{
std::lock_guard l(tell_lock);
q.swap(tell_queue);
}
for (auto& m : q) {
bufferlist outbl;
bool success = execute_command(m->cmd, outbl);
int r = success ? 0 : -1; // FIXME!
auto reply = new MCommandReply(r, "");
reply->set_tid(m->get_tid());
reply->set_data(outbl);
m->get_connection()->send_message(reply);
}
}
int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
ceph::bufferlist& out)
{
@ -433,6 +455,13 @@ int AdminSocket::execute_command(const std::vector<std::string>& cmdvec,
return true;
}
void AdminSocket::queue_tell_command(ref_t<MCommand> m)
{
ldout(m_cct,10) << __func__ << " " << *m << dendl;
std::lock_guard l(tell_lock);
tell_queue.push_back(std::move(m));
wakeup();
}
bool AdminSocket::validate(const std::string& command,
@ -668,3 +697,11 @@ void AdminSocket::shutdown()
remove_cleanup_file(m_path);
m_path.clear();
}
void AdminSocket::wakeup()
{
// Send a byte to the wakeup pipe that the thread is listening to
char buf[1] = { 0x0 };
int r = safe_write(m_wakeup_wr_fd, buf, sizeof(buf));
(void)r;
}

View File

@ -22,10 +22,12 @@
#include <thread>
#include "include/buffer.h"
#include "common/ref.h"
#include "common/cmdparse.h"
class AdminSocket;
class CephContext;
class MCommand;
using namespace std::literals;
@ -97,9 +99,12 @@ public:
int execute_command(const std::vector<std::string>& cmd,
ceph::bufferlist& out);
void queue_tell_command(ref_t<MCommand> m);
private:
void shutdown();
void wakeup();
std::string create_wakeup_pipe(int *pipe_rd, int *pipe_wr);
std::string destroy_wakeup_pipe();
@ -108,6 +113,7 @@ private:
std::thread th;
void entry() noexcept;
bool do_accept();
void do_tell_queue();
bool validate(const std::string& command,
const cmdmap_t& cmdmap,
ceph::buffer::list& out) const;
@ -126,6 +132,9 @@ private:
std::unique_ptr<AdminSocketHook> help_hook;
std::unique_ptr<AdminSocketHook> getdescs_hook;
std::mutex tell_lock;
std::list<ref_t<MCommand>> tell_queue;
struct hook_info {
AdminSocketHook* hook;
std::string desc;