diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 89a55b393db..db033c6e39b 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1564,7 +1564,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o) op_wq.queue(osr); } -void FileStore::op_queue_reserve_throttle(Op *o) +void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle) { // Do not call while holding the journal lock! uint64_t max_ops = m_filestore_queue_max_ops; @@ -1586,7 +1586,11 @@ void FileStore::op_queue_reserve_throttle(Op *o) && (op_queue_bytes + o->bytes) > max_bytes)) { dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes << dendl; + if (handle) + handle->suspend_tp_timeout(); op_throttle_cond.Wait(op_throttle_lock); + if (handle) + handle->reset_tp_timeout(); } op_queue_len++; @@ -1671,7 +1675,8 @@ struct C_JournaledAhead : public Context { }; int FileStore::queue_transactions(Sequencer *posr, list &tls, - TrackedOpRef osd_op) + TrackedOpRef osd_op, + ThreadPool::TPHandle *handle) { Context *onreadable; Context *ondisk; @@ -1699,7 +1704,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, if (journal && journal->is_writeable() && !m_filestore_journal_trailing) { Op *o = build_op(tls, onreadable, onreadable_sync, osd_op); - op_queue_reserve_throttle(o); + op_queue_reserve_throttle(o, handle); journal->throttle(); uint64_t op_num = submit_manager.op_submit_start(); o->op = op_num; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index c489fdd5796..23b58809656 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -305,7 +305,7 @@ private: Context *onreadable, Context *onreadable_sync, TrackedOpRef osd_op); void queue_op(OpSequencer *osr, Op *o); - void op_queue_reserve_throttle(Op *o); + void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL); void op_queue_release_throttle(Op *o); void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend struct C_JournaledAhead; @@ -378,7 +378,8 @@ public: ThreadPool::TPHandle *handle); int queue_transactions(Sequencer *osr, list& tls, - TrackedOpRef op = TrackedOpRef()); + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL); /** * set replay guard xattr on given file diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 6494290b541..b0be3c83aca 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -19,6 +19,7 @@ #include "include/types.h" #include "osd/osd_types.h" #include "common/TrackedOp.h" +#include "common/WorkQueue.h" #include "ObjectMap.h" #include @@ -793,34 +794,40 @@ public: } unsigned apply_transactions(Sequencer *osr, list& tls, Context *ondisk=0); - int queue_transaction(Sequencer *osr, Transaction* t) { + int queue_transaction(Sequencer *osr, Transaction* t, + ThreadPool::TPHandle *handle = NULL) { list tls; tls.push_back(t); - return queue_transactions(osr, tls, new C_DeleteTransaction(t)); + return queue_transactions(osr, tls, new C_DeleteTransaction(t), + NULL, NULL, TrackedOpRef(), handle); } int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0, Context *onreadable_sync=0, - TrackedOpRef op = TrackedOpRef()) { + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) { list tls; tls.push_back(t); - return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op); + return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, + op, handle); } int queue_transactions(Sequencer *osr, list& tls, Context *onreadable, Context *ondisk=0, Context *onreadable_sync=0, - TrackedOpRef op = TrackedOpRef()) { + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) { assert(!tls.empty()); tls.back()->register_on_applied(onreadable); tls.back()->register_on_commit(ondisk); tls.back()->register_on_applied_sync(onreadable_sync); - return queue_transactions(osr, tls, op); + return queue_transactions(osr, tls, op, handle); } virtual int queue_transactions( Sequencer *osr, list& tls, - TrackedOpRef op = TrackedOpRef()) = 0; + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) = 0; int queue_transactions( diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 11f1c84a13b..e4aff735c41 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5994,13 +5994,15 @@ PG::RecoveryCtx OSD::create_context() return rctx; } -void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg) +void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, + ThreadPool::TPHandle *handle) { if (!ctx.transaction->empty()) { ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( pg->osr.get(), - ctx.transaction, ctx.on_applied, ctx.on_safe); + ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, + TrackedOpRef(), handle); assert(tr == 0); ctx.transaction = new ObjectStore::Transaction; ctx.on_applied = new C_Contexts(cct); @@ -6025,7 +6027,8 @@ bool OSD::compat_must_dispatch_immediately(PG *pg) return false; } -void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap) +void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, + ThreadPool::TPHandle *handle) { if (service.get_osdmap()->is_up(whoami)) { do_notifies(*ctx.notify_list, curmap); @@ -6045,7 +6048,8 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap) ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( pg->osr.get(), - ctx.transaction, ctx.on_applied, ctx.on_safe); + ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(), + handle); assert(tr == 0); } } @@ -7250,17 +7254,17 @@ void OSD::process_peering_events( split_pgs.clear(); } if (compat_must_dispatch_immediately(pg)) { - dispatch_context(rctx, pg, curmap); + dispatch_context(rctx, pg, curmap, &handle); rctx = create_context(); } else { - dispatch_context_transaction(rctx, pg); + dispatch_context_transaction(rctx, pg, &handle); } pg->unlock(); handle.reset_tp_timeout(); } if (need_up_thru) queue_want_up_thru(same_interval_since); - dispatch_context(rctx, 0, curmap); + dispatch_context(rctx, 0, curmap, &handle); service.send_pg_temp(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 11ad2b89399..29a1f875c01 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1318,8 +1318,10 @@ protected: // -- generic pg peering -- PG::RecoveryCtx create_context(); bool compat_must_dispatch_immediately(PG *pg); - void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap); - void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg); + void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, + ThreadPool::TPHandle *handle = NULL); + void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, + ThreadPool::TPHandle *handle = NULL); void do_notifies(map< int,vector > >& notify_list, OSDMapRef map); void do_queries(map< int, map >& query_map,