xio: Enable Accelio flow control with msgs and bytes throttlers

* Enable Accelio flow control in general
* Read out policy for messages and bytes throttlers from connection's peer_type
* Set Accelio connection flow control with policy throttlers or default values
* Set q_high_mark for xio_connection (80% of queue_depth)
  xio: Correct q_high_mark setting

Signed-off-by: Vu Pham <vu@mellanox.com>
Signed-off-by: Matt Benjamin <matt@cohortfs.com>
This commit is contained in:
Vu Pham 2014-12-11 06:28:26 -08:00 committed by Matt Benjamin
parent 3c7e857b83
commit 1c2efde84d
4 changed files with 53 additions and 10 deletions

View File

@ -97,6 +97,49 @@ XioConnection::XioConnection(XioMessenger *m, XioConnection::type _type,
peer_type = peer.name.type();
set_peer_addr(peer.addr);
Messenger::Policy policy;
int64_t max_msgs = 0, max_bytes = 0, bytes_opt = 0;
int xopt;
policy = m->get_policy(peer_type);
if (policy.throttler_messages) {
max_msgs = policy.throttler_messages->get_max();
ldout(m->cct,0) << "XioMessenger throttle_msgs: " << max_msgs << dendl;
}
xopt = m->cct->_conf->xio_queue_depth;
if (max_msgs > xopt)
xopt = max_msgs;
/* set high mark for send, reserved 20% for credits */
q_high_mark = xopt * 4 / 5;
q_low_mark = q_high_mark/2;
/* set send & receive msgs queue depth */
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
&xopt, sizeof(xopt));
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
&xopt, sizeof(xopt));
if (policy.throttler_bytes) {
max_bytes = policy.throttler_bytes->get_max();
ldout(m->cct,0) << "XioMessenger throttle_bytes: " << max_bytes << dendl;
}
bytes_opt = (2 << 28); /* default: 512 MB */
if (max_bytes > bytes_opt)
bytes_opt = max_bytes;
/* set send & receive total bytes throttle */
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_BYTES,
&bytes_opt, sizeof(bytes_opt));
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_BYTES,
&bytes_opt, sizeof(bytes_opt));
ldout(m->cct,0) << "Peer type: " << peer.name.type_str() <<
" throttle_msgs: " << xopt << " throttle_bytes: " << bytes_opt << dendl;
/* XXXX fake features, aieee! */
set_features(XIO_ALL_FEATURES);
}

View File

@ -55,6 +55,7 @@ private:
uint32_t special_handling;
uint64_t scount;
uint32_t send_ctr;
int q_high_mark;
struct lifecycle {
// different from Pipe states?
@ -139,8 +140,8 @@ private:
return 0;
}
int xio_queue_depth() {
return msgr->cct->_conf->xio_queue_depth;
int xio_qdepth_high_mark() {
return q_high_mark;
}
public:

View File

@ -289,11 +289,10 @@ XioMessenger::XioMessenger(CephContext *cct, entity_name_t name,
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_MAX_OUT_IOVLEN,
&xopt, sizeof(xopt));
xopt = cct->_conf->xio_queue_depth; // defaults to 512
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_SND_QUEUE_DEPTH_MSGS,
&xopt, sizeof(xopt));
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_RCV_QUEUE_DEPTH_MSGS,
&xopt, sizeof(xopt));
/* enable flow-control */
xopt = 1;
xio_set_opt(NULL, XIO_OPTLEVEL_ACCELIO, XIO_OPTNAME_ENABLE_FLOW_CONTROL,
&xopt, sizeof(xopt));
/* and set threshold for buffer callouts */
xopt = 16384;

View File

@ -181,7 +181,7 @@ public:
void *entry()
{
int size, code = 0;
uint32_t xio_qdepth;
uint32_t xio_qdepth_high;
XioSubmit::Queue send_q;
XioSubmit::Queue::iterator q_iter;
struct xio_msg *msg = NULL;
@ -208,8 +208,8 @@ public:
xmsg = static_cast<XioMsg*>(xs);
/* guard Accelio send queue */
xio_qdepth = xcon->xio_queue_depth();
if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth)) {
xio_qdepth_high = xcon->xio_qdepth_high_mark();
if (unlikely((xcon->send_ctr + xmsg->hdr.msg_cnt) > xio_qdepth_high)) {
++q_iter;
continue;
}