diff --git a/src/common/TrackedOp.cc b/src/common/TrackedOp.cc new file mode 100644 index 00000000000..d1dbc1e7135 --- /dev/null +++ b/src/common/TrackedOp.cc @@ -0,0 +1,265 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#include "TrackedOp.h" +#include "common/Formatter.h" +#include +#include +#include "common/debug.h" +#include "common/config.h" +#include "msg/Message.h" +#include "include/assert.h" + +#define dout_subsys ceph_subsys_optracker +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "-- op tracker -- "; +} + +void OpHistory::on_shutdown() +{ + arrived.clear(); + duration.clear(); + shutdown = true; +} + +void OpHistory::insert(utime_t now, TrackedOpRef op) +{ + if (shutdown) + return; + duration.insert(make_pair(op->get_duration(), op)); + arrived.insert(make_pair(op->get_arrived(), op)); + cleanup(now); +} + +void OpHistory::cleanup(utime_t now) +{ + while (arrived.size() && + (now - arrived.begin()->first > + (double)(history_duration))) { + duration.erase(make_pair( + arrived.begin()->second->get_duration(), + arrived.begin()->second)); + arrived.erase(arrived.begin()); + } + + while (duration.size() > history_size) { + arrived.erase(make_pair( + duration.begin()->second->get_arrived(), + duration.begin()->second)); + duration.erase(duration.begin()); + } +} + +void OpHistory::dump_ops(utime_t now, Formatter *f) +{ + cleanup(now); + f->open_object_section("OpHistory"); + f->dump_int("num to keep", history_size); + f->dump_int("duration to keep", history_duration); + { + f->open_array_section("Ops"); + for (set >::const_iterator i = + arrived.begin(); + i != arrived.end(); + ++i) { + f->open_object_section("Op"); + i->second->dump(now, f); + f->close_section(); + } + f->close_section(); + } + f->close_section(); +} + +void OpTracker::dump_historic_ops(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + utime_t now = ceph_clock_now(cct); + history.dump_ops(now, f); +} + +void OpTracker::dump_ops_in_flight(Formatter *f) +{ + Mutex::Locker locker(ops_in_flight_lock); + f->open_object_section("ops_in_flight"); // overall dump + f->dump_int("num_ops", ops_in_flight.size()); + f->open_array_section("ops"); // list of TrackedOps + utime_t now = ceph_clock_now(cct); + for (xlist::iterator p = ops_in_flight.begin(); !p.end(); ++p) { + f->open_object_section("op"); + (*p)->dump(now, f); + f->close_section(); // this TrackedOp + } + f->close_section(); // list of TrackedOps + f->close_section(); // overall dump +} + +void OpTracker::register_inflight_op(xlist::item *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + ops_in_flight.push_back(i); + ops_in_flight.back()->seq = seq++; +} + +void OpTracker::unregister_inflight_op(TrackedOp *i) +{ + Mutex::Locker locker(ops_in_flight_lock); + assert(i->xitem.get_list() == &ops_in_flight); + utime_t now = ceph_clock_now(cct); + i->xitem.remove_myself(); + i->request->clear_data(); + history.insert(now, TrackedOpRef(i)); +} + +bool OpTracker::check_ops_in_flight(std::vector &warning_vector) +{ + Mutex::Locker locker(ops_in_flight_lock); + if (!ops_in_flight.size()) + return false; + + utime_t now = ceph_clock_now(cct); + utime_t too_old = now; + too_old -= complaint_time; + + utime_t oldest_secs = now - ops_in_flight.front()->get_arrived(); + + dout(10) << "ops_in_flight.size: " << ops_in_flight.size() + << "; oldest is " << oldest_secs + << " seconds old" << dendl; + + if (oldest_secs < complaint_time) + return false; + + xlist::iterator i = ops_in_flight.begin(); + warning_vector.reserve(log_threshold + 1); + + int slow = 0; // total slow + int warned = 0; // total logged + while (!i.end() && (*i)->get_arrived() < too_old) { + slow++; + + // exponential backoff of warning intervals + if (((*i)->get_arrived() + + (complaint_time * (*i)->warn_interval_multiplier)) < now) { + // will warn + if (warning_vector.empty()) + warning_vector.push_back(""); + warned++; + if (warned > log_threshold) + break; + + utime_t age = now - (*i)->get_arrived(); + stringstream ss; + ss << "slow request " << age << " seconds old, received at " << (*i)->get_arrived() + << ": " << *((*i)->request) << " currently " + << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); + warning_vector.push_back(ss.str()); + + // only those that have been shown will backoff + (*i)->warn_interval_multiplier *= 2; + } + ++i; + } + + // only summarize if we warn about any. if everything has backed + // off, we will stay silent. + if (warned > 0) { + stringstream ss; + ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " + << oldest_secs << " secs"; + warning_vector[0] = ss.str(); + } + + return warning_vector.size(); +} + +void OpTracker::get_age_ms_histogram(pow2_hist_t *h) +{ + Mutex::Locker locker(ops_in_flight_lock); + + h->clear(); + + utime_t now = ceph_clock_now(NULL); + unsigned bin = 30; + uint32_t lb = 1 << (bin-1); // lower bound for this bin + int count = 0; + for (xlist::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + utime_t age = now - (*i)->get_arrived(); + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; + } + if (count) + h->set(bin, count); +} + +void OpTracker::mark_event(TrackedOp *op, const string &dest) +{ + utime_t now = ceph_clock_now(cct); + return _mark_event(op, dest, now); +} + +void OpTracker::_mark_event(TrackedOp *op, const string &evt, + utime_t time) +{ + Mutex::Locker locker(ops_in_flight_lock); + dout(5) << //"reqid: " << op->get_reqid() << + ", seq: " << op->seq + << ", time: " << time << ", event: " << evt + << ", request: " << *op->request << dendl; +} + +void OpTracker::RemoveOnDelete::operator()(TrackedOp *op) { + op->mark_event("done"); + tracker->unregister_inflight_op(op); + // Do not delete op, unregister_inflight_op took control +} + +void TrackedOp::mark_event(const string &event) +{ + utime_t now = ceph_clock_now(g_ceph_context); + { + Mutex::Locker l(lock); + events.push_back(make_pair(now, event)); + } + tracker->mark_event(this, event); + _event_marked(); +} + +void TrackedOp::dump(utime_t now, Formatter *f) const +{ + Message *m = request; + stringstream name; + m->print(name); + f->dump_string("description", name.str().c_str()); // this TrackedOp + f->dump_stream("received_at") << get_arrived(); + f->dump_float("age", now - get_arrived()); + f->dump_float("duration", get_duration()); + { + f->open_array_section("type_data"); + _dump(now, f); + f->close_section(); + } +} diff --git a/src/common/TrackedOp.h b/src/common/TrackedOp.h index 753331df7f3..44e03905759 100644 --- a/src/common/TrackedOp.h +++ b/src/common/TrackedOp.h @@ -17,15 +17,163 @@ #include #include #include "common/Mutex.h" +#include "include/histogram.h" #include "include/xlist.h" #include "msg/Message.h" #include -class TrackedOp { -public: - virtual void mark_event(const string &event) = 0; - virtual ~TrackedOp() {} -}; +class TrackedOp; typedef std::tr1::shared_ptr TrackedOpRef; +class OpTracker; +class OpHistory { + set > arrived; + set > duration; + void cleanup(utime_t now); + bool shutdown; + OpTracker *tracker; + uint32_t history_size; + uint32_t history_duration; + +public: + OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_), + history_size(0), history_duration(0) {} + ~OpHistory() { + assert(arrived.empty()); + assert(duration.empty()); + } + void insert(utime_t now, TrackedOpRef op); + void dump_ops(utime_t now, Formatter *f); + void on_shutdown(); + void set_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history_size = new_size; + history_duration = new_duration; + } +}; + +class OpTracker { + class RemoveOnDelete { + OpTracker *tracker; + public: + RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} + void operator()(TrackedOp *op); + }; + friend class RemoveOnDelete; + friend class OpHistory; + uint64_t seq; + Mutex ops_in_flight_lock; + xlist ops_in_flight; + OpHistory history; + float complaint_time; + int log_threshold; + +public: + CephContext *cct; + OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), + history(this), complaint_time(0), log_threshold(0), cct(cct_) {} + void set_complaint_and_threshold(float time, int threshold) { + complaint_time = time; + log_threshold = threshold; + } + void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) { + history.set_size_and_duration(new_size, new_duration); + } + void dump_ops_in_flight(Formatter *f); + void dump_historic_ops(Formatter *f); + void register_inflight_op(xlist::item *i); + void unregister_inflight_op(TrackedOp *i); + + void get_age_ms_histogram(pow2_hist_t *h); + + /** + * Look for Ops which are too old, and insert warning + * strings for each Op that is too old. + * + * @param warning_strings A vector reference which is filled + * with a warning string for each old Op. + * @return True if there are any Ops to warn on, false otherwise. + */ + bool check_ops_in_flight(std::vector &warning_strings); + void mark_event(TrackedOp *op, const string &evt); + void _mark_event(TrackedOp *op, const string &evt, utime_t now); + + void on_shutdown() { + Mutex::Locker l(ops_in_flight_lock); + history.on_shutdown(); + } + ~OpTracker() { + assert(ops_in_flight.empty()); + } + + template + typename T::Ref create_request(Message *ref) + { + typename T::Ref retval(new T(ref, this), + RemoveOnDelete(this)); + + _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); + _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); + _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); + _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); + + retval->init_from_message(); + + return retval; + } +}; + +class TrackedOp { +private: + friend class OpHistory; + friend class OpTracker; + xlist::item xitem; +protected: + Message *request; /// the logical request we are tracking + OpTracker *tracker; /// the tracker we are associated with + + list > events; /// list of events and their times + Mutex lock; /// to protect the events list + string current; /// the current state the event is in + uint64_t seq; /// a unique value set by the OpTracker + + uint32_t warn_interval_multiplier; // limits output of a given op warning + + TrackedOp(Message *req, OpTracker *_tracker) : + xitem(this), + request(req), + tracker(_tracker), + lock("TrackedOp::lock"), + seq(0), + warn_interval_multiplier(1) + { + tracker->register_inflight_op(&xitem); + } + + virtual void init_from_message() {} + /// output any type-specific data you want to get when dump() is called + virtual void _dump(utime_t now, Formatter *f) const {} + /// if you want something else to happen when events are marked, implement + virtual void _event_marked() {} + +public: + virtual ~TrackedOp() { assert(request); request->put(); } + + utime_t get_arrived() const { + return request->get_recv_stamp(); + } + // This function maybe needs some work; assumes last event is completion time + double get_duration() const { + return events.size() ? + (events.rbegin()->first - get_arrived()) : + 0.0; + } + Message *get_req() const { return request; } + + void mark_event(const string &event); + virtual const char *state_string() const { + return events.rbegin()->second.c_str(); + } + void dump(utime_t now, Formatter *f) const; +}; + #endif diff --git a/src/include/Makefile.am b/src/include/Makefile.am index 2d98e777f00..c8823ce523d 100644 --- a/src/include/Makefile.am +++ b/src/include/Makefile.am @@ -43,6 +43,7 @@ noinst_HEADERS += \ include/filepath.h \ include/frag.h \ include/hash.h \ + include/histogram.h \ include/intarith.h \ include/interval_set.h \ include/int_types.h \ diff --git a/src/include/histogram.h b/src/include/histogram.h new file mode 100644 index 00000000000..c817b1ec175 --- /dev/null +++ b/src/include/histogram.h @@ -0,0 +1,76 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * Copyright 2013 Inktank + */ + +#ifndef HISTOGRAM_H_ +#define HISTOGRAM_H_ + +/** + * power of 2 histogram + */ +struct pow2_hist_t { // + /** + * histogram + * + * bin size is 2^index + * value is count of elements that are <= the current bin but > the previous bin. + */ + vector h; + +private: + /// expand to at least another's size + void _expand_to(unsigned s) { + if (s > h.size()) + h.resize(s, 0); + } + /// drop useless trailing 0's + void _contract() { + unsigned p = h.size(); + while (p > 0 && h[p-1] == 0) + --p; + h.resize(p); + } + +public: + void clear() { + h.clear(); + } + void set(int bin, int32_t v) { + _expand_to(bin + 1); + h[bin] = v; + _contract(); + } + + void add(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] += o.h[p]; + _contract(); + } + void sub(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] -= o.h[p]; + _contract(); + } + + int32_t upper_bound() const { + return 1 << h.size(); + } + + void dump(Formatter *f) const; + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + static void generate_test_instances(std::list& o); +}; +WRITE_CLASS_ENCODER(pow2_hist_t) + +#endif /* HISTOGRAM_H_ */ diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index 44015395e94..d29f47c1c43 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -28,6 +28,7 @@ using namespace std; #include "PaxosService.h" #include "include/types.h" #include "include/utime.h" +#include "include/histogram.h" #include "msg/Messenger.h" #include "common/config.h" #include "mon/MonitorDBStore.h" diff --git a/src/objclass/class_api.cc b/src/objclass/class_api.cc index 1ac224cdfe7..bb26c752f9b 100644 --- a/src/objclass/class_api.cc +++ b/src/objclass/class_api.cc @@ -177,7 +177,7 @@ int cls_read(cls_method_context_t hctx, int ofs, int len, int cls_get_request_origin(cls_method_context_t hctx, entity_inst_t *origin) { ReplicatedPG::OpContext **pctx = static_cast(hctx); - *origin = (*pctx)->op->request->get_orig_source_inst(); + *origin = (*pctx)->op->get_req()->get_orig_source_inst(); return 0; } diff --git a/src/os/Makefile.am b/src/os/Makefile.am index b7fef8dd209..4f12a6a3278 100644 --- a/src/os/Makefile.am +++ b/src/os/Makefile.am @@ -13,7 +13,8 @@ libos_la_SOURCES = \ os/WBThrottle.cc \ os/BtrfsFileStoreBackend.cc \ os/GenericFileStoreBackend.cc \ - os/ZFSFileStoreBackend.cc + os/ZFSFileStoreBackend.cc \ + common/TrackedOp.cc noinst_LTLIBRARIES += libos.la noinst_HEADERS += \ diff --git a/src/osd/Makefile.am b/src/osd/Makefile.am index 9d3bc1d5e47..cae02015fce 100644 --- a/src/osd/Makefile.am +++ b/src/osd/Makefile.am @@ -16,6 +16,7 @@ libosd_la_SOURCES = \ osd/Watch.cc \ osd/ClassHandler.cc \ osd/OpRequest.cc \ + common/TrackedOp.cc \ osd/SnapMapper.cc \ osd/osd_types.cc \ objclass/class_api.cc diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 8ce11bb558c..b2aa2ebbcd2 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -907,6 +907,10 @@ OSD::OSD(CephContext *cct_, int id, Messenger *internal_messenger, Messenger *ex service(this) { monc->set_messenger(client_messenger); + op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time, + cct->_conf->osd_op_log_threshold); + op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size, + cct->_conf->osd_op_history_duration); } OSD::~OSD() @@ -4539,7 +4543,7 @@ void OSD::do_waiters() void OSD::dispatch_op(OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case MSG_OSD_PG_CREATE: handle_pg_create(op); @@ -4665,7 +4669,7 @@ void OSD::_dispatch(Message *m) default: { - OpRequestRef op = op_tracker.create_request(m); + OpRequestRef op = op_tracker.create_request(m); op->mark_event("waiting_for_osdmap"); // no map? starting up? if (!osdmap) { @@ -5711,9 +5715,9 @@ bool OSD::require_mon_peer(Message *m) bool OSD::require_osd_peer(OpRequestRef op) { - if (!op->request->get_connection()->peer_is_osd()) { - dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr() - << " " << *op->request << dendl; + if (!op->get_req()->get_connection()->peer_is_osd()) { + dout(0) << "require_osd_peer received from non-osd " << op->get_req()->get_connection()->get_peer_addr() + << " " << *op->get_req() << dendl; return false; } return true; @@ -5725,7 +5729,7 @@ bool OSD::require_osd_peer(OpRequestRef op) */ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) { - Message *m = op->request; + Message *m = op->get_req(); dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl; assert(osd_lock.is_locked()); @@ -5837,7 +5841,7 @@ void OSD::split_pgs( */ void OSD::handle_pg_create(OpRequestRef op) { - MOSDPGCreate *m = (MOSDPGCreate*)op->request; + MOSDPGCreate *m = (MOSDPGCreate*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_CREATE); dout(10) << "handle_pg_create " << *m << dendl; @@ -5857,11 +5861,16 @@ void OSD::handle_pg_create(OpRequestRef op) } } - if (!require_mon_peer(op->request)) { - // we have to hack around require_mon_peer's interface limits - op->request = NULL; + /* we have to hack around require_mon_peer's interface limits, so + * grab an extra reference before going in. If the peer isn't + * a Monitor, the reference is put for us (and then cleared + * up automatically by our OpTracker infrastructure). Otherwise, + * we put the extra ref ourself. + */ + if (!require_mon_peer(op->get_req()->get())) { return; } + op->get_req()->put(); if (!require_same_or_newer_map(op, m->epoch)) return; @@ -6166,7 +6175,7 @@ void OSD::do_infos(map > >& info */ void OSD::handle_pg_notify(OpRequestRef op) { - MOSDPGNotify *m = (MOSDPGNotify*)op->request; + MOSDPGNotify *m = (MOSDPGNotify*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_NOTIFY); dout(7) << "handle_pg_notify from " << m->get_source() << dendl; @@ -6201,7 +6210,7 @@ void OSD::handle_pg_notify(OpRequestRef op) void OSD::handle_pg_log(OpRequestRef op) { - MOSDPGLog *m = (MOSDPGLog*) op->request; + MOSDPGLog *m = (MOSDPGLog*) op->get_req(); assert(m->get_header().type == MSG_OSD_PG_LOG); dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl; @@ -6229,7 +6238,7 @@ void OSD::handle_pg_log(OpRequestRef op) void OSD::handle_pg_info(OpRequestRef op) { - MOSDPGInfo *m = static_cast(op->request); + MOSDPGInfo *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_INFO); dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl; @@ -6262,7 +6271,7 @@ void OSD::handle_pg_info(OpRequestRef op) void OSD::handle_pg_trim(OpRequestRef op) { - MOSDPGTrim *m = (MOSDPGTrim *)op->request; + MOSDPGTrim *m = (MOSDPGTrim *)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_TRIM); dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl; @@ -6315,7 +6324,7 @@ void OSD::handle_pg_trim(OpRequestRef op) void OSD::handle_pg_scan(OpRequestRef op) { - MOSDPGScan *m = static_cast(op->request); + MOSDPGScan *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "handle_pg_scan " << *m << " from " << m->get_source() << dendl; @@ -6343,7 +6352,7 @@ void OSD::handle_pg_scan(OpRequestRef op) void OSD::handle_pg_backfill(OpRequestRef op) { - MOSDPGBackfill *m = static_cast(op->request); + MOSDPGBackfill *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl; @@ -6371,7 +6380,7 @@ void OSD::handle_pg_backfill(OpRequestRef op) void OSD::handle_pg_backfill_reserve(OpRequestRef op) { - MBackfillReserve *m = static_cast(op->request); + MBackfillReserve *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE); if (!require_osd_peer(op)) @@ -6415,7 +6424,7 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op) void OSD::handle_pg_recovery_reserve(OpRequestRef op) { - MRecoveryReserve *m = static_cast(op->request); + MRecoveryReserve *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_RECOVERY_RESERVE); if (!require_osd_peer(op)) @@ -6467,7 +6476,7 @@ void OSD::handle_pg_query(OpRequestRef op) { assert(osd_lock.is_locked()); - MOSDPGQuery *m = (MOSDPGQuery*)op->request; + MOSDPGQuery *m = (MOSDPGQuery*)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_QUERY); if (!require_osd_peer(op)) @@ -6554,7 +6563,7 @@ void OSD::handle_pg_query(OpRequestRef op) void OSD::handle_pg_remove(OpRequestRef op) { - MOSDPGRemove *m = (MOSDPGRemove *)op->request; + MOSDPGRemove *m = (MOSDPGRemove *)op->get_req(); assert(m->get_header().type == MSG_OSD_PG_REMOVE); assert(osd_lock.is_locked()); @@ -6827,7 +6836,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err) void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); int flags; flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); @@ -6839,7 +6848,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v, void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (m->get_map_epoch() < pg->info.history.same_primary_since) { @@ -6858,7 +6867,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op) void OSD::handle_op(OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (op_is_discardable(m)) { dout(10) << " discardable " << *m << dendl; @@ -6993,7 +7002,7 @@ void OSD::handle_op(OpRequestRef op) template void OSD::handle_replica_op(OpRequestRef op) { - T *m = static_cast(op->request); + T *m = static_cast(op->get_req()); assert(m->get_header().type == MSGTYPE); dout(10) << __func__ << *m << " epoch " << m->map_epoch << dendl; @@ -7047,24 +7056,24 @@ bool OSD::op_is_discardable(MOSDOp *op) */ void OSD::enqueue_op(PG *pg, OpRequestRef op) { - utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp(); - dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority() - << " cost " << op->request->get_cost() + utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp(); + dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority() + << " cost " << op->get_req()->get_cost() << " latency " << latency - << " " << *(op->request) << dendl; + << " " << *(op->get_req()) << dendl; pg->queue_op(op); } void OSD::OpWQ::_enqueue(pair item) { - unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_cost(); + unsigned priority = item.second->get_req()->get_priority(); + unsigned cost = item.second->get_req()->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict( - item.second->request->get_source_inst(), + item.second->get_req()->get_source_inst(), priority, item); else - pqueue.enqueue(item.second->request->get_source_inst(), + pqueue.enqueue(item.second->get_req()->get_source_inst(), priority, cost, item); osd->logger->set(l_osd_opq, pqueue.length()); } @@ -7079,14 +7088,14 @@ void OSD::OpWQ::_enqueue_front(pair item) pg_for_processing[&*(item.first)].pop_back(); } } - unsigned priority = item.second->request->get_priority(); - unsigned cost = item.second->request->get_cost(); + unsigned priority = item.second->get_req()->get_priority(); + unsigned cost = item.second->get_req()->get_cost(); if (priority >= CEPH_MSG_PRIO_LOW) pqueue.enqueue_strict_front( - item.second->request->get_source_inst(), + item.second->get_req()->get_source_inst(), priority, item); else - pqueue.enqueue_front(item.second->request->get_source_inst(), + pqueue.enqueue_front(item.second->get_req()->get_source_inst(), priority, cost, item); osd->logger->set(l_osd_opq, pqueue.length()); } @@ -7138,11 +7147,11 @@ void OSD::dequeue_op( PGRef pg, OpRequestRef op, ThreadPool::TPHandle &handle) { - utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp(); - dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority() - << " cost " << op->request->get_cost() + utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp(); + dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority() + << " cost " << op->get_req()->get_cost() << " latency " << latency - << " " << *(op->request) + << " " << *(op->get_req()) << " pg " << *pg << dendl; if (pg->deleting) return; @@ -7243,6 +7252,8 @@ const char** OSD::get_tracked_conf_keys() const { static const char* KEYS[] = { "osd_max_backfills", + "osd_op_complaint_time", "osd_op_log_threshold", + "osd_op_history_size", "osd_op_history_duration", NULL }; return KEYS; @@ -7255,13 +7266,23 @@ void OSD::handle_conf_change(const struct md_config_t *conf, service.local_reserver.set_max(cct->_conf->osd_max_backfills); service.remote_reserver.set_max(cct->_conf->osd_max_backfills); } + if (changed.count("osd_op_complaint_time") || + changed.count("osd_op_log_threshold")) { + op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time, + cct->_conf->osd_op_log_threshold); + } + if (changed.count("osd_op_history_size") || + changed.count("osd_op_history_duration")) { + op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size, + cct->_conf->osd_op_history_duration); + } } // -------------------------------- int OSD::init_op_flags(OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); vector::iterator iter; // client flags have no bearing on whether an op is a read, write, etc. diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index 1ffe3073051..2ed7a23086f 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -11,229 +11,21 @@ #include "messages/MOSDSubOp.h" #include "include/assert.h" -#define dout_subsys ceph_subsys_optracker -#undef dout_prefix -#define dout_prefix _prefix(_dout) -static ostream& _prefix(std::ostream* _dout) -{ - return *_dout << "--OSD::tracker-- "; -} OpRequest::OpRequest(Message *req, OpTracker *tracker) : - request(req), xitem(this), + TrackedOp(req, tracker), rmw_flags(0), - warn_interval_multiplier(1), - lock("OpRequest::lock"), - tracker(tracker), - hit_flag_points(0), latest_flag_point(0), - seq(0) { - received_time = request->get_recv_stamp(); - tracker->register_inflight_op(&xitem); + hit_flag_points(0), latest_flag_point(0) { if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) { // don't warn as quickly for low priority ops warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple; } } -void OpHistory::on_shutdown() -{ - arrived.clear(); - duration.clear(); - shutdown = true; -} - -void OpHistory::insert(utime_t now, OpRequestRef op) -{ - if (shutdown) - return; - duration.insert(make_pair(op->get_duration(), op)); - arrived.insert(make_pair(op->get_arrived(), op)); - cleanup(now); -} - -void OpHistory::cleanup(utime_t now) -{ - while (arrived.size() && - (now - arrived.begin()->first > - (double)(tracker->cct->_conf->osd_op_history_duration))) { - duration.erase(make_pair( - arrived.begin()->second->get_duration(), - arrived.begin()->second)); - arrived.erase(arrived.begin()); - } - - while (duration.size() > tracker->cct->_conf->osd_op_history_size) { - arrived.erase(make_pair( - duration.begin()->second->get_arrived(), - duration.begin()->second)); - duration.erase(duration.begin()); - } -} - -void OpHistory::dump_ops(utime_t now, Formatter *f) -{ - cleanup(now); - f->open_object_section("OpHistory"); - f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size); - f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration); - { - f->open_array_section("Ops"); - for (set >::const_iterator i = - arrived.begin(); - i != arrived.end(); - ++i) { - f->open_object_section("Op"); - i->second->dump(now, f); - f->close_section(); - } - f->close_section(); - } - f->close_section(); -} - -void OpTracker::dump_historic_ops(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - utime_t now = ceph_clock_now(cct); - history.dump_ops(now, f); -} - -void OpTracker::dump_ops_in_flight(Formatter *f) -{ - Mutex::Locker locker(ops_in_flight_lock); - f->open_object_section("ops_in_flight"); // overall dump - f->dump_int("num_ops", ops_in_flight.size()); - f->open_array_section("ops"); // list of OpRequests - utime_t now = ceph_clock_now(cct); - for (xlist::iterator p = ops_in_flight.begin(); !p.end(); ++p) { - f->open_object_section("op"); - (*p)->dump(now, f); - f->close_section(); // this OpRequest - } - f->close_section(); // list of OpRequests - f->close_section(); // overall dump -} - -void OpTracker::register_inflight_op(xlist::item *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - ops_in_flight.push_back(i); - ops_in_flight.back()->seq = seq++; -} - -void OpTracker::unregister_inflight_op(OpRequest *i) -{ - Mutex::Locker locker(ops_in_flight_lock); - assert(i->xitem.get_list() == &ops_in_flight); - utime_t now = ceph_clock_now(cct); - i->xitem.remove_myself(); - i->request->clear_data(); - history.insert(now, OpRequestRef(i)); -} - -bool OpTracker::check_ops_in_flight(std::vector &warning_vector) -{ - Mutex::Locker locker(ops_in_flight_lock); - if (!ops_in_flight.size()) - return false; - - utime_t now = ceph_clock_now(cct); - utime_t too_old = now; - too_old -= cct->_conf->osd_op_complaint_time; - - utime_t oldest_secs = now - ops_in_flight.front()->received_time; - - dout(10) << "ops_in_flight.size: " << ops_in_flight.size() - << "; oldest is " << oldest_secs - << " seconds old" << dendl; - - if (oldest_secs < cct->_conf->osd_op_complaint_time) - return false; - - xlist::iterator i = ops_in_flight.begin(); - warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1); - - int slow = 0; // total slow - int warned = 0; // total logged - while (!i.end() && (*i)->received_time < too_old) { - slow++; - - // exponential backoff of warning intervals - if (((*i)->received_time + - (cct->_conf->osd_op_complaint_time * - (*i)->warn_interval_multiplier)) < now) { - // will warn - if (warning_vector.empty()) - warning_vector.push_back(""); - warned++; - if (warned > cct->_conf->osd_op_log_threshold) - break; - - utime_t age = now - (*i)->received_time; - stringstream ss; - ss << "slow request " << age << " seconds old, received at " << (*i)->received_time - << ": " << *((*i)->request) << " currently " - << ((*i)->current.size() ? (*i)->current : (*i)->state_string()); - warning_vector.push_back(ss.str()); - - // only those that have been shown will backoff - (*i)->warn_interval_multiplier *= 2; - } - ++i; - } - - // only summarize if we warn about any. if everything has backed - // off, we will stay silent. - if (warned > 0) { - stringstream ss; - ss << slow << " slow requests, " << warned << " included below; oldest blocked for > " - << oldest_secs << " secs"; - warning_vector[0] = ss.str(); - } - - return warning_vector.size(); -} - -void OpTracker::get_age_ms_histogram(pow2_hist_t *h) -{ - Mutex::Locker locker(ops_in_flight_lock); - - h->clear(); - - utime_t now = ceph_clock_now(NULL); - unsigned bin = 30; - uint32_t lb = 1 << (bin-1); // lower bound for this bin - int count = 0; - for (xlist::iterator i = ops_in_flight.begin(); !i.end(); ++i) { - utime_t age = now - (*i)->received_time; - uint32_t ms = (long)(age * 1000.0); - if (ms >= lb) { - count++; - continue; - } - if (count) - h->set(bin, count); - while (lb > ms) { - bin--; - lb >>= 1; - } - count = 1; - } - if (count) - h->set(bin, count); -} - -void OpRequest::dump(utime_t now, Formatter *f) const +void OpRequest::_dump(utime_t now, Formatter *f) const { Message *m = request; - stringstream name; - m->print(name); - f->dump_string("description", name.str().c_str()); // this OpRequest - f->dump_unsigned("rmw_flags", rmw_flags); - f->dump_stream("received_at") << received_time; - f->dump_float("age", now - received_time); - f->dump_float("duration", get_duration()); f->dump_string("flag_point", state_string()); if (m->get_orig_source().is_client()) { f->open_object_section("client_info"); @@ -257,50 +49,11 @@ void OpRequest::dump(utime_t now, Formatter *f) const } } -void OpTracker::mark_event(OpRequest *op, const string &dest) +void OpRequest::init_from_message() { - utime_t now = ceph_clock_now(cct); - return _mark_event(op, dest, now); -} - -void OpTracker::_mark_event(OpRequest *op, const string &evt, - utime_t time) -{ - Mutex::Locker locker(ops_in_flight_lock); - dout(5) << "reqid: " << op->get_reqid() << ", seq: " << op->seq - << ", time: " << time << ", event: " << evt - << ", request: " << *op->request << dendl; -} - -void OpTracker::RemoveOnDelete::operator()(OpRequest *op) { - op->mark_event("done"); - tracker->unregister_inflight_op(op); - // Do not delete op, unregister_inflight_op took control -} - -OpRequestRef OpTracker::create_request(Message *ref) -{ - OpRequestRef retval(new OpRequest(ref, this), - RemoveOnDelete(this)); - - if (ref->get_type() == CEPH_MSG_OSD_OP) { - retval->reqid = static_cast(ref)->get_reqid(); - } else if (ref->get_type() == MSG_OSD_SUBOP) { - retval->reqid = static_cast(ref)->reqid; + if (request->get_type() == CEPH_MSG_OSD_OP) { + reqid = static_cast(request)->get_reqid(); + } else if (request->get_type() == MSG_OSD_SUBOP) { + reqid = static_cast(request)->reqid; } - _mark_event(retval.get(), "header_read", ref->get_recv_stamp()); - _mark_event(retval.get(), "throttled", ref->get_throttle_stamp()); - _mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp()); - _mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp()); - return retval; -} - -void OpRequest::mark_event(const string &event) -{ - utime_t now = ceph_clock_now(tracker->cct); - { - Mutex::Locker l(lock); - events.push_back(make_pair(now, event)); - } - tracker->mark_event(this, event); } diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index 9634be87846..87571f58787 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -25,87 +25,12 @@ #include "common/TrackedOp.h" #include "osd/osd_types.h" -struct OpRequest; -class OpTracker; -typedef std::tr1::shared_ptr OpRequestRef; -class OpHistory { - set > arrived; - set > duration; - void cleanup(utime_t now); - bool shutdown; - OpTracker *tracker; - -public: - OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {} - ~OpHistory() { - assert(arrived.empty()); - assert(duration.empty()); - } - void insert(utime_t now, OpRequestRef op); - void dump_ops(utime_t now, Formatter *f); - void on_shutdown(); -}; - -class OpTracker { - class RemoveOnDelete { - OpTracker *tracker; - public: - RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {} - void operator()(OpRequest *op); - }; - friend class RemoveOnDelete; - friend class OpRequest; - friend class OpHistory; - uint64_t seq; - Mutex ops_in_flight_lock; - xlist ops_in_flight; - OpHistory history; - -protected: - CephContext *cct; - -public: - OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {} - void dump_ops_in_flight(Formatter *f); - void dump_historic_ops(Formatter *f); - void register_inflight_op(xlist::item *i); - void unregister_inflight_op(OpRequest *i); - - void get_age_ms_histogram(pow2_hist_t *h); - - /** - * Look for Ops which are too old, and insert warning - * strings for each Op that is too old. - * - * @param warning_strings A vector reference which is filled - * with a warning string for each old Op. - * @return True if there are any Ops to warn on, false otherwise. - */ - bool check_ops_in_flight(std::vector &warning_strings); - void mark_event(OpRequest *op, const string &evt); - void _mark_event(OpRequest *op, const string &evt, utime_t now); - OpRequestRef create_request(Message *req); - void on_shutdown() { - Mutex::Locker l(ops_in_flight_lock); - history.on_shutdown(); - } - ~OpTracker() { - assert(ops_in_flight.empty()); - } -}; - /** * The OpRequest takes in a Message* and takes over a single reference * to it, which it puts() when destroyed. - * OpRequest is itself ref-counted. The expectation is that you get a Message - * you want to track, create an OpRequest with it, and then pass around that OpRequest - * the way you used to pass around the Message. */ struct OpRequest : public TrackedOp { friend class OpTracker; - friend class OpHistory; - Message *request; - xlist::item xitem; // rmw flags int rmw_flags; @@ -134,28 +59,12 @@ struct OpRequest : public TrackedOp { void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; } void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; } - utime_t received_time; - uint32_t warn_interval_multiplier; - utime_t get_arrived() const { - return received_time; - } - double get_duration() const { - return events.size() ? - (events.rbegin()->first - received_time) : - 0.0; - } - - void dump(utime_t now, Formatter *f) const; + void _dump(utime_t now, Formatter *f) const; private: - list > events; - string current; - Mutex lock; - OpTracker *tracker; osd_reqid_t reqid; uint8_t hit_flag_points; uint8_t latest_flag_point; - uint64_t seq; static const uint8_t flag_queued_for_pg=1 << 0; static const uint8_t flag_reached_pg = 1 << 1; static const uint8_t flag_delayed = 1 << 2; @@ -164,12 +73,8 @@ private: static const uint8_t flag_commit_sent = 1 << 5; OpRequest(Message *req, OpTracker *tracker); -public: - ~OpRequest() { - assert(request); - request->put(); - } +public: bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; } bool been_reached_pg() { return hit_flag_points & flag_reached_pg; } bool been_delayed() { return hit_flag_points & flag_delayed; } @@ -233,10 +138,15 @@ public: latest_flag_point = flag_commit_sent; } - void mark_event(const string &event); osd_reqid_t get_reqid() const { return reqid; } + + void init_from_message(); + + typedef std::tr1::shared_ptr Ref; }; +typedef OpRequest::Ref OpRequestRef; + #endif /* OPREQUEST_H_ */ diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 1d9ed5f6a31..8f7d3ccb684 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1332,10 +1332,10 @@ void PG::do_pending_flush() bool PG::op_has_sufficient_caps(OpRequestRef op) { // only check MOSDOp - if (op->request->get_type() != CEPH_MSG_OSD_OP) + if (op->get_req()->get_type() != CEPH_MSG_OSD_OP) return true; - MOSDOp *req = static_cast(op->request); + MOSDOp *req = static_cast(op->get_req()); OSD::Session *session = (OSD::Session *)req->get_connection()->get_priv(); if (!session) { @@ -1417,7 +1417,7 @@ void PG::replay_queued_ops() c = p->first; } dout(10) << "activate replay " << p->first << " " - << *p->second->request << dendl; + << *p->second->get_req() << dendl; replay.push_back(p->second); } replay_queue.clear(); @@ -2618,7 +2618,7 @@ void PG::unreg_next_scrub() void PG::sub_op_scrub_map(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_map" << dendl; @@ -2804,7 +2804,7 @@ void PG::_request_scrub_map(int replica, eversion_t version, void PG::sub_op_scrub_reserve(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_reserve" << dendl; @@ -2824,7 +2824,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op) void PG::sub_op_scrub_reserve_reply(OpRequestRef op) { - MOSDSubOpReply *reply = static_cast(op->request); + MOSDSubOpReply *reply = static_cast(op->get_req()); assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(7) << "sub_op_scrub_reserve_reply" << dendl; @@ -2857,7 +2857,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequestRef op) void PG::sub_op_scrub_unreserve(OpRequestRef op) { - assert(op->request->get_header().type == MSG_OSD_SUBOP); + assert(op->get_req()->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_unreserve" << dendl; op->mark_started(); @@ -2869,7 +2869,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op) { op->mark_started(); - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_scrub_stop" << dendl; @@ -4732,7 +4732,7 @@ ostream& operator<<(ostream& out, const PG& pg) bool PG::can_discard_op(OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); if (OSD::op_is_discardable(m)) { dout(20) << " discard " << *m << dendl; return true; @@ -4760,7 +4760,7 @@ bool PG::can_discard_op(OpRequestRef op) template bool PG::can_discard_replica_op(OpRequestRef op) { - T *m = static_cast(op->request); + T *m = static_cast(op->get_req()); assert(m->get_header().type == MSGTYPE); // same pg? @@ -4776,7 +4776,7 @@ bool PG::can_discard_replica_op(OpRequestRef op) bool PG::can_discard_scan(OpRequestRef op) { - MOSDPGScan *m = static_cast(op->request); + MOSDPGScan *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); if (old_peering_msg(m->map_epoch, m->query_epoch)) { @@ -4788,7 +4788,7 @@ bool PG::can_discard_scan(OpRequestRef op) bool PG::can_discard_backfill(OpRequestRef op) { - MOSDPGBackfill *m = static_cast(op->request); + MOSDPGBackfill *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); if (old_peering_msg(m->map_epoch, m->query_epoch)) { @@ -4802,7 +4802,7 @@ bool PG::can_discard_backfill(OpRequestRef op) bool PG::can_discard_request(OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: return can_discard_op(op); case MSG_OSD_SUBOP: @@ -4827,55 +4827,55 @@ bool PG::can_discard_request(OpRequestRef op) bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits) { unsigned mask = ~((~0)<request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: - return (static_cast(op->request)->get_pg().m_seed & mask) == match; + return (static_cast(op->get_req())->get_pg().m_seed & mask) == match; } return false; } bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op) { - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: return !have_same_or_newer_map( curmap, - static_cast(op->request)->get_map_epoch()); + static_cast(op->get_req())->get_map_epoch()); case MSG_OSD_SUBOP: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_SUBOPREPLY: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_PG_SCAN: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_PG_BACKFILL: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_PG_PUSH: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_PG_PULL: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); case MSG_OSD_PG_PUSH_REPLY: return !have_same_or_newer_map( curmap, - static_cast(op->request)->map_epoch); + static_cast(op->get_req())->map_epoch); } assert(0); return false; diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index ddc39d70372..9529e15ae77 100644 --- a/src/osd/ReplicatedBackend.cc +++ b/src/osd/ReplicatedBackend.cc @@ -96,7 +96,7 @@ bool ReplicatedBackend::handle_message( ) { dout(10) << __func__ << ": " << op << dendl; - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case MSG_OSD_PG_PUSH: // TODOXXX: needs to be active possibly do_push(op); @@ -111,7 +111,7 @@ bool ReplicatedBackend::handle_message( return true; case MSG_OSD_SUBOP: { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); if (m->ops.size() >= 1) { OSDOp *first = &m->ops[0]; switch (first->op.op) { @@ -130,7 +130,7 @@ bool ReplicatedBackend::handle_message( } case MSG_OSD_SUBOPREPLY: { - MOSDSubOpReply *r = static_cast(op->request); + MOSDSubOpReply *r = static_cast(op->get_req()); if (r->ops.size() >= 1) { OSDOp &first = r->ops[0]; switch (first.op.op) { diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index d02a9c9cc48..93540f4fc81 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -86,9 +86,9 @@ static void log_subop_stats( { utime_t now = ceph_clock_now(g_ceph_context); utime_t latency = now; - latency -= op->request->get_recv_stamp(); + latency -= op->get_req()->get_recv_stamp(); - uint64_t inb = op->request->get_data().length(); + uint64_t inb = op->get_req()->get_data().length(); osd->logger->inc(l_osd_sop); @@ -583,7 +583,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op) void ReplicatedPG::do_pg_op(OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); dout(10) << "do_pg_op " << *m << dendl; @@ -828,7 +828,7 @@ void ReplicatedPG::do_request( if (pgbackend->handle_message(op)) return; - switch (op->request->get_type()) { + switch (op->get_req()->get_type()) { case CEPH_MSG_OSD_OP: if (is_replay() || !is_active()) { dout(20) << " replay, waiting for active on " << op << dendl; @@ -866,7 +866,7 @@ void ReplicatedPG::do_request( */ void ReplicatedPG::do_op(OpRequestRef op) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); assert(m->get_header().type == CEPH_MSG_OSD_OP); if (op->includes_pg_op()) { if (pg_op_must_wait(m)) { @@ -1172,7 +1172,7 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc) { - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK); MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT, get_osdmap()->get_epoch(), flags); @@ -1188,7 +1188,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) { dout(10) << __func__ << " " << ctx << dendl; OpRequestRef op = ctx->op; - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); ObjectContextRef obc = ctx->obc; const hobject_t& soid = obc->obs.oi.soid; map& src_obc = ctx->src_obc; @@ -1412,16 +1412,16 @@ void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv) void ReplicatedPG::log_op_stats(OpContext *ctx) { OpRequestRef op = ctx->op; - MOSDOp *m = static_cast(op->request); + MOSDOp *m = static_cast(op->get_req()); utime_t now = ceph_clock_now(cct); utime_t latency = now; - latency -= ctx->op->request->get_recv_stamp(); + latency -= ctx->op->get_req()->get_recv_stamp(); utime_t rlatency; if (ctx->readable_stamp != utime_t()) { rlatency = ctx->readable_stamp; - rlatency -= ctx->op->request->get_recv_stamp(); + rlatency -= ctx->op->get_req()->get_recv_stamp(); } uint64_t inb = ctx->bytes_written; @@ -1460,10 +1460,10 @@ void ReplicatedPG::log_op_stats(OpContext *ctx) void ReplicatedPG::do_sub_op(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(have_same_or_newer_map(m->map_epoch)); assert(m->get_header().type == MSG_OSD_SUBOP); - dout(15) << "do_sub_op " << *op->request << dendl; + dout(15) << "do_sub_op " << *op->get_req() << dendl; OSDOp *first = NULL; if (m->ops.size() >= 1) { @@ -1501,7 +1501,7 @@ void ReplicatedPG::do_sub_op(OpRequestRef op) void ReplicatedPG::do_sub_op_reply(OpRequestRef op) { - MOSDSubOpReply *r = static_cast(op->request); + MOSDSubOpReply *r = static_cast(op->get_req()); assert(r->get_header().type == MSG_OSD_SUBOPREPLY); if (r->ops.size() >= 1) { OSDOp& first = r->ops[0]; @@ -1519,7 +1519,7 @@ void ReplicatedPG::do_scan( OpRequestRef op, ThreadPool::TPHandle &handle) { - MOSDPGScan *m = static_cast(op->request); + MOSDPGScan *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_SCAN); dout(10) << "do_scan " << *m << dendl; @@ -1594,7 +1594,7 @@ void ReplicatedPG::do_scan( void ReplicatedBackend::_do_push(OpRequestRef op) { - MOSDPGPush *m = static_cast(op->request); + MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@ -1646,7 +1646,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext { void ReplicatedBackend::_do_pull_response(OpRequestRef op) { - MOSDPGPush *m = static_cast(op->request); + MOSDPGPush *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH); int from = m->get_source().num(); @@ -1691,7 +1691,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op) void ReplicatedBackend::do_pull(OpRequestRef op) { - MOSDPGPull *m = static_cast(op->request); + MOSDPGPull *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PULL); int from = m->get_source().num(); @@ -1707,7 +1707,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op) void ReplicatedBackend::do_push_reply(OpRequestRef op) { - MOSDPGPushReply *m = static_cast(op->request); + MOSDPGPushReply *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY); int from = m->get_source().num(); @@ -1728,7 +1728,7 @@ void ReplicatedBackend::do_push_reply(OpRequestRef op) void ReplicatedPG::do_backfill(OpRequestRef op) { - MOSDPGBackfill *m = static_cast(op->request); + MOSDPGBackfill *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_PG_BACKFILL); dout(10) << "do_backfill " << *m << dendl; @@ -2392,7 +2392,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) ObjectContextRef src_obc; if (ceph_osd_op_type_multi(op.op)) { - MOSDOp *m = static_cast(ctx->op->request); + MOSDOp *m = static_cast(ctx->op->get_req()); object_locator_t src_oloc; get_src_oloc(soid.oid, m->get_object_locator(), src_oloc); hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash, @@ -3190,10 +3190,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops) << " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl; dout(10) << "watch: oi.user_version=" << oi.user_version<< dendl; dout(10) << "watch: peer_addr=" - << ctx->op->request->get_connection()->get_peer_addr() << dendl; + << ctx->op->get_req()->get_connection()->get_peer_addr() << dendl; watch_info_t w(cookie, cct->_conf->osd_client_watch_timeout, - ctx->op->request->get_connection()->get_peer_addr()); + ctx->op->get_req()->get_connection()->get_peer_addr()); if (do_watch) { if (oi.watchers.count(make_pair(cookie, entity))) { dout(10) << " found existing watch " << w << " by " << entity << dendl; @@ -4038,7 +4038,7 @@ void ReplicatedPG::add_interval_usage(interval_set& s, object_stat_sum void ReplicatedPG::do_osd_op_effects(OpContext *ctx) { - ConnectionRef conn(ctx->op->request->get_connection()); + ConnectionRef conn(ctx->op->get_req()->get_connection()); boost::intrusive_ptr session( (OSD::Session *)conn->get_priv()); session->put(); // get_priv() takes a ref, and so does the intrusive_ptr @@ -4698,7 +4698,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) { MOSDOp *m = NULL; if (repop->ctx->op) - m = static_cast(repop->ctx->op->request); + m = static_cast(repop->ctx->op->get_req()); if (m) dout(10) << "eval_repop " << *repop @@ -4774,7 +4774,7 @@ void ReplicatedPG::eval_repop(RepGather *repop) for (list::iterator i = waiting_for_ack[repop->v].begin(); i != waiting_for_ack[repop->v].end(); ++i) { - MOSDOp *m = (MOSDOp*)(*i)->request; + MOSDOp *m = (MOSDOp*)(*i)->get_req(); MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0); reply->set_reply_versions(repop->ctx->at_version, repop->ctx->user_at_version); @@ -4870,7 +4870,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now) get_osdmap()->get_epoch(), repop->rep_tid, repop->ctx->at_version); if (ctx->op && - ((static_cast(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { + ((static_cast(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) { // replicate original op for parallel execution on replica assert(0 == "broken implementation, do not use"); } @@ -4911,7 +4911,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe tid_t rep_tid) { if (ctx->op) - dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl; + dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl; else dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl; @@ -4942,7 +4942,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type, MOSDOp *m = NULL; if (repop->ctx->op) - m = static_cast(repop->ctx->op->request); + m = static_cast(repop->ctx->op->get_req()); if (m) dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m @@ -5488,7 +5488,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc) void ReplicatedPG::sub_op_modify(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); const hobject_t& soid = m->poid; @@ -5607,8 +5607,8 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) rm->applied = true; if (!pg_has_reset_since(rm->epoch_started)) { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl; - MOSDSubOp *m = static_cast(rm->op->request); + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl; + MOSDSubOp *m = static_cast(rm->op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); if (!rm->committed) { @@ -5630,7 +5630,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm) } } } else { - dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request + dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << " from epoch " << rm->epoch_started << " < last_peering_reset " << last_peering_reset << dendl; } @@ -5652,19 +5652,19 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) if (!pg_has_reset_since(rm->epoch_started)) { // send commit. - dout(10) << "sub_op_modify_commit on op " << *rm->op->request + dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req() << ", sending commit to osd." << rm->ackerosd << dendl; if (get_osdmap()->is_up(rm->ackerosd)) { last_complete_ondisk = rm->last_complete; - MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast(rm->op->request), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); + MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK); commit->set_last_complete_ondisk(rm->last_complete); commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority! osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch()); } } else { - dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request + dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req() << " from epoch " << rm->epoch_started << " < last_peering_reset " << last_peering_reset << dendl; } @@ -5681,7 +5681,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm) void ReplicatedPG::sub_op_modify_reply(OpRequestRef op) { - MOSDSubOpReply *r = static_cast(op->request); + MOSDSubOpReply *r = static_cast(op->get_req()); assert(r->get_header().type == MSG_OSD_SUBOPREPLY); op->mark_started(); @@ -6631,7 +6631,7 @@ void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op) void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) { - MOSDSubOpReply *reply = static_cast(op->request); + MOSDSubOpReply *reply = static_cast(op->get_req()); const hobject_t& soid = reply->get_poid(); assert(reply->get_header().type == MSG_OSD_SUBOPREPLY); dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl; @@ -6644,7 +6644,7 @@ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op) PushOp pop; bool more = handle_push_reply(peer, rop, &pop); if (more) - send_push_op_legacy(op->request->get_priority(), peer, pop); + send_push_op_legacy(op->get_req()->get_priority(), peer, pop); } bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply) @@ -6725,7 +6725,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid) */ void ReplicatedBackend::sub_op_pull(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); op->mark_started(); @@ -6918,7 +6918,7 @@ void ReplicatedBackend::trim_pushed_data( void ReplicatedBackend::sub_op_push(OpRequestRef op) { op->mark_started(); - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); PushOp pop; pop.soid = m->recovery_info.soid; @@ -6950,14 +6950,14 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op) C_ReplicatedBackend_OnPullComplete *c = new C_ReplicatedBackend_OnPullComplete( this, - op->request->get_priority()); + op->get_req()->get_priority()); c->to_continue.swap(to_continue); t->register_on_complete( new C_QueueInWQ( &osd->push_wq, get_parent()->bless_gencontext(c))); } - run_recovery_op(h, op->request->get_priority()); + run_recovery_op(h, op->get_req()->get_priority()); } else { PushReplyOp resp; MOSDSubOpReply *reply = new MOSDSubOpReply( @@ -7002,7 +7002,7 @@ void ReplicatedBackend::_failed_push(int from, const hobject_t &soid) void ReplicatedPG::sub_op_remove(OpRequestRef op) { - MOSDSubOp *m = static_cast(op->request); + MOSDSubOp *m = static_cast(op->get_req()); assert(m->get_header().type == MSG_OSD_SUBOP); dout(7) << "sub_op_remove " << m->poid << dendl; @@ -7225,7 +7225,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue) if (requeue) { if (repop->ctx->op) { - dout(10) << " requeuing " << *repop->ctx->op->request << dendl; + dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl; rq.push_back(repop->ctx->op); repop->ctx->op = OpRequestRef(); } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index c277c0d3f86..27c9d1bb605 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -993,7 +993,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) //<< " wfnvram=" << repop.waitfor_nvram << " wfdisk=" << repop.waitfor_disk; if (repop.ctx->op) - out << " op=" << *(repop.ctx->op->request); + out << " op=" << *(repop.ctx->op->get_req()); out << ")"; return out; } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index 59b71cc6f67..a54fc65f375 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -23,6 +23,7 @@ #include "include/types.h" #include "include/utime.h" #include "include/CompatSet.h" +#include "include/histogram.h" #include "include/interval_set.h" #include "common/snap_types.h" #include "common/Formatter.h" @@ -555,67 +556,6 @@ inline ostream& operator<<(ostream& out, const eversion_t e) { return out << e.epoch << "'" << e.version; } - -/** - * power of 2 histogram - */ -struct pow2_hist_t { - /** - * histogram - * - * bin size is 2^index - * value is count of elements that are <= the current bin but > the previous bin. - */ - vector h; - -private: - /// expand to at least another's size - void _expand_to(unsigned s) { - if (s > h.size()) - h.resize(s, 0); - } - /// drop useless trailing 0's - void _contract() { - unsigned p = h.size(); - while (p > 0 && h[p-1] == 0) - --p; - h.resize(p); - } - -public: - void clear() { - h.clear(); - } - void set(int bin, int32_t v) { - _expand_to(bin + 1); - h[bin] = v; - _contract(); - } - - void add(const pow2_hist_t& o) { - _expand_to(o.h.size()); - for (unsigned p = 0; p < o.h.size(); ++p) - h[p] += o.h[p]; - _contract(); - } - void sub(const pow2_hist_t& o) { - _expand_to(o.h.size()); - for (unsigned p = 0; p < o.h.size(); ++p) - h[p] -= o.h[p]; - _contract(); - } - - int32_t upper_bound() const { - return 1 << h.size(); - } - - void dump(Formatter *f) const; - void encode(bufferlist &bl) const; - void decode(bufferlist::iterator &bl); - static void generate_test_instances(std::list& o); -}; -WRITE_CLASS_ENCODER(pow2_hist_t) - /** * filestore_perf_stat_t * diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index 77c8729e986..18ed795c3ef 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -36,13 +36,15 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental) #include "crush/CrushWrapper.h" TYPE(CrushWrapper) +#include "include/histogram.h" +TYPE(pow2_hist_t) + #include "osd/osd_types.h" TYPE(osd_reqid_t) TYPE(object_locator_t) TYPE(request_redirect_t) TYPE(pg_t) TYPE(coll_t) -TYPE(pow2_hist_t) TYPE(filestore_perf_stat_t) TYPE(osd_stat_t) TYPE(OSDSuperblock)