mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
msgr: add second per-message throttler to message policy
We already have a throttler that lets of limit the amount of memory consumed by messages from a given source. Currently this is based only on the size of the message payload. Add a second throttler that limits the number of messages so that we can effectively throttle small requests as well. Signed-off-by: Sage Weil <sage@inktank.com>
This commit is contained in:
parent
79b71441f8
commit
f7070e9568
@ -407,15 +407,15 @@ int main(int argc, const char **argv)
|
||||
// throttle client traffic
|
||||
Throttle *client_throttler = new Throttle(g_ceph_context, "mon_client_bytes",
|
||||
g_conf->mon_client_bytes);
|
||||
messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT, client_throttler);
|
||||
messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT, client_throttler, NULL);
|
||||
|
||||
// throttle daemon traffic
|
||||
// NOTE: actual usage on the leader may multiply by the number of
|
||||
// monitors if they forward large update messages from daemons.
|
||||
Throttle *daemon_throttler = new Throttle(g_ceph_context, "mon_daemon_bytes",
|
||||
g_conf->mon_daemon_bytes);
|
||||
messenger->set_policy_throttler(entity_name_t::TYPE_OSD, daemon_throttler);
|
||||
messenger->set_policy_throttler(entity_name_t::TYPE_MDS, daemon_throttler);
|
||||
messenger->set_policy_throttlers(entity_name_t::TYPE_OSD, daemon_throttler, NULL);
|
||||
messenger->set_policy_throttlers(entity_name_t::TYPE_MDS, daemon_throttler, NULL);
|
||||
|
||||
cout << "starting " << g_conf->name << " rank " << rank
|
||||
<< " at " << ipaddr
|
||||
|
@ -349,9 +349,9 @@ int main(int argc, const char **argv)
|
||||
CEPH_FEATURE_MSG_AUTH;
|
||||
|
||||
client_messenger->set_default_policy(Messenger::Policy::stateless_server(supported, 0));
|
||||
client_messenger->set_policy_throttler(
|
||||
entity_name_t::TYPE_CLIENT,
|
||||
client_throttler.get()); // default, actually
|
||||
client_messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
|
||||
client_throttler.get(),
|
||||
NULL);
|
||||
client_messenger->set_policy(entity_name_t::TYPE_MON,
|
||||
Messenger::Policy::lossy_client(supported,
|
||||
CEPH_FEATURE_UID |
|
||||
|
@ -299,7 +299,10 @@ protected:
|
||||
|
||||
// release our size in bytes back to this throttler when our payload
|
||||
// is adjusted or when we are destroyed.
|
||||
Throttle *throttler;
|
||||
Throttle *byte_throttler;
|
||||
|
||||
// release a count back to this throttler when we are destroyed
|
||||
Throttle *msg_throttler;
|
||||
|
||||
// keep track of how big this message was when we reserved space in
|
||||
// the msgr dispatch_throttler, so that we can properly release it
|
||||
@ -313,14 +316,16 @@ protected:
|
||||
public:
|
||||
Message()
|
||||
: connection(NULL),
|
||||
throttler(NULL),
|
||||
byte_throttler(NULL),
|
||||
msg_throttler(NULL),
|
||||
dispatch_throttle_size(0) {
|
||||
memset(&header, 0, sizeof(header));
|
||||
memset(&footer, 0, sizeof(footer));
|
||||
};
|
||||
Message(int t, int version=1, int compat_version=0)
|
||||
: connection(NULL),
|
||||
throttler(NULL),
|
||||
byte_throttler(NULL),
|
||||
msg_throttler(NULL),
|
||||
dispatch_throttle_size(0) {
|
||||
memset(&header, 0, sizeof(header));
|
||||
header.type = t;
|
||||
@ -340,8 +345,10 @@ protected:
|
||||
assert(nref.read() == 0);
|
||||
if (connection)
|
||||
connection->put();
|
||||
if (throttler)
|
||||
throttler->put(payload.length() + middle.length() + data.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(payload.length() + middle.length() + data.length());
|
||||
if (msg_throttler)
|
||||
msg_throttler->put();
|
||||
}
|
||||
public:
|
||||
Connection *get_connection() { return connection; }
|
||||
@ -350,8 +357,10 @@ public:
|
||||
connection->put();
|
||||
connection = c;
|
||||
}
|
||||
void set_throttler(Throttle *t) { throttler = t; }
|
||||
Throttle *get_throttler() { return throttler; }
|
||||
void set_byte_throttler(Throttle *t) { byte_throttler = t; }
|
||||
Throttle *get_byte_throttler() { return byte_throttler; }
|
||||
void set_message_throttler(Throttle *t) { msg_throttler = t; }
|
||||
Throttle *get_message_throttler() { return msg_throttler; }
|
||||
|
||||
void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
|
||||
uint64_t get_dispatch_throttle_size() { return dispatch_throttle_size; }
|
||||
@ -369,39 +378,48 @@ public:
|
||||
*/
|
||||
|
||||
void clear_payload() {
|
||||
if (throttler) throttler->put(payload.length() + middle.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(payload.length() + middle.length());
|
||||
payload.clear();
|
||||
middle.clear();
|
||||
}
|
||||
void clear_data() {
|
||||
if (throttler) throttler->put(data.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(data.length());
|
||||
data.clear();
|
||||
}
|
||||
|
||||
bool empty_payload() { return payload.length() == 0; }
|
||||
bufferlist& get_payload() { return payload; }
|
||||
void set_payload(bufferlist& bl) {
|
||||
if (throttler) throttler->put(payload.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(payload.length());
|
||||
payload.claim(bl);
|
||||
if (throttler) throttler->take(payload.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->take(payload.length());
|
||||
}
|
||||
|
||||
void set_middle(bufferlist& bl) {
|
||||
if (throttler) throttler->put(payload.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(payload.length());
|
||||
middle.claim(bl);
|
||||
if (throttler) throttler->take(payload.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->take(payload.length());
|
||||
}
|
||||
bufferlist& get_middle() { return middle; }
|
||||
|
||||
void set_data(const bufferlist &d) {
|
||||
if (throttler) throttler->put(data.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(data.length());
|
||||
data = d;
|
||||
if (throttler) throttler->take(data.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->take(data.length());
|
||||
}
|
||||
|
||||
bufferlist& get_data() { return data; }
|
||||
void claim_data(bufferlist& bl) {
|
||||
if (throttler) throttler->put(data.length());
|
||||
if (byte_throttler)
|
||||
byte_throttler->put(data.length());
|
||||
bl.claim(data);
|
||||
}
|
||||
off_t get_data_len() { return data.length(); }
|
||||
|
@ -74,7 +74,8 @@ public:
|
||||
* the associated Connection(s). When reading in a new Message, the Messenger
|
||||
* will call throttler->throttle() for the size of the new Message.
|
||||
*/
|
||||
Throttle *throttler;
|
||||
Throttle *throttler_bytes;
|
||||
Throttle *throttler_messages;
|
||||
|
||||
/// Specify features supported locally by the endpoint.
|
||||
uint64_t features_supported;
|
||||
@ -82,12 +83,16 @@ public:
|
||||
uint64_t features_required;
|
||||
|
||||
Policy()
|
||||
: lossy(false), server(false), standby(false), resetcheck(true), throttler(NULL),
|
||||
: lossy(false), server(false), standby(false), resetcheck(true),
|
||||
throttler_bytes(NULL),
|
||||
throttler_messages(NULL),
|
||||
features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT),
|
||||
features_required(0) {}
|
||||
private:
|
||||
Policy(bool l, bool s, bool st, bool r, uint64_t sup, uint64_t req)
|
||||
: lossy(l), server(s), standby(st), resetcheck(r), throttler(NULL),
|
||||
: lossy(l), server(s), standby(st), resetcheck(r),
|
||||
throttler_bytes(NULL),
|
||||
throttler_messages(NULL),
|
||||
features_supported(sup | CEPH_FEATURES_SUPPORTED_DEFAULT),
|
||||
features_required(req) {}
|
||||
|
||||
@ -266,7 +271,7 @@ public:
|
||||
* ownership of this pointer, but you must not destroy it before
|
||||
* you destroy the Messenger.
|
||||
*/
|
||||
virtual void set_policy_throttler(int type, Throttle *t) = 0;
|
||||
virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0;
|
||||
/**
|
||||
* Set the default send priority
|
||||
*
|
||||
|
@ -1661,14 +1661,20 @@ int Pipe::read_message(Message **pm)
|
||||
Message *message;
|
||||
utime_t recv_stamp = ceph_clock_now(msgr->cct);
|
||||
|
||||
if (policy.throttler_messages) {
|
||||
ldout(msgr->cct,10) << "reader wants " << 1 << " message from policy throttler "
|
||||
<< policy.throttler_messages->get_current() << "/"
|
||||
<< policy.throttler_messages->get_max() << dendl;
|
||||
policy.throttler_messages->get();
|
||||
}
|
||||
|
||||
uint64_t message_size = header.front_len + header.middle_len + header.data_len;
|
||||
if (message_size) {
|
||||
bool waited_on_throttle = false;
|
||||
if (policy.throttler) {
|
||||
ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler "
|
||||
<< policy.throttler->get_current() << "/"
|
||||
<< policy.throttler->get_max() << dendl;
|
||||
waited_on_throttle = policy.throttler->get(message_size);
|
||||
if (policy.throttler_bytes) {
|
||||
ldout(msgr->cct,10) << "reader wants " << message_size << " bytes from policy throttler "
|
||||
<< policy.throttler_bytes->get_current() << "/"
|
||||
<< policy.throttler_bytes->get_max() << dendl;
|
||||
policy.throttler_bytes->get(message_size);
|
||||
}
|
||||
|
||||
// throttle total bytes waiting for dispatch. do this _after_ the
|
||||
@ -1678,7 +1684,7 @@ int Pipe::read_message(Message **pm)
|
||||
ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler "
|
||||
<< msgr->dispatch_throttler.get_current() << "/"
|
||||
<< msgr->dispatch_throttler.get_max() << dendl;
|
||||
waited_on_throttle |= msgr->dispatch_throttler.get(message_size);
|
||||
msgr->dispatch_throttler.get(message_size);
|
||||
}
|
||||
|
||||
utime_t throttle_stamp = ceph_clock_now(msgr->cct);
|
||||
@ -1807,7 +1813,8 @@ int Pipe::read_message(Message **pm)
|
||||
}
|
||||
}
|
||||
|
||||
message->set_throttler(policy.throttler);
|
||||
message->set_byte_throttler(policy.throttler_bytes);
|
||||
message->set_message_throttler(policy.throttler_messages);
|
||||
|
||||
// store reservation size in message, so we don't get confused
|
||||
// by messages entering the dispatch queue through other paths.
|
||||
@ -1822,12 +1829,18 @@ int Pipe::read_message(Message **pm)
|
||||
|
||||
out_dethrottle:
|
||||
// release bytes reserved from the throttlers on failure
|
||||
if (policy.throttler_messages) {
|
||||
ldout(msgr->cct,10) << "reader releasing " << 1 << " message to policy throttler "
|
||||
<< policy.throttler_messages->get_current() << "/"
|
||||
<< policy.throttler_messages->get_max() << dendl;
|
||||
policy.throttler_messages->put();
|
||||
}
|
||||
if (message_size) {
|
||||
if (policy.throttler) {
|
||||
ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler "
|
||||
<< policy.throttler->get_current() << "/"
|
||||
<< policy.throttler->get_max() << dendl;
|
||||
policy.throttler->put(message_size);
|
||||
if (policy.throttler_bytes) {
|
||||
ldout(msgr->cct,10) << "reader releasing " << message_size << " bytes to policy throttler "
|
||||
<< policy.throttler_bytes->get_current() << "/"
|
||||
<< policy.throttler_bytes->get_max() << dendl;
|
||||
policy.throttler_bytes->put(message_size);
|
||||
}
|
||||
|
||||
msgr->dispatch_throttle_release(message_size);
|
||||
|
@ -162,12 +162,15 @@ public:
|
||||
* ownership of this pointer, but you must not destroy it before
|
||||
* you destroy SimpleMessenger.
|
||||
*/
|
||||
void set_policy_throttler(int type, Throttle *t) {
|
||||
void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) {
|
||||
Mutex::Locker l(policy_lock);
|
||||
if (policy_map.count(type))
|
||||
policy_map[type].throttler = t;
|
||||
else
|
||||
default_policy.throttler = t;
|
||||
if (policy_map.count(type)) {
|
||||
policy_map[type].throttler_bytes = byte_throttle;
|
||||
policy_map[type].throttler_messages = msg_throttle;
|
||||
} else {
|
||||
default_policy.throttler_bytes = byte_throttle;
|
||||
default_policy.throttler_messages = msg_throttle;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Bind the SimpleMessenger to a specific address. If bind_addr
|
||||
|
@ -366,8 +366,8 @@ class OSDStub : public TestStub
|
||||
|
||||
messenger->set_default_policy(
|
||||
Messenger::Policy::stateless_server(supported, 0));
|
||||
messenger->set_policy_throttler(entity_name_t::TYPE_CLIENT,
|
||||
&throttler);
|
||||
messenger->set_policy_throttlers(entity_name_t::TYPE_CLIENT,
|
||||
&throttler, NULL);
|
||||
messenger->set_policy(entity_name_t::TYPE_MON,
|
||||
Messenger::Policy::lossy_client(supported, CEPH_FEATURE_UID |
|
||||
CEPH_FEATURE_PGID64 |
|
||||
|
Loading…
Reference in New Issue
Block a user