FileStore: do not time out threads while they're waiting for op throttle

In order to support this, we pass a ThreadPool::TPHandle* through the ObjectStore
interface and, when we have one, suspend the thread-pool timeout while
waiting to get our op/byte throttles in the FileStore.
The OSD uses this when doing PG splits.

Signed-off-by: Greg Farnum <greg@inktank.com>
Greg Farnum 2013-12-03 17:02:24 -08:00
parent d8ccd73968
commit 68fdcfa1cc
5 changed files with 40 additions and 21 deletions
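
Below is a minimal sketch of the pattern this commit applies. It is not code from
the tree: the wrapper name, the lock/cond parameters, and the have_room predicate
are illustrative assumptions; the real change is FileStore::op_queue_reserve_throttle
in the first hunk. A worker thread that carries a ThreadPool::TPHandle suspends its
heartbeat timeout before blocking on the throttle condition and restores it once
woken, so a long wait on a full op queue is not mistaken for a hung thread:

  #include <functional>
  #include "common/Mutex.h"
  #include "common/Cond.h"
  #include "common/WorkQueue.h"   // ThreadPool::TPHandle

  // Sketch only: suspend/reset the thread-pool timeout around a blocking
  // throttle wait, exactly the shape of the FileStore change below.
  void wait_with_suspended_timeout(ThreadPool::TPHandle *handle,
                                   Mutex &lock, Cond &cond,
                                   std::function<bool()> have_room)
  {
    Mutex::Locker l(lock);
    while (!have_room()) {
      if (handle)
        handle->suspend_tp_timeout();  // stop the heartbeat timeout clock
      cond.Wait(lock);                 // may block a long time when throttled
      if (handle)
        handle->reset_tp_timeout();    // resume normal timeout accounting
    }
  }

Suspending, rather than merely resetting, matters because the wait can outlast the
grace period entirely; without it, an OSD peering worker queuing PG-split
transactions against a full FileStore op queue would be flagged as timed out.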

src/os/FileStore.cc

@@ -1564,7 +1564,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o)
   op_wq.queue(osr);
 }
 
-void FileStore::op_queue_reserve_throttle(Op *o)
+void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle)
 {
   // Do not call while holding the journal lock!
   uint64_t max_ops = m_filestore_queue_max_ops;
@@ -1586,7 +1586,11 @@ void FileStore::op_queue_reserve_throttle(Op *o)
           && (op_queue_bytes + o->bytes) > max_bytes)) {
     dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || "
             << op_queue_bytes + o->bytes << " > " << max_bytes << dendl;
+    if (handle)
+      handle->suspend_tp_timeout();
     op_throttle_cond.Wait(op_throttle_lock);
+    if (handle)
+      handle->reset_tp_timeout();
   }
   op_queue_len++;
@@ -1671,7 +1675,8 @@ struct C_JournaledAhead : public Context {
 };
 
 int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
-                                  TrackedOpRef osd_op)
+                                  TrackedOpRef osd_op,
+                                  ThreadPool::TPHandle *handle)
 {
   Context *onreadable;
   Context *ondisk;
@@ -1699,7 +1704,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
   if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
     Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
-    op_queue_reserve_throttle(o);
+    op_queue_reserve_throttle(o, handle);
     journal->throttle();
     uint64_t op_num = submit_manager.op_submit_start();
     o->op = op_num;

src/os/FileStore.h

@@ -305,7 +305,7 @@ private:
               Context *onreadable, Context *onreadable_sync,
               TrackedOpRef osd_op);
   void queue_op(OpSequencer *osr, Op *o);
-  void op_queue_reserve_throttle(Op *o);
+  void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL);
   void op_queue_release_throttle(Op *o);
   void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk);
   friend struct C_JournaledAhead;
@@ -378,7 +378,8 @@ public:
     ThreadPool::TPHandle *handle);
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
-                         TrackedOpRef op = TrackedOpRef());
+                         TrackedOpRef op = TrackedOpRef(),
+                         ThreadPool::TPHandle *handle = NULL);
 
   /**
    * set replay guard xattr on given file

src/os/ObjectStore.h

@@ -19,6 +19,7 @@
 #include "include/types.h"
 #include "osd/osd_types.h"
 #include "common/TrackedOp.h"
+#include "common/WorkQueue.h"
 #include "ObjectMap.h"
 
 #include <errno.h>
@@ -793,34 +794,40 @@ public:
   }
   unsigned apply_transactions(Sequencer *osr, list<Transaction*>& tls, Context *ondisk=0);
 
-  int queue_transaction(Sequencer *osr, Transaction* t) {
+  int queue_transaction(Sequencer *osr, Transaction* t,
+                        ThreadPool::TPHandle *handle = NULL) {
     list<Transaction *> tls;
     tls.push_back(t);
-    return queue_transactions(osr, tls, new C_DeleteTransaction(t));
+    return queue_transactions(osr, tls, new C_DeleteTransaction(t),
+                              NULL, NULL, TrackedOpRef(), handle);
   }
 
   int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
                         Context *onreadable_sync=0,
-                        TrackedOpRef op = TrackedOpRef()) {
+                        TrackedOpRef op = TrackedOpRef(),
+                        ThreadPool::TPHandle *handle = NULL) {
     list<Transaction*> tls;
     tls.push_back(t);
-    return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
+    return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync,
+                              op, handle);
   }
 
   int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
                          Context *onreadable, Context *ondisk=0,
                          Context *onreadable_sync=0,
-                         TrackedOpRef op = TrackedOpRef()) {
+                         TrackedOpRef op = TrackedOpRef(),
+                         ThreadPool::TPHandle *handle = NULL) {
     assert(!tls.empty());
     tls.back()->register_on_applied(onreadable);
     tls.back()->register_on_commit(ondisk);
     tls.back()->register_on_applied_sync(onreadable_sync);
-    return queue_transactions(osr, tls, op);
+    return queue_transactions(osr, tls, op, handle);
   }
 
   virtual int queue_transactions(
     Sequencer *osr, list<Transaction*>& tls,
-    TrackedOpRef op = TrackedOpRef()) = 0;
+    TrackedOpRef op = TrackedOpRef(),
+    ThreadPool::TPHandle *handle = NULL) = 0;
 
   int queue_transactions(

src/osd/OSD.cc

@@ -5994,13 +5994,15 @@ PG::RecoveryCtx OSD::create_context()
   return rctx;
 }
 
-void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg)
+void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+                                       ThreadPool::TPHandle *handle)
 {
   if (!ctx.transaction->empty()) {
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
-      ctx.transaction, ctx.on_applied, ctx.on_safe);
+      ctx.transaction, ctx.on_applied, ctx.on_safe, NULL,
+      TrackedOpRef(), handle);
     assert(tr == 0);
     ctx.transaction = new ObjectStore::Transaction;
     ctx.on_applied = new C_Contexts(cct);
@@ -6025,7 +6027,8 @@ bool OSD::compat_must_dispatch_immediately(PG *pg)
   return false;
 }
 
-void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+                           ThreadPool::TPHandle *handle)
 {
   if (service.get_osdmap()->is_up(whoami)) {
     do_notifies(*ctx.notify_list, curmap);
@@ -6045,7 +6048,8 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
     ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction));
     int tr = store->queue_transaction(
       pg->osr.get(),
-      ctx.transaction, ctx.on_applied, ctx.on_safe);
+      ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(),
+      handle);
     assert(tr == 0);
   }
 }
@@ -7250,17 +7254,17 @@ void OSD::process_peering_events(
       split_pgs.clear();
     }
     if (compat_must_dispatch_immediately(pg)) {
-      dispatch_context(rctx, pg, curmap);
+      dispatch_context(rctx, pg, curmap, &handle);
       rctx = create_context();
     } else {
-      dispatch_context_transaction(rctx, pg);
+      dispatch_context_transaction(rctx, pg, &handle);
     }
     pg->unlock();
     handle.reset_tp_timeout();
   }
   if (need_up_thru)
     queue_want_up_thru(same_interval_since);
-  dispatch_context(rctx, 0, curmap);
+  dispatch_context(rctx, 0, curmap, &handle);
   service.send_pg_temp();
 }

src/osd/OSD.h

@@ -1318,8 +1318,10 @@ protected:
   // -- generic pg peering --
   PG::RecoveryCtx create_context();
   bool compat_must_dispatch_immediately(PG *pg);
-  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap);
-  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg);
+  void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap,
+                        ThreadPool::TPHandle *handle = NULL);
+  void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg,
+                                    ThreadPool::TPHandle *handle = NULL);
   void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list,
                    OSDMapRef map);
   void do_queries(map< int, map<pg_t,pg_query_t> >& query_map,