diff --git a/src/common/config_opts.h b/src/common/config_opts.h index b8085f86372..8f5640347cd 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -148,6 +148,7 @@ OPTION(mon_osd_down_out_subtree_limit, OPT_STR, "rack") // smallest crush unit OPTION(mon_osd_min_up_ratio, OPT_DOUBLE, .3) // min osds required to be up to mark things down OPTION(mon_osd_min_in_ratio, OPT_DOUBLE, .3) // min osds required to be in to mark things out OPTION(mon_osd_max_op_age, OPT_DOUBLE, 32) // max op age before we get concerned (make it a power of 2) +OPTION(mon_osd_max_split_count, OPT_INT, 32) // largest number of PGs per "involved" OSD to let split create OPTION(mon_stat_smooth_intervals, OPT_INT, 2) // smooth stats over last N PGMap maps OPTION(mon_lease, OPT_FLOAT, 5) // lease interval OPTION(mon_lease_renew_interval, OPT_FLOAT, 3) // on leader, to renew the lease diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 9b1631828a8..6bff6d36524 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -2803,7 +2803,21 @@ int OSDMonitor::prepare_command_pool_set(map &cmdmap, } if (n <= (int)p.get_pg_num()) { ss << "specified pg_num " << n << " <= current " << p.get_pg_num(); + if (n < (int)p.get_pg_num()) + return -EEXIST; + else + return 0; } else { + int expected_osds = MIN(p.get_pg_num(), osdmap.get_num_osds()); + int64_t new_pgs = n - p.get_pg_num(); + int64_t pgs_per_osd = new_pgs / expected_osds; + if (pgs_per_osd > g_conf->mon_osd_max_split_count) { + ss << "specified pg_num " << n << " is too large (creating " + << new_pgs << " new PGs on ~" << expected_osds + << " OSDs exceeds per-OSD max of" << g_conf->mon_osd_max_split_count + << ')'; + return -E2BIG; + } for(set::iterator i = mon->pgmon()->pg_map.creating_pgs.begin(); i != mon->pgmon()->pg_map.creating_pgs.end(); ++i) { diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 89a55b393db..db033c6e39b 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -1564,7 +1564,7 @@ void FileStore::queue_op(OpSequencer *osr, Op *o) op_wq.queue(osr); } -void FileStore::op_queue_reserve_throttle(Op *o) +void FileStore::op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle) { // Do not call while holding the journal lock! uint64_t max_ops = m_filestore_queue_max_ops; @@ -1586,7 +1586,11 @@ void FileStore::op_queue_reserve_throttle(Op *o) && (op_queue_bytes + o->bytes) > max_bytes)) { dout(2) << "waiting " << op_queue_len + 1 << " > " << max_ops << " ops || " << op_queue_bytes + o->bytes << " > " << max_bytes << dendl; + if (handle) + handle->suspend_tp_timeout(); op_throttle_cond.Wait(op_throttle_lock); + if (handle) + handle->reset_tp_timeout(); } op_queue_len++; @@ -1671,7 +1675,8 @@ struct C_JournaledAhead : public Context { }; int FileStore::queue_transactions(Sequencer *posr, list &tls, - TrackedOpRef osd_op) + TrackedOpRef osd_op, + ThreadPool::TPHandle *handle) { Context *onreadable; Context *ondisk; @@ -1699,7 +1704,7 @@ int FileStore::queue_transactions(Sequencer *posr, list &tls, if (journal && journal->is_writeable() && !m_filestore_journal_trailing) { Op *o = build_op(tls, onreadable, onreadable_sync, osd_op); - op_queue_reserve_throttle(o); + op_queue_reserve_throttle(o, handle); journal->throttle(); uint64_t op_num = submit_manager.op_submit_start(); o->op = op_num; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index c489fdd5796..23b58809656 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -305,7 +305,7 @@ private: Context *onreadable, Context *onreadable_sync, TrackedOpRef osd_op); void queue_op(OpSequencer *osr, Op *o); - void op_queue_reserve_throttle(Op *o); + void op_queue_reserve_throttle(Op *o, ThreadPool::TPHandle *handle = NULL); void op_queue_release_throttle(Op *o); void _journaled_ahead(OpSequencer *osr, Op *o, Context *ondisk); friend struct C_JournaledAhead; @@ -378,7 +378,8 @@ public: ThreadPool::TPHandle *handle); int queue_transactions(Sequencer *osr, list& tls, - TrackedOpRef op = TrackedOpRef()); + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL); /** * set replay guard xattr on given file diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 6494290b541..b0be3c83aca 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -19,6 +19,7 @@ #include "include/types.h" #include "osd/osd_types.h" #include "common/TrackedOp.h" +#include "common/WorkQueue.h" #include "ObjectMap.h" #include @@ -793,34 +794,40 @@ public: } unsigned apply_transactions(Sequencer *osr, list& tls, Context *ondisk=0); - int queue_transaction(Sequencer *osr, Transaction* t) { + int queue_transaction(Sequencer *osr, Transaction* t, + ThreadPool::TPHandle *handle = NULL) { list tls; tls.push_back(t); - return queue_transactions(osr, tls, new C_DeleteTransaction(t)); + return queue_transactions(osr, tls, new C_DeleteTransaction(t), + NULL, NULL, TrackedOpRef(), handle); } int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0, Context *onreadable_sync=0, - TrackedOpRef op = TrackedOpRef()) { + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) { list tls; tls.push_back(t); - return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op); + return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, + op, handle); } int queue_transactions(Sequencer *osr, list& tls, Context *onreadable, Context *ondisk=0, Context *onreadable_sync=0, - TrackedOpRef op = TrackedOpRef()) { + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) { assert(!tls.empty()); tls.back()->register_on_applied(onreadable); tls.back()->register_on_commit(ondisk); tls.back()->register_on_applied_sync(onreadable_sync); - return queue_transactions(osr, tls, op); + return queue_transactions(osr, tls, op, handle); } virtual int queue_transactions( Sequencer *osr, list& tls, - TrackedOpRef op = TrackedOpRef()) = 0; + TrackedOpRef op = TrackedOpRef(), + ThreadPool::TPHandle *handle = NULL) = 0; int queue_transactions( diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 11f1c84a13b..e4aff735c41 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5994,13 +5994,15 @@ PG::RecoveryCtx OSD::create_context() return rctx; } -void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg) +void OSD::dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, + ThreadPool::TPHandle *handle) { if (!ctx.transaction->empty()) { ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( pg->osr.get(), - ctx.transaction, ctx.on_applied, ctx.on_safe); + ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, + TrackedOpRef(), handle); assert(tr == 0); ctx.transaction = new ObjectStore::Transaction; ctx.on_applied = new C_Contexts(cct); @@ -6025,7 +6027,8 @@ bool OSD::compat_must_dispatch_immediately(PG *pg) return false; } -void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap) +void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, + ThreadPool::TPHandle *handle) { if (service.get_osdmap()->is_up(whoami)) { do_notifies(*ctx.notify_list, curmap); @@ -6045,7 +6048,8 @@ void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap) ctx.on_applied->add(new ObjectStore::C_DeleteTransaction(ctx.transaction)); int tr = store->queue_transaction( pg->osr.get(), - ctx.transaction, ctx.on_applied, ctx.on_safe); + ctx.transaction, ctx.on_applied, ctx.on_safe, NULL, TrackedOpRef(), + handle); assert(tr == 0); } } @@ -7250,17 +7254,17 @@ void OSD::process_peering_events( split_pgs.clear(); } if (compat_must_dispatch_immediately(pg)) { - dispatch_context(rctx, pg, curmap); + dispatch_context(rctx, pg, curmap, &handle); rctx = create_context(); } else { - dispatch_context_transaction(rctx, pg); + dispatch_context_transaction(rctx, pg, &handle); } pg->unlock(); handle.reset_tp_timeout(); } if (need_up_thru) queue_want_up_thru(same_interval_since); - dispatch_context(rctx, 0, curmap); + dispatch_context(rctx, 0, curmap, &handle); service.send_pg_temp(); } diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 11ad2b89399..29a1f875c01 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1318,8 +1318,10 @@ protected: // -- generic pg peering -- PG::RecoveryCtx create_context(); bool compat_must_dispatch_immediately(PG *pg); - void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap); - void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg); + void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap, + ThreadPool::TPHandle *handle = NULL); + void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg, + ThreadPool::TPHandle *handle = NULL); void do_notifies(map< int,vector > >& notify_list, OSDMapRef map); void do_queries(map< int, map >& query_map, diff --git a/src/osd/OSDMap.cc b/src/osd/OSDMap.cc index 8007d613b8c..ab6f2f17b80 100644 --- a/src/osd/OSDMap.cc +++ b/src/osd/OSDMap.cc @@ -673,18 +673,18 @@ void OSDMap::get_all_osds(set& ls) const ls.insert(i); } -int OSDMap::get_num_up_osds() const +unsigned OSDMap::get_num_up_osds() const { - int n = 0; + unsigned n = 0; for (int i=0; i& ls) const; - int get_num_up_osds() const; - int get_num_in_osds() const; + unsigned get_num_up_osds() const; + unsigned get_num_in_osds() const; int get_flags() const { return flags; } int test_flag(int f) const { return flags & f; }