mirror of
https://github.com/ceph/ceph
synced 2025-01-02 00:52:22 +00:00
xio: initial mark_* and queueing/flow control
This changes implements explicit support for Accelio sender-side flow control, which requires queuing messages for later delivery when the connection is ready to send. This rquirement to queue messages for later delivery, and related connection state logic, is substantially shared with new session reset behavior, so we've pulled a subset of that logic foward. Again due to shared implementation logic, this change also adds implementations of mark_down(), mark_down_all(), mark_disposable(), and related methods from Messenger, which were required to be implemented after Hammer. Add XioSubmit.h. For now, start at state UP, READY. When considering if a flow-controlled connection can be unblocked, consider only the computed queue depth. Re-activate and flush the connection iff the computed queue depth <= 1/2 of the queue high-water mark. Placeholder added for byte-throttled case. Fix lock flags abuse (found by Casey). Discard deferred and unsent messages on unplanned disconnect. The change causes discard_input_queue() to be called in Accelio's on_disconnect_event() handler, as well as on mark_down(). xio: Change new established connection's state to up and ready Change the new established passive connection's state to up and ready then flush all pending msgs in input_queue Signed-off-by: Matt Benjamin <matt@cohortfs.com> Signed-off-by: Vu Pham <vu@mellanox.com> Signed-off-by: Matt Benjamin <matt@cohortfs.com>
This commit is contained in:
parent
1c2efde84d
commit
f276145fb6
@ -535,9 +535,9 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
|
||||
#if defined(HAVE_XIO)
|
||||
class buffer::xio_msg_buffer : public buffer::raw {
|
||||
private:
|
||||
XioCompletionHook* m_hook;
|
||||
XioDispatchHook* m_hook;
|
||||
public:
|
||||
xio_msg_buffer(XioCompletionHook* _m_hook, const char *d,
|
||||
xio_msg_buffer(XioDispatchHook* _m_hook, const char *d,
|
||||
unsigned l) :
|
||||
raw((char*)d, l), m_hook(_m_hook->get()) {}
|
||||
|
||||
@ -576,7 +576,7 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER;
|
||||
}
|
||||
|
||||
buffer::raw* buffer::create_msg(
|
||||
unsigned len, char *buf, XioCompletionHook *m_hook) {
|
||||
unsigned len, char *buf, XioDispatchHook* m_hook) {
|
||||
XioPool& pool = m_hook->get_pool();
|
||||
buffer::raw* bp =
|
||||
static_cast<buffer::raw*>(pool.alloc(sizeof(xio_msg_buffer)));
|
||||
|
@ -60,7 +60,7 @@
|
||||
|
||||
#if defined(HAVE_XIO)
|
||||
struct xio_mempool_obj;
|
||||
class XioCompletionHook;
|
||||
class XioDispatchHook;
|
||||
#endif
|
||||
|
||||
namespace ceph {
|
||||
@ -160,7 +160,7 @@ public:
|
||||
static raw* create_unshareable(unsigned len);
|
||||
|
||||
#if defined(HAVE_XIO)
|
||||
static raw* create_msg(unsigned len, char *buf, XioCompletionHook *m_hook);
|
||||
static raw* create_msg(unsigned len, char *buf, XioDispatchHook *m_hook);
|
||||
#endif
|
||||
|
||||
/*
|
||||
|
54
src/messages/MNop.h
Normal file
54
src/messages/MNop.h
Normal file
@ -0,0 +1,54 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
* Portions Copyright (C) 2014 CohortFS, LLC
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef CEPH_MSG_NOP_H
|
||||
#define CEPH_MSG_NOP_H
|
||||
|
||||
#include "msg/Message.h"
|
||||
#include "msg/msg_types.h"
|
||||
|
||||
/*
|
||||
* A message with no (remote) effect.
|
||||
*/
|
||||
class MNop : public Message {
|
||||
public:
|
||||
static const int HEAD_VERSION = 1;
|
||||
static const int COMPAT_VERSION = 1;
|
||||
|
||||
__u32 tag; // ignored tag value
|
||||
|
||||
MNop()
|
||||
: Message(MSG_NOP, HEAD_VERSION, COMPAT_VERSION)
|
||||
{}
|
||||
|
||||
~MNop() {}
|
||||
|
||||
void encode_payload(uint64_t _features) {
|
||||
::encode(tag, payload);
|
||||
}
|
||||
|
||||
void decode_payload() {
|
||||
bufferlist::iterator p = payload.begin();
|
||||
::decode(tag, p);
|
||||
}
|
||||
|
||||
const char *get_type_name() const { return "MNop"; }
|
||||
|
||||
void print(ostream& out) const {
|
||||
out << get_type_name() << " ";
|
||||
}
|
||||
}; /* MNop */
|
||||
|
||||
#endif /* CEPH_MSG_NOP_H */
|
@ -176,6 +176,11 @@
|
||||
// Xio Testing
|
||||
#define MSG_DATA_PING 0x602
|
||||
|
||||
// Xio intends to define messages 0x603..0x606
|
||||
|
||||
// Special
|
||||
#define MSG_NOP 0x607
|
||||
|
||||
// ======================================================
|
||||
|
||||
// abstract Message class
|
||||
|
@ -89,7 +89,8 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
|
||||
magic(m->get_magic()),
|
||||
scount(0),
|
||||
send_ctr(0),
|
||||
in_seq()
|
||||
in_seq(),
|
||||
cstate(this)
|
||||
{
|
||||
pthread_spin_init(&sp, PTHREAD_PROCESS_PRIVATE);
|
||||
if (xio_conn_type == XioConnection::ACTIVE)
|
||||
@ -185,16 +186,16 @@ int XioConnection::passive_setup()
|
||||
|
||||
#define uint_to_timeval(tv, s) ((tv).tv_sec = (s), (tv).tv_usec = 0)
|
||||
|
||||
static inline XioCompletionHook* pool_alloc_xio_completion_hook(
|
||||
static inline XioDispatchHook* pool_alloc_xio_dispatch_hook(
|
||||
XioConnection *xcon, Message *m, XioInSeq& msg_seq)
|
||||
{
|
||||
struct xio_mempool_obj mp_mem;
|
||||
int e = xpool_alloc(xio_msgr_noreg_mpool,
|
||||
sizeof(XioCompletionHook), &mp_mem);
|
||||
sizeof(XioDispatchHook), &mp_mem);
|
||||
if (!!e)
|
||||
return NULL;
|
||||
XioCompletionHook *xhook = (XioCompletionHook*) mp_mem.addr;
|
||||
new (xhook) XioCompletionHook(xcon, m, msg_seq, mp_mem);
|
||||
XioDispatchHook *xhook = (XioDispatchHook*) mp_mem.addr;
|
||||
new (xhook) XioDispatchHook(xcon, m, msg_seq, mp_mem);
|
||||
return xhook;
|
||||
}
|
||||
|
||||
@ -237,8 +238,8 @@ int XioConnection::on_msg_req(struct xio_session *session,
|
||||
}
|
||||
|
||||
XioMessenger *msgr = static_cast<XioMessenger*>(get_messenger());
|
||||
XioCompletionHook *m_hook =
|
||||
pool_alloc_xio_completion_hook(this, NULL /* msg */, in_seq);
|
||||
XioDispatchHook *m_hook =
|
||||
pool_alloc_xio_dispatch_hook(this, NULL /* msg */, in_seq);
|
||||
XioInSeq& msg_seq = m_hook->msg_seq;
|
||||
in_seq.clear();
|
||||
|
||||
@ -448,6 +449,16 @@ int XioConnection::on_ow_msg_send_complete(struct xio_session *session,
|
||||
" seq: " << xmsg->m->get_seq() << dendl;
|
||||
|
||||
--send_ctr; /* atomic, because portal thread */
|
||||
|
||||
/* unblock flow-controlled connections, avoid oscillation */
|
||||
if (unlikely(cstate.session_state.read() ==
|
||||
XioConnection::FLOW_CONTROLLED)) {
|
||||
if ((send_ctr <= uint32_t(xio_qdepth_low_mark())) &&
|
||||
(1 /* XXX memory <= memory low-water mark */)) {
|
||||
cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
}
|
||||
|
||||
xmsg->put();
|
||||
|
||||
return 0;
|
||||
@ -467,6 +478,98 @@ void XioConnection::msg_release_fail(struct xio_msg *msg, int code)
|
||||
" (" << xio_strerror(code) << ")" << dendl;
|
||||
} /* msg_release_fail */
|
||||
|
||||
int XioConnection::flush_input_queue(uint32_t flags) {
|
||||
XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
// send deferred 1 (direct backpresssure)
|
||||
if (outgoing.requeue.size() > 0)
|
||||
portal->requeue(this, outgoing.requeue);
|
||||
|
||||
// send deferred 2 (sent while deferred)
|
||||
int ix, q_size = outgoing.mqueue.size();
|
||||
for (ix = 0; ix < q_size; ++ix) {
|
||||
Message::Queue::iterator q_iter = outgoing.mqueue.begin();
|
||||
Message* m = &(*q_iter);
|
||||
outgoing.mqueue.erase(q_iter);
|
||||
msgr->_send_message_impl(m, this);
|
||||
}
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&sp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int XioConnection::discard_input_queue(uint32_t flags)
|
||||
{
|
||||
Message::Queue disc_q;
|
||||
XioSubmit::Queue deferred_q;
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
/* the two send queues contain different objects:
|
||||
* - anything on the mqueue is a Message
|
||||
* - anything on the requeue is an XioMsg
|
||||
*/
|
||||
Message::Queue::const_iterator i1 = disc_q.end();
|
||||
disc_q.splice(i1, outgoing.mqueue);
|
||||
|
||||
XioSubmit::Queue::const_iterator i2 = deferred_q.end();
|
||||
deferred_q.splice(i2, outgoing.requeue);
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&sp);
|
||||
|
||||
// mqueue
|
||||
int ix, q_size = disc_q.size();
|
||||
for (ix = 0; ix < q_size; ++ix) {
|
||||
Message::Queue::iterator q_iter = disc_q.begin();
|
||||
Message* m = &(*q_iter);
|
||||
disc_q.erase(q_iter);
|
||||
m->put();
|
||||
}
|
||||
|
||||
// requeue
|
||||
q_size = deferred_q.size();
|
||||
for (ix = 0; ix < q_size; ++ix) {
|
||||
XioSubmit::Queue::iterator q_iter = deferred_q.begin();
|
||||
XioSubmit* xs = &(*q_iter);
|
||||
assert(xs->type == XioSubmit::OUTGOING_MSG);
|
||||
XioMsg* xmsg = static_cast<XioMsg*>(xs);
|
||||
deferred_q.erase(q_iter);
|
||||
// release once for each chained xio_msg
|
||||
for (ix = 0; ix < int(xmsg->hdr.msg_cnt); ++ix)
|
||||
xmsg->put();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int XioConnection::adjust_clru(uint32_t flags)
|
||||
{
|
||||
if (flags & CState::OP_FLAG_LOCKED)
|
||||
pthread_spin_unlock(&sp);
|
||||
|
||||
XioMessenger* msgr = static_cast<XioMessenger*>(get_messenger());
|
||||
msgr->conns_sp.lock();
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
if (cstate.flags & CState::FLAG_MAPPED) {
|
||||
XioConnection::ConnList::iterator citer =
|
||||
XioConnection::ConnList::s_iterator_to(*this);
|
||||
msgr->conns_list.erase(citer);
|
||||
msgr->conns_list.push_front(*this); // LRU
|
||||
}
|
||||
|
||||
msgr->conns_sp.unlock();
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&sp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int XioConnection::on_msg_error(struct xio_session *session,
|
||||
enum xio_status error,
|
||||
struct xio_msg *msg,
|
||||
@ -480,6 +583,116 @@ int XioConnection::on_msg_error(struct xio_session *session,
|
||||
return 0;
|
||||
} /* on_msg_error */
|
||||
|
||||
void XioConnection::mark_down()
|
||||
{
|
||||
_mark_down(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
|
||||
int XioConnection::_mark_down(uint32_t flags)
|
||||
{
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
// per interface comment, we only stage a remote reset if the
|
||||
// current policy required it
|
||||
if (cstate.policy.resetcheck)
|
||||
cstate.flags |= CState::FLAG_RESET;
|
||||
|
||||
// Accelio disconnect
|
||||
xio_disconnect(conn);
|
||||
|
||||
/* XXX this will almost certainly be called again from
|
||||
* on_disconnect_event() */
|
||||
discard_input_queue(flags|CState::OP_FLAG_LOCKED);
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&sp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void XioConnection::mark_disposable()
|
||||
{
|
||||
_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
|
||||
int XioConnection::_mark_disposable(uint32_t flags)
|
||||
{
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
cstate.policy.lossy = true;
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&sp);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int XioConnection::CState::state_up_ready(uint32_t flags)
|
||||
{
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&xcon->sp);
|
||||
|
||||
xcon->flush_input_queue(flags|CState::OP_FLAG_LOCKED);
|
||||
|
||||
session_state.set(UP);
|
||||
startup_state.set(READY);
|
||||
|
||||
if (! (flags & CState::OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
int XioConnection::CState::state_discon()
|
||||
{
|
||||
session_state.set(DISCONNECTED);
|
||||
startup_state.set(IDLE);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int XioConnection::CState::state_flow_controlled(uint32_t flags) {
|
||||
dout(11) << __func__ << " ENTER " << dendl;
|
||||
|
||||
if (! (flags & OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&xcon->sp);
|
||||
|
||||
session_state.set(FLOW_CONTROLLED);
|
||||
|
||||
if (! (flags & OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
|
||||
return (0);
|
||||
}
|
||||
|
||||
int XioConnection::CState::state_fail(Message* m, uint32_t flags)
|
||||
{
|
||||
if (! (flags & OP_FLAG_LOCKED))
|
||||
pthread_spin_lock(&xcon->sp);
|
||||
|
||||
// advance to state FAIL, drop queued, msgs, adjust LRU
|
||||
session_state.set(DISCONNECTED);
|
||||
startup_state.set(FAIL);
|
||||
|
||||
xcon->discard_input_queue(flags|OP_FLAG_LOCKED);
|
||||
xcon->adjust_clru(flags|OP_FLAG_LOCKED|OP_FLAG_LRU);
|
||||
|
||||
// Accelio disconnect
|
||||
xio_disconnect(xcon->conn);
|
||||
|
||||
if (! (flags & OP_FLAG_LOCKED))
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
|
||||
// notify ULP
|
||||
XioMessenger* msgr = static_cast<XioMessenger*>(xcon->get_messenger());
|
||||
msgr->ms_deliver_handle_reset(xcon);
|
||||
m->put();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int XioLoopbackConnection::send_message(Message *m)
|
||||
{
|
||||
|
@ -22,13 +22,17 @@ extern "C" {
|
||||
#include "libxio.h"
|
||||
}
|
||||
#include "XioInSeq.h"
|
||||
#include "XioSubmit.h"
|
||||
#include "msg/Connection.h"
|
||||
#include "msg/Messenger.h"
|
||||
#include "include/atomic.h"
|
||||
#include "auth/AuthSessionHandler.h"
|
||||
|
||||
#define XIO_ALL_FEATURES (CEPH_FEATURES_ALL & \
|
||||
~CEPH_FEATURE_MSGR_KEEPALIVE2)
|
||||
|
||||
#define XIO_NOP_TAG_MARKDOWN 0x0001
|
||||
|
||||
namespace bi = boost::intrusive;
|
||||
|
||||
class XioPortal;
|
||||
@ -40,6 +44,24 @@ class XioConnection : public Connection
|
||||
public:
|
||||
enum type { ACTIVE, PASSIVE };
|
||||
|
||||
enum session_states {
|
||||
INIT = 0,
|
||||
START,
|
||||
UP,
|
||||
FLOW_CONTROLLED,
|
||||
DISCONNECTED,
|
||||
DELETED,
|
||||
BARRIER
|
||||
};
|
||||
|
||||
enum session_startup_states {
|
||||
IDLE = 0,
|
||||
CONNECTING,
|
||||
ACCEPTING,
|
||||
READY,
|
||||
FAIL
|
||||
};
|
||||
|
||||
private:
|
||||
XioConnection::type xio_conn_type;
|
||||
XioPortal *portal;
|
||||
@ -56,6 +78,7 @@ private:
|
||||
uint64_t scount;
|
||||
uint32_t send_ctr;
|
||||
int q_high_mark;
|
||||
int q_low_mark;
|
||||
|
||||
struct lifecycle {
|
||||
// different from Pipe states?
|
||||
@ -81,13 +104,92 @@ private:
|
||||
|
||||
uint32_t next_out_seq() {
|
||||
return out_seq.inc();
|
||||
};
|
||||
}
|
||||
|
||||
} state;
|
||||
|
||||
/* batching */
|
||||
XioInSeq in_seq;
|
||||
|
||||
class CState
|
||||
{
|
||||
public:
|
||||
static const int FLAG_NONE = 0x0000;
|
||||
static const int FLAG_BAD_AUTH = 0x0001;
|
||||
static const int FLAG_MAPPED = 0x0002;
|
||||
static const int FLAG_RESET = 0x0004;
|
||||
|
||||
static const int OP_FLAG_NONE = 0x0000;
|
||||
static const int OP_FLAG_LOCKED = 0x0001;
|
||||
static const int OP_FLAG_LRU = 0x0002;
|
||||
|
||||
uint64_t features;
|
||||
Messenger::Policy policy;
|
||||
|
||||
CryptoKey session_key;
|
||||
ceph::shared_ptr<AuthSessionHandler> session_security;
|
||||
AuthAuthorizer *authorizer;
|
||||
XioConnection *xcon;
|
||||
uint32_t protocol_version;
|
||||
|
||||
atomic_t session_state;
|
||||
atomic_t startup_state;
|
||||
|
||||
uint32_t reconnects;
|
||||
uint32_t connect_seq, global_seq, peer_global_seq;
|
||||
uint32_t in_seq, out_seq_acked; // atomic<uint64_t>, got receipt
|
||||
atomic_t out_seq; // atomic<uint32_t>
|
||||
|
||||
uint32_t flags;
|
||||
|
||||
CState(XioConnection* _xcon)
|
||||
: xcon(_xcon),
|
||||
protocol_version(0),
|
||||
session_state(INIT),
|
||||
startup_state(IDLE),
|
||||
in_seq(0),
|
||||
out_seq(0),
|
||||
flags(FLAG_NONE) {}
|
||||
|
||||
uint64_t get_session_state() {
|
||||
return session_state.read();
|
||||
}
|
||||
|
||||
uint64_t get_startup_state() {
|
||||
return startup_state.read();
|
||||
}
|
||||
|
||||
void set_in_seq(uint32_t seq) {
|
||||
in_seq = seq;
|
||||
}
|
||||
|
||||
uint32_t next_out_seq() {
|
||||
return out_seq.inc();
|
||||
};
|
||||
|
||||
// state machine
|
||||
int init_state();
|
||||
int next_state(Message* m);
|
||||
#if 0 // future (session startup)
|
||||
int msg_connect(MConnect *m);
|
||||
int msg_connect_reply(MConnectReply *m);
|
||||
int msg_connect_reply(MConnectAuthReply *m);
|
||||
int msg_connect_auth(MConnectAuth *m);
|
||||
int msg_connect_auth_reply(MConnectAuthReply *m);
|
||||
#endif
|
||||
int state_up_ready(uint32_t flags);
|
||||
int state_flow_controlled(uint32_t flags);
|
||||
int state_discon();
|
||||
int state_fail(Message* m, uint32_t flags);
|
||||
|
||||
} cstate; /* CState */
|
||||
|
||||
// message submission queue
|
||||
struct SendQ {
|
||||
Message::Queue mqueue; // deferred
|
||||
XioSubmit::Queue requeue;
|
||||
} outgoing;
|
||||
|
||||
// conns_entity_map comparison functor
|
||||
struct EntityComp
|
||||
{
|
||||
@ -118,12 +220,14 @@ private:
|
||||
|
||||
friend class XioPortal;
|
||||
friend class XioMessenger;
|
||||
friend class XioCompletionHook;
|
||||
friend class XioDispatchHook;
|
||||
friend class XioMarkDownHook;
|
||||
friend class XioMsg;
|
||||
|
||||
int on_disconnect_event() {
|
||||
connected.set(false);
|
||||
pthread_spin_lock(&sp);
|
||||
discard_input_queue(CState::OP_FLAG_LOCKED);
|
||||
if (!conn)
|
||||
this->put();
|
||||
pthread_spin_unlock(&sp);
|
||||
@ -144,6 +248,10 @@ private:
|
||||
return q_high_mark;
|
||||
}
|
||||
|
||||
int xio_qdepth_low_mark() {
|
||||
return q_low_mark;
|
||||
}
|
||||
|
||||
public:
|
||||
XioConnection(XioMessenger *m, XioConnection::type _type,
|
||||
const entity_inst_t& peer);
|
||||
@ -157,8 +265,10 @@ public:
|
||||
|
||||
int send_message(Message *m);
|
||||
void send_keepalive() {}
|
||||
void mark_down() {}
|
||||
void mark_disposable() {}
|
||||
virtual void mark_down();
|
||||
int _mark_down(uint32_t flags);
|
||||
virtual void mark_disposable();
|
||||
int _mark_disposable(uint32_t flags);
|
||||
|
||||
const entity_inst_t& get_peer() const { return peer; }
|
||||
|
||||
@ -196,16 +306,15 @@ public:
|
||||
|
||||
int on_msg_req(struct xio_session *session, struct xio_msg *req,
|
||||
int more_in_batch, void *cb_user_context);
|
||||
|
||||
int on_ow_msg_send_complete(struct xio_session *session, struct xio_msg *msg,
|
||||
void *conn_user_context);
|
||||
|
||||
int on_msg_error(struct xio_session *session, enum xio_status error,
|
||||
struct xio_msg *msg, void *conn_user_context);
|
||||
|
||||
void msg_send_fail(XioMsg *xmsg, int code);
|
||||
|
||||
void msg_release_fail(struct xio_msg *msg, int code);
|
||||
int flush_input_queue(uint32_t flags);
|
||||
int discard_input_queue(uint32_t flags);
|
||||
int adjust_clru(uint32_t flags);
|
||||
};
|
||||
|
||||
typedef boost::intrusive_ptr<XioConnection> XioConnectionRef;
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "XioMsg.h"
|
||||
#include "XioMessenger.h"
|
||||
#include "common/address_helper.h"
|
||||
#include "messages/MNop.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_xio
|
||||
|
||||
@ -441,8 +442,11 @@ int XioMessenger::session_event(struct xio_session *session,
|
||||
* it's peer address */
|
||||
conns_sp.unlock();
|
||||
|
||||
ldout(cct,4) << "new connection session " << session
|
||||
<< " xcon " << xcon << dendl;
|
||||
/* XXXX pre-merge of session startup negotiation ONLY! */
|
||||
xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
|
||||
|
||||
ldout(cct,2) << "new connection session " << session
|
||||
<< " xcon " << xcon << dendl;
|
||||
}
|
||||
break;
|
||||
case XIO_SESSION_CONNECTION_ERROR_EVENT:
|
||||
@ -714,14 +718,6 @@ static inline XioMsg* pool_alloc_xio_msg(Message *m, XioConnection *xcon,
|
||||
|
||||
int XioMessenger::_send_message(Message *m, Connection *con)
|
||||
{
|
||||
|
||||
static uint32_t nreqs;
|
||||
if (unlikely(XioPool::trace_mempool)) {
|
||||
if (unlikely((++nreqs % 65536) == 0)) {
|
||||
xp_stats.dump(__func__, nreqs);
|
||||
}
|
||||
}
|
||||
|
||||
if (con == &loop_con) {
|
||||
m->set_connection(con);
|
||||
m->set_src(get_myinst().name);
|
||||
@ -732,11 +728,32 @@ int XioMessenger::_send_message(Message *m, Connection *con)
|
||||
}
|
||||
|
||||
XioConnection *xcon = static_cast<XioConnection*>(con);
|
||||
if (! xcon->is_connected())
|
||||
return ENOTCONN;
|
||||
|
||||
/* If con is not in READY state, we have to enforce policy */
|
||||
if (xcon->cstate.session_state.read() != XioConnection::UP) {
|
||||
pthread_spin_lock(&xcon->sp);
|
||||
if (xcon->cstate.session_state.read() != XioConnection::UP) {
|
||||
xcon->outgoing.mqueue.push_back(*m);
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
return 0;
|
||||
}
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
}
|
||||
|
||||
return _send_message_impl(m, xcon);
|
||||
} /* send_message(Message* m, Connection *con) */
|
||||
|
||||
int XioMessenger::_send_message_impl(Message* m, XioConnection* xcon)
|
||||
{
|
||||
int code = 0;
|
||||
|
||||
static uint32_t nreqs;
|
||||
if (unlikely(XioPool::trace_mempool)) {
|
||||
if (unlikely((++nreqs % 65536) == 0)) {
|
||||
xp_stats.dump(__func__, nreqs);
|
||||
}
|
||||
}
|
||||
|
||||
m->set_seq(xcon->state.next_out_seq());
|
||||
m->set_magic(magic); // trace flags and special handling
|
||||
|
||||
@ -938,6 +955,9 @@ ConnectionRef XioMessenger::get_connection(const entity_inst_t& dest)
|
||||
conns_entity_map.insert(*xcon);
|
||||
conns_sp.unlock();
|
||||
|
||||
/* XXXX pre-merge of session startup negotiation ONLY! */
|
||||
xcon->cstate.state_up_ready(XioConnection::CState::OP_FLAG_NONE);
|
||||
|
||||
return xcon->get(); /* nref +1 */
|
||||
}
|
||||
} /* get_connection */
|
||||
@ -947,6 +967,63 @@ ConnectionRef XioMessenger::get_loopback_connection()
|
||||
return (loop_con.get());
|
||||
} /* get_loopback_connection */
|
||||
|
||||
void XioMessenger::mark_down(const entity_addr_t& addr)
|
||||
{
|
||||
entity_inst_t inst(entity_name_t(), addr);
|
||||
Spinlock::Locker lckr(conns_sp);
|
||||
XioConnection::EntitySet::iterator conn_iter =
|
||||
conns_entity_map.find(inst, XioConnection::EntityComp());
|
||||
if (conn_iter != conns_entity_map.end()) {
|
||||
(*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
} /* mark_down(const entity_addr_t& */
|
||||
|
||||
void XioMessenger::mark_down(Connection* con)
|
||||
{
|
||||
XioConnection *xcon = static_cast<XioConnection*>(con);
|
||||
xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
|
||||
} /* mark_down(Connection*) */
|
||||
|
||||
void XioMessenger::mark_down_all()
|
||||
{
|
||||
Spinlock::Locker lckr(conns_sp);
|
||||
XioConnection::EntitySet::iterator conn_iter;
|
||||
for (conn_iter = conns_entity_map.begin(); conn_iter !=
|
||||
conns_entity_map.begin(); ++conn_iter) {
|
||||
(*conn_iter)._mark_down(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
} /* mark_down_all */
|
||||
|
||||
static inline XioMarkDownHook* pool_alloc_markdown_hook(
|
||||
XioConnection *xcon, Message *m)
|
||||
{
|
||||
struct xio_mempool_obj mp_mem;
|
||||
int e = xio_mempool_alloc(xio_msgr_noreg_mpool,
|
||||
sizeof(XioMarkDownHook), &mp_mem);
|
||||
if (!!e)
|
||||
return NULL;
|
||||
XioMarkDownHook *hook = static_cast<XioMarkDownHook*>(mp_mem.addr);
|
||||
new (hook) XioMarkDownHook(xcon, m, mp_mem);
|
||||
return hook;
|
||||
}
|
||||
|
||||
void XioMessenger::mark_down_on_empty(Connection* con)
|
||||
{
|
||||
XioConnection *xcon = static_cast<XioConnection*>(con);
|
||||
MNop* m = new MNop();
|
||||
m->tag = XIO_NOP_TAG_MARKDOWN;
|
||||
m->set_completion_hook(pool_alloc_markdown_hook(xcon, m));
|
||||
// stall new messages
|
||||
xcon->cstate.session_state.set(XioConnection::BARRIER);
|
||||
(void) _send_message_impl(m, xcon);
|
||||
}
|
||||
|
||||
void XioMessenger::mark_disposable(Connection *con)
|
||||
{
|
||||
XioConnection *xcon = static_cast<XioConnection*>(con);
|
||||
xcon->_mark_disposable(XioConnection::CState::OP_FLAG_NONE);
|
||||
}
|
||||
|
||||
void XioMessenger::try_insert(XioConnection *xcon)
|
||||
{
|
||||
Spinlock::Locker lckr(conns_sp);
|
||||
|
@ -44,6 +44,8 @@ private:
|
||||
Mutex sh_mtx;
|
||||
Cond sh_cond;
|
||||
|
||||
friend class XioConnection;
|
||||
|
||||
public:
|
||||
XioMessenger(CephContext *cct, entity_name_t name,
|
||||
string mname, uint64_t nonce,
|
||||
@ -60,6 +62,7 @@ public:
|
||||
|
||||
int _send_message(Message *m, const entity_inst_t &dest);
|
||||
int _send_message(Message *m, Connection *con);
|
||||
int _send_message_impl(Message *m, XioConnection *xcon);
|
||||
|
||||
uint32_t get_magic() { return magic; }
|
||||
void set_magic(int _magic) { magic = _magic; }
|
||||
@ -120,20 +123,11 @@ public:
|
||||
virtual int send_keepalive(Connection *con)
|
||||
{ return EINVAL; }
|
||||
|
||||
virtual void mark_down(const entity_addr_t& a)
|
||||
{ }
|
||||
|
||||
virtual void mark_down(Connection *con)
|
||||
{ }
|
||||
|
||||
virtual void mark_down_on_empty(Connection *con)
|
||||
{ }
|
||||
|
||||
virtual void mark_disposable(Connection *con)
|
||||
{ }
|
||||
|
||||
virtual void mark_down_all()
|
||||
{ }
|
||||
virtual void mark_down(const entity_addr_t& a);
|
||||
virtual void mark_down(Connection *con);
|
||||
virtual void mark_down_all();
|
||||
virtual void mark_down_on_empty(Connection *con);
|
||||
virtual void mark_disposable(Connection *con);
|
||||
|
||||
void ds_dispatch(Message *m)
|
||||
{ dispatch_strategy->ds_dispatch(m); }
|
||||
|
@ -18,7 +18,7 @@
|
||||
#include "XioMsg.h"
|
||||
|
||||
|
||||
int XioCompletionHook::release_msgs()
|
||||
int XioDispatchHook::release_msgs()
|
||||
{
|
||||
XioRsp *xrsp;
|
||||
int r = msg_seq.size();
|
||||
|
@ -22,6 +22,7 @@ extern "C" {
|
||||
#include "libxio.h"
|
||||
}
|
||||
#include "XioConnection.h"
|
||||
#include "XioSubmit.h"
|
||||
#include "msg/msg_types.h"
|
||||
#include "XioPool.h"
|
||||
|
||||
@ -135,29 +136,6 @@ public:
|
||||
|
||||
WRITE_CLASS_ENCODER(XioMsgHdr);
|
||||
|
||||
struct XioSubmit
|
||||
{
|
||||
public:
|
||||
enum submit_type
|
||||
{
|
||||
OUTGOING_MSG,
|
||||
INCOMING_MSG_RELEASE
|
||||
};
|
||||
enum submit_type type;
|
||||
bi::list_member_hook<> submit_list;
|
||||
XioConnection *xcon;
|
||||
|
||||
XioSubmit(enum submit_type _type, XioConnection *_xcon) :
|
||||
type(_type), xcon(_xcon)
|
||||
{}
|
||||
|
||||
typedef bi::list< XioSubmit,
|
||||
bi::member_hook< XioSubmit,
|
||||
bi::list_member_hook<>,
|
||||
&XioSubmit::submit_list >
|
||||
> Queue;
|
||||
};
|
||||
|
||||
extern struct xio_mempool *xio_msgr_noreg_mpool;
|
||||
|
||||
#define XIO_MSGR_IOVLEN 16
|
||||
@ -292,7 +270,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class XioCompletionHook : public Message::CompletionHook
|
||||
class XioDispatchHook : public Message::CompletionHook
|
||||
{
|
||||
private:
|
||||
XioConnection *xcon;
|
||||
@ -305,7 +283,7 @@ private:
|
||||
public:
|
||||
struct xio_mempool_obj mp_this;
|
||||
|
||||
XioCompletionHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
|
||||
XioDispatchHook(XioConnection *_xcon, Message *_m, XioInSeq& _msg_seq,
|
||||
struct xio_mempool_obj& _mp) :
|
||||
CompletionHook(_m),
|
||||
xcon(_xcon->get()),
|
||||
@ -329,7 +307,7 @@ public:
|
||||
|
||||
int release_msgs();
|
||||
|
||||
XioCompletionHook* get() {
|
||||
XioDispatchHook* get() {
|
||||
nrefs.inc(); return this;
|
||||
}
|
||||
|
||||
@ -342,8 +320,8 @@ public:
|
||||
if (!cl_flag && release_msgs())
|
||||
return;
|
||||
struct xio_mempool_obj *mp = &this->mp_this;
|
||||
this->~XioCompletionHook();
|
||||
xpool_free(sizeof(XioCompletionHook), mp);
|
||||
this->~XioDispatchHook();
|
||||
xpool_free(sizeof(XioDispatchHook), mp);
|
||||
}
|
||||
}
|
||||
|
||||
@ -358,18 +336,48 @@ public:
|
||||
this->finish(-1);
|
||||
}
|
||||
|
||||
~XioCompletionHook() {
|
||||
~XioDispatchHook() {
|
||||
--xcon->n_reqs; // atomicity by portal thread
|
||||
xpool_dec_hookcnt();
|
||||
xcon->put();
|
||||
}
|
||||
};
|
||||
|
||||
/* A sender-side CompletionHook that relies on the on_msg_delivered
|
||||
* to complete a pending mark down. */
|
||||
class XioMarkDownHook : public Message::CompletionHook
|
||||
{
|
||||
private:
|
||||
XioConnection* xcon;
|
||||
|
||||
public:
|
||||
struct xio_mempool_obj mp_this;
|
||||
|
||||
XioMarkDownHook(
|
||||
XioConnection* _xcon, Message *_m, struct xio_mempool_obj& _mp) :
|
||||
CompletionHook(_m), xcon(_xcon->get()), mp_this(_mp)
|
||||
{ }
|
||||
|
||||
virtual void claim(int r) {}
|
||||
|
||||
virtual void finish(int r) {
|
||||
xcon->put();
|
||||
struct xio_mempool_obj *mp = &this->mp_this;
|
||||
this->~XioMarkDownHook();
|
||||
xio_mempool_free(mp);
|
||||
}
|
||||
|
||||
virtual void complete(int r) {
|
||||
xcon->_mark_down(XioConnection::CState::OP_FLAG_NONE);
|
||||
finish(r);
|
||||
}
|
||||
};
|
||||
|
||||
struct XioRsp : public XioSubmit
|
||||
{
|
||||
XioCompletionHook *xhook;
|
||||
XioDispatchHook *xhook;
|
||||
public:
|
||||
XioRsp(XioConnection *_xcon, XioCompletionHook *_xhook)
|
||||
XioRsp(XioConnection *_xcon, XioDispatchHook *_xhook)
|
||||
: XioSubmit(XioSubmit::INCOMING_MSG_RELEASE, _xcon /* not xcon! */),
|
||||
xhook(_xhook->get()) {
|
||||
// submit queue ref
|
||||
@ -380,7 +388,7 @@ public:
|
||||
return xhook->get_seq().dequeue();
|
||||
}
|
||||
|
||||
XioCompletionHook *get_xhook() { return xhook; }
|
||||
XioDispatchHook* get_xhook() { return xhook; }
|
||||
|
||||
void finalize() {
|
||||
xcon->put();
|
||||
|
@ -77,7 +77,18 @@ private:
|
||||
pthread_spin_unlock(&lane->sp);
|
||||
}
|
||||
|
||||
void deq(XioSubmit::Queue &send_q)
|
||||
void enq(XioConnection *xcon, XioSubmit::Queue& requeue_q)
|
||||
{
|
||||
int size = requeue_q.size();
|
||||
Lane* lane = get_lane(xcon);
|
||||
pthread_spin_lock(&lane->sp);
|
||||
XioSubmit::Queue::const_iterator i1 = lane->q.end();
|
||||
lane->q.splice(i1, requeue_q);
|
||||
lane->size += size;
|
||||
pthread_spin_unlock(&lane->sp);
|
||||
}
|
||||
|
||||
void deq(XioSubmit::Queue& send_q)
|
||||
{
|
||||
int ix;
|
||||
Lane* lane;
|
||||
@ -178,6 +189,38 @@ public:
|
||||
};
|
||||
}
|
||||
|
||||
void requeue(XioConnection* xcon, XioSubmit::Queue& send_q) {
|
||||
submit_q.enq(xcon, send_q);
|
||||
}
|
||||
|
||||
void requeue_all_xcon(XioMsg* xmsg,
|
||||
XioConnection* xcon,
|
||||
XioSubmit::Queue::iterator& q_iter,
|
||||
XioSubmit::Queue& send_q) {
|
||||
// XXX gather all already-dequeued outgoing messages for xcon
|
||||
// and push them in FIFO order to front of the input queue,
|
||||
// having first marked the connection as flow-controlled
|
||||
XioSubmit::Queue requeue_q;
|
||||
XioSubmit *xs;
|
||||
requeue_q.push_back(*xmsg);
|
||||
++q_iter;
|
||||
while (q_iter != send_q.end()) {
|
||||
xs = &(*q_iter);
|
||||
// skip retires and anything for other connections
|
||||
if ((xs->type != XioSubmit::OUTGOING_MSG) ||
|
||||
(xs->xcon != xcon))
|
||||
continue;
|
||||
xmsg = static_cast<XioMsg*>(xs);
|
||||
q_iter = send_q.erase(q_iter);
|
||||
requeue_q.push_back(*xmsg);
|
||||
}
|
||||
pthread_spin_lock(&xcon->sp);
|
||||
XioSubmit::Queue::const_iterator i1 = xcon->outgoing.requeue.begin();
|
||||
xcon->outgoing.requeue.splice(i1, requeue_q);
|
||||
xcon->cstate.state_flow_controlled(XioConnection::CState::OP_FLAG_LOCKED);
|
||||
pthread_spin_unlock(&xcon->sp);
|
||||
}
|
||||
|
||||
void *entry()
|
||||
{
|
||||
int size, code = 0;
|
||||
@ -191,55 +234,70 @@ public:
|
||||
|
||||
do {
|
||||
submit_q.deq(send_q);
|
||||
size = send_q.size();
|
||||
|
||||
/* shutdown() barrier */
|
||||
pthread_spin_lock(&sp);
|
||||
|
||||
restart:
|
||||
size = send_q.size();
|
||||
|
||||
if (_shutdown) {
|
||||
// XXX XioMsg queues for flow-controlled connections may require
|
||||
// cleanup
|
||||
drained = true;
|
||||
}
|
||||
|
||||
if (size > 0) {
|
||||
q_iter = send_q.begin();
|
||||
while (q_iter != send_q.end()) {
|
||||
xs = &(*q_iter);
|
||||
xcon = xs->xcon;
|
||||
xmsg = static_cast<XioMsg*>(xs);
|
||||
q_iter = send_q.begin();
|
||||
while (q_iter != send_q.end()) {
|
||||
xs = &(*q_iter);
|
||||
xcon = xs->xcon;
|
||||
xmsg = static_cast<XioMsg*>(xs);
|
||||
|
||||
/* guard Accelio send queue */
|
||||
xio_qdepth_high = xcon->xio_qdepth_high_mark();
|
||||
if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
|
||||
++q_iter;
|
||||
continue;
|
||||
}
|
||||
/* guard Accelio send queue */
|
||||
xio_qdepth_high = xcon->xio_qdepth_high_mark();
|
||||
if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) >
|
||||
xio_qdepth_high)) {
|
||||
requeue_all_xcon(xmsg, xcon, q_iter, send_q);
|
||||
goto restart;
|
||||
}
|
||||
|
||||
q_iter = send_q.erase(q_iter);
|
||||
q_iter = send_q.erase(q_iter);
|
||||
|
||||
switch (xs->type) {
|
||||
case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
|
||||
switch (xs->type) {
|
||||
case XioSubmit::OUTGOING_MSG: /* it was an outgoing 1-way */
|
||||
if (unlikely(!xs->xcon->conn))
|
||||
code = ENOTCONN;
|
||||
code = ENOTCONN;
|
||||
else {
|
||||
msg = &xmsg->req_0.msg;
|
||||
code = xio_send_msg(xcon->conn, msg);
|
||||
/* header trace moved here to capture xio serial# */
|
||||
if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
|
||||
print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
|
||||
print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
|
||||
}
|
||||
msg = &xmsg->req_0.msg;
|
||||
code = xio_send_msg(xcon->conn, msg);
|
||||
/* header trace moved here to capture xio serial# */
|
||||
if (ldlog_p1(msgr->cct, ceph_subsys_xio, 11)) {
|
||||
print_xio_msg_hdr(msgr->cct, "xio_send_msg", xmsg->hdr, msg);
|
||||
print_ceph_msg(msgr->cct, "xio_send_msg", xmsg->m);
|
||||
}
|
||||
}
|
||||
if (unlikely(code)) {
|
||||
xs->xcon->msg_send_fail(xmsg, code);
|
||||
switch (code) {
|
||||
case XIO_E_TX_QUEUE_OVERFLOW:
|
||||
{
|
||||
requeue_all_xcon(xmsg, xcon, q_iter, send_q);
|
||||
goto restart;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
xs->xcon->msg_send_fail(xmsg, code);
|
||||
break;
|
||||
};
|
||||
} else {
|
||||
xs->xcon->send.set(msg->timestamp); // need atomic?
|
||||
xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
|
||||
}
|
||||
break;
|
||||
default:
|
||||
/* INCOMING_MSG_RELEASE */
|
||||
release_xio_rsp(static_cast<XioRsp*>(xs));
|
||||
break;
|
||||
xs->xcon->send.set(msg->timestamp); // need atomic?
|
||||
xcon->send_ctr += xmsg->hdr.msg_cnt; // only inc if cb promised
|
||||
}
|
||||
break;
|
||||
default:
|
||||
/* INCOMING_MSG_RELEASE */
|
||||
release_xio_rsp(static_cast<XioRsp*>(xs));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
55
src/msg/xio/XioSubmit.h
Normal file
55
src/msg/xio/XioSubmit.h
Normal file
@ -0,0 +1,55 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
|
||||
* Portions Copyright (C) 2013 CohortFS, LLC
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
*
|
||||
*/
|
||||
|
||||
#ifndef XIO_SUBMIT_H
|
||||
#define XIO_SUBMIT_H
|
||||
|
||||
#include <boost/intrusive/list.hpp>
|
||||
#include "msg/SimplePolicyMessenger.h"
|
||||
extern "C" {
|
||||
#include "libxio.h"
|
||||
}
|
||||
#include "XioConnection.h"
|
||||
#include "msg/msg_types.h"
|
||||
#include "XioPool.h"
|
||||
|
||||
namespace bi = boost::intrusive;
|
||||
|
||||
class XioConnection;
|
||||
|
||||
struct XioSubmit
|
||||
{
|
||||
public:
|
||||
enum submit_type
|
||||
{
|
||||
OUTGOING_MSG,
|
||||
INCOMING_MSG_RELEASE
|
||||
};
|
||||
enum submit_type type;
|
||||
bi::list_member_hook<> submit_list;
|
||||
XioConnection *xcon;
|
||||
|
||||
XioSubmit(enum submit_type _type, XioConnection *_xcon) :
|
||||
type(_type), xcon(_xcon)
|
||||
{}
|
||||
|
||||
typedef bi::list< XioSubmit,
|
||||
bi::member_hook< XioSubmit,
|
||||
bi::list_member_hook<>,
|
||||
&XioSubmit::submit_list >
|
||||
> Queue;
|
||||
};
|
||||
|
||||
#endif /* XIO_SUBMIT_H */
|
Loading…
Reference in New Issue
Block a user