mirror of
https://github.com/ceph/ceph
synced 2025-03-11 02:39:05 +00:00
Merge pull request #610 from ceph/wip-optracker
Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
commit
13b80bb446
265
src/common/TrackedOp.cc
Normal file
265
src/common/TrackedOp.cc
Normal file
@ -0,0 +1,265 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
* Copyright 2013 Inktank
|
||||
*/
|
||||
|
||||
#include "TrackedOp.h"
|
||||
#include "common/Formatter.h"
|
||||
#include <iostream>
|
||||
#include <vector>
|
||||
#include "common/debug.h"
|
||||
#include "common/config.h"
|
||||
#include "msg/Message.h"
|
||||
#include "include/assert.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_optracker
|
||||
#undef dout_prefix
|
||||
#define dout_prefix _prefix(_dout)
|
||||
|
||||
/// Write the fixed tag that starts every log line emitted from this file
/// (used through the dout_prefix macro) and hand the stream back for chaining.
static ostream& _prefix(std::ostream* _dout)
{
  std::ostream& out = *_dout;
  return out << "-- op tracker -- ";
}
|
||||
|
||||
void OpHistory::on_shutdown()
|
||||
{
|
||||
arrived.clear();
|
||||
duration.clear();
|
||||
shutdown = true;
|
||||
}
|
||||
|
||||
/// Record a finished op in both indexes (by duration and by arrival time)
/// and then trim the history.  Does nothing after on_shutdown().
///
/// @param now  current time, forwarded to cleanup()
/// @param op   the op to retain
void OpHistory::insert(utime_t now, TrackedOpRef op)
{
  if (!shutdown) {
    duration.insert(pair<double, TrackedOpRef>(op->get_duration(), op));
    arrived.insert(pair<utime_t, TrackedOpRef>(op->get_arrived(), op));
    cleanup(now);
  }
}
|
||||
|
||||
/// Trim the history in two passes:
///  1. drop ops whose arrival is more than history_duration seconds before
///     `now`, walking the arrival index from its smallest key;
///  2. while more than history_size ops remain, drop the entry with the
///     smallest duration key.
/// Each removal erases the op from both indexes.
void OpHistory::cleanup(utime_t now)
{
  const double max_age = (double)(history_duration);

  while (arrived.size()) {
    set<pair<utime_t, TrackedOpRef> >::iterator oldest = arrived.begin();
    if (!(now - oldest->first > max_age))
      break;
    // remove the matching duration entry first, then the arrival entry
    duration.erase(make_pair(oldest->second->get_duration(),
                             oldest->second));
    arrived.erase(oldest);
  }

  while (duration.size() > history_size) {
    set<pair<double, TrackedOpRef> >::iterator shortest = duration.begin();
    arrived.erase(make_pair(shortest->second->get_arrived(),
                            shortest->second));
    duration.erase(shortest);
  }
}
|
||||
|
||||
/// Trim the history, then dump the configured limits and every retained op
/// (iterating the arrival index) into `f`.
///
/// @param now  current time; used for trimming and passed to each op's dump()
/// @param f    formatter receiving the "OpHistory" object
void OpHistory::dump_ops(utime_t now, Formatter *f)
{
  typedef set<pair<utime_t, TrackedOpRef> >::const_iterator arrived_iter;

  cleanup(now);

  f->open_object_section("OpHistory");
  f->dump_int("num to keep", history_size);
  f->dump_int("duration to keep", history_duration);

  f->open_array_section("Ops");
  const arrived_iter last = arrived.end();
  for (arrived_iter it = arrived.begin(); it != last; ++it) {
    const TrackedOpRef &op = it->second;
    f->open_object_section("Op");
    op->dump(now, f);
    f->close_section();  // Op
  }
  f->close_section();    // Ops

  f->close_section();    // OpHistory
}
|
||||
|
||||
/// Dump the retained (completed) ops into `f` while holding
/// ops_in_flight_lock.
void OpTracker::dump_historic_ops(Formatter *f)
{
  Mutex::Locker locker(ops_in_flight_lock);
  const utime_t stamp = ceph_clock_now(cct);
  history.dump_ops(stamp, f);
}
|
||||
|
||||
/// Dump the number of in-flight ops and one "op" object per in-flight op
/// into `f`, all under ops_in_flight_lock.
void OpTracker::dump_ops_in_flight(Formatter *f)
{
  Mutex::Locker locker(ops_in_flight_lock);

  f->open_object_section("ops_in_flight");  // overall dump
  f->dump_int("num_ops", ops_in_flight.size());

  f->open_array_section("ops");             // list of TrackedOps
  utime_t now = ceph_clock_now(cct);
  xlist<TrackedOp*>::iterator cursor = ops_in_flight.begin();
  while (!cursor.end()) {
    TrackedOp *op = *cursor;
    f->open_object_section("op");
    op->dump(now, f);
    f->close_section();                     // this TrackedOp
    ++cursor;
  }
  f->close_section();                       // list of TrackedOps

  f->close_section();                       // overall dump
}
|
||||
|
||||
/// Append an op's list item to the in-flight list and stamp the op with the
/// next sequence number.  Called from the TrackedOp constructor.
void OpTracker::register_inflight_op(xlist<TrackedOp*>::item *i)
{
  Mutex::Locker locker(ops_in_flight_lock);
  ops_in_flight.push_back(i);

  // the op just appended is the list's last element
  TrackedOp *newest = ops_in_flight.back();
  newest->seq = seq;
  ++seq;
}
|
||||
|
||||
/// Take an op off the in-flight list, clear its message data and hand the op
/// to the history wrapped in a TrackedOpRef.  From here on the op is owned
/// by that reference (see RemoveOnDelete, which deliberately does not
/// delete it).
void OpTracker::unregister_inflight_op(TrackedOp *i)
{
  Mutex::Locker locker(ops_in_flight_lock);
  assert(i->xitem.get_list() == &ops_in_flight);

  const utime_t stamp = ceph_clock_now(cct);
  i->xitem.remove_myself();
  i->request->clear_data();

  // If the history declines the op (after shutdown) this is the only
  // reference, and the op is released when it goes out of scope.
  TrackedOpRef finished(i);
  history.insert(stamp, finished);
}
|
||||
|
||||
bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
if (!ops_in_flight.size())
|
||||
return false;
|
||||
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
utime_t too_old = now;
|
||||
too_old -= complaint_time;
|
||||
|
||||
utime_t oldest_secs = now - ops_in_flight.front()->get_arrived();
|
||||
|
||||
dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
|
||||
<< "; oldest is " << oldest_secs
|
||||
<< " seconds old" << dendl;
|
||||
|
||||
if (oldest_secs < complaint_time)
|
||||
return false;
|
||||
|
||||
xlist<TrackedOp*>::iterator i = ops_in_flight.begin();
|
||||
warning_vector.reserve(log_threshold + 1);
|
||||
|
||||
int slow = 0; // total slow
|
||||
int warned = 0; // total logged
|
||||
while (!i.end() && (*i)->get_arrived() < too_old) {
|
||||
slow++;
|
||||
|
||||
// exponential backoff of warning intervals
|
||||
if (((*i)->get_arrived() +
|
||||
(complaint_time * (*i)->warn_interval_multiplier)) < now) {
|
||||
// will warn
|
||||
if (warning_vector.empty())
|
||||
warning_vector.push_back("");
|
||||
warned++;
|
||||
if (warned > log_threshold)
|
||||
break;
|
||||
|
||||
utime_t age = now - (*i)->get_arrived();
|
||||
stringstream ss;
|
||||
ss << "slow request " << age << " seconds old, received at " << (*i)->get_arrived()
|
||||
<< ": " << *((*i)->request) << " currently "
|
||||
<< ((*i)->current.size() ? (*i)->current : (*i)->state_string());
|
||||
warning_vector.push_back(ss.str());
|
||||
|
||||
// only those that have been shown will backoff
|
||||
(*i)->warn_interval_multiplier *= 2;
|
||||
}
|
||||
++i;
|
||||
}
|
||||
|
||||
// only summarize if we warn about any. if everything has backed
|
||||
// off, we will stay silent.
|
||||
if (warned > 0) {
|
||||
stringstream ss;
|
||||
ss << slow << " slow requests, " << warned << " included below; oldest blocked for > "
|
||||
<< oldest_secs << " secs";
|
||||
warning_vector[0] = ss.str();
|
||||
}
|
||||
|
||||
return warning_vector.size();
|
||||
}
|
||||
|
||||
/**
 * Fill `h` with a power-of-two histogram of in-flight op ages in
 * milliseconds.
 *
 * The walk starts at bin 30 (lower bound 2^29 ms) and only ever moves to
 * lower bins, so it relies on the in-flight list being visited from the
 * oldest op to the youngest.  NOTE(review): that ordering is assumed from
 * ops being appended in register_inflight_op() — confirm recv stamps are
 * monotonic for the callers that matter.
 *
 * Changes relative to the previous version:
 *  - the clock is sampled with this tracker's cct, like every other method
 *    of OpTracker, instead of NULL;
 *  - the age is clamped to the uint32_t range before narrowing, so an op
 *    older than 2^32 ms no longer wraps around into a low bin.
 *
 * @param h  histogram to overwrite
 */
void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
{
  Mutex::Locker locker(ops_in_flight_lock);

  h->clear();

  utime_t now = ceph_clock_now(cct);
  unsigned bin = 30;
  uint32_t lb = 1 << (bin-1);  // lower bound for this bin
  int count = 0;
  for (xlist<TrackedOp*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
    utime_t age = now - (*i)->get_arrived();

    // saturate instead of wrapping when narrowing to 32 bits
    const double age_ms = age * 1000.0;
    uint32_t ms;
    if (age_ms >= 4294967295.0)
      ms = 4294967295u;
    else if (age_ms <= 0.0)
      ms = 0;
    else
      ms = (uint32_t)age_ms;

    if (ms >= lb) {
      count++;
      continue;
    }

    // this op belongs in a lower bin: flush the current one first
    if (count)
      h->set(bin, count);
    while (lb > ms) {
      bin--;
      lb >>= 1;
    }
    count = 1;
  }
  if (count)
    h->set(bin, count);
}
|
||||
|
||||
/// Log an event for `op`, stamped with the current time.
void OpTracker::mark_event(TrackedOp *op, const string &dest)
{
  const utime_t stamp = ceph_clock_now(cct);
  _mark_event(op, dest, stamp);
}
|
||||
|
||||
/// Emit a level-5 log line describing an event on `op`.  Only logs; the
/// op's own event list is maintained by TrackedOp::mark_event().
///
/// @param op    op the event belongs to
/// @param evt   event name
/// @param time  timestamp to report for the event
void OpTracker::_mark_event(TrackedOp *op, const string &evt,
                            utime_t time)
{
  Mutex::Locker locker(ops_in_flight_lock);
  // The message starts with ", seq" because the reqid field is disabled:
  //   "reqid: " << op->get_reqid() <<
  dout(5) << ", seq: " << op->seq
          << ", time: " << time
          << ", event: " << evt
          << ", request: " << *op->request
          << dendl;
}
|
||||
|
||||
/// Deleter installed by create_request(): runs when the last external
/// reference to an op goes away.  Marks the op "done" and unregisters it.
void OpTracker::RemoveOnDelete::operator()(TrackedOp *op)
{
  const string final_event("done");
  op->mark_event(final_event);

  // Do not delete op here: unregister_inflight_op() takes control of it.
  tracker->unregister_inflight_op(op);
}
|
||||
|
||||
void TrackedOp::mark_event(const string &event)
|
||||
{
|
||||
utime_t now = ceph_clock_now(g_ceph_context);
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
events.push_back(make_pair(now, event));
|
||||
}
|
||||
tracker->mark_event(this, event);
|
||||
_event_marked();
|
||||
}
|
||||
|
||||
/// Dump this op into `f`: the printed request as "description", its arrival
/// time, age relative to `now`, duration, and a "type_data" array filled by
/// the _dump() hook.
void TrackedOp::dump(utime_t now, Formatter *f) const
{
  stringstream description;
  request->print(description);
  const string text = description.str();
  f->dump_string("description", text.c_str());  // this TrackedOp

  const utime_t arrived_at = get_arrived();
  f->dump_stream("received_at") << arrived_at;
  f->dump_float("age", now - arrived_at);
  f->dump_float("duration", get_duration());

  f->open_array_section("type_data");
  _dump(now, f);
  f->close_section();
}
|
@ -17,15 +17,163 @@
|
||||
#include <stdint.h>
|
||||
#include <include/utime.h>
|
||||
#include "common/Mutex.h"
|
||||
#include "include/histogram.h"
|
||||
#include "include/xlist.h"
|
||||
#include "msg/Message.h"
|
||||
#include <tr1/memory>
|
||||
|
||||
class TrackedOp {
|
||||
public:
|
||||
virtual void mark_event(const string &event) = 0;
|
||||
virtual ~TrackedOp() {}
|
||||
};
|
||||
class TrackedOp;
|
||||
typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
|
||||
|
||||
class OpTracker;
|
||||
class OpHistory {
|
||||
set<pair<utime_t, TrackedOpRef> > arrived;
|
||||
set<pair<double, TrackedOpRef> > duration;
|
||||
void cleanup(utime_t now);
|
||||
bool shutdown;
|
||||
OpTracker *tracker;
|
||||
uint32_t history_size;
|
||||
uint32_t history_duration;
|
||||
|
||||
public:
|
||||
OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_),
|
||||
history_size(0), history_duration(0) {}
|
||||
~OpHistory() {
|
||||
assert(arrived.empty());
|
||||
assert(duration.empty());
|
||||
}
|
||||
void insert(utime_t now, TrackedOpRef op);
|
||||
void dump_ops(utime_t now, Formatter *f);
|
||||
void on_shutdown();
|
||||
void set_size_and_duration(uint32_t new_size, uint32_t new_duration) {
|
||||
history_size = new_size;
|
||||
history_duration = new_duration;
|
||||
}
|
||||
};
|
||||
|
||||
class OpTracker {
|
||||
class RemoveOnDelete {
|
||||
OpTracker *tracker;
|
||||
public:
|
||||
RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
|
||||
void operator()(TrackedOp *op);
|
||||
};
|
||||
friend class RemoveOnDelete;
|
||||
friend class OpHistory;
|
||||
uint64_t seq;
|
||||
Mutex ops_in_flight_lock;
|
||||
xlist<TrackedOp *> ops_in_flight;
|
||||
OpHistory history;
|
||||
float complaint_time;
|
||||
int log_threshold;
|
||||
|
||||
public:
|
||||
CephContext *cct;
|
||||
OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"),
|
||||
history(this), complaint_time(0), log_threshold(0), cct(cct_) {}
|
||||
void set_complaint_and_threshold(float time, int threshold) {
|
||||
complaint_time = time;
|
||||
log_threshold = threshold;
|
||||
}
|
||||
void set_history_size_and_duration(uint32_t new_size, uint32_t new_duration) {
|
||||
history.set_size_and_duration(new_size, new_duration);
|
||||
}
|
||||
void dump_ops_in_flight(Formatter *f);
|
||||
void dump_historic_ops(Formatter *f);
|
||||
void register_inflight_op(xlist<TrackedOp*>::item *i);
|
||||
void unregister_inflight_op(TrackedOp *i);
|
||||
|
||||
void get_age_ms_histogram(pow2_hist_t *h);
|
||||
|
||||
/**
|
||||
* Look for Ops which are too old, and insert warning
|
||||
* strings for each Op that is too old.
|
||||
*
|
||||
* @param warning_strings A vector<string> reference which is filled
|
||||
* with a warning string for each old Op.
|
||||
* @return True if there are any Ops to warn on, false otherwise.
|
||||
*/
|
||||
bool check_ops_in_flight(std::vector<string> &warning_strings);
|
||||
void mark_event(TrackedOp *op, const string &evt);
|
||||
void _mark_event(TrackedOp *op, const string &evt, utime_t now);
|
||||
|
||||
void on_shutdown() {
|
||||
Mutex::Locker l(ops_in_flight_lock);
|
||||
history.on_shutdown();
|
||||
}
|
||||
~OpTracker() {
|
||||
assert(ops_in_flight.empty());
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename T::Ref create_request(Message *ref)
|
||||
{
|
||||
typename T::Ref retval(new T(ref, this),
|
||||
RemoveOnDelete(this));
|
||||
|
||||
_mark_event(retval.get(), "header_read", ref->get_recv_stamp());
|
||||
_mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
|
||||
_mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
|
||||
_mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
|
||||
|
||||
retval->init_from_message();
|
||||
|
||||
return retval;
|
||||
}
|
||||
};
|
||||
|
||||
class TrackedOp {
|
||||
private:
|
||||
friend class OpHistory;
|
||||
friend class OpTracker;
|
||||
xlist<TrackedOp*>::item xitem;
|
||||
protected:
|
||||
Message *request; /// the logical request we are tracking
|
||||
OpTracker *tracker; /// the tracker we are associated with
|
||||
|
||||
list<pair<utime_t, string> > events; /// list of events and their times
|
||||
Mutex lock; /// to protect the events list
|
||||
string current; /// the current state the event is in
|
||||
uint64_t seq; /// a unique value set by the OpTracker
|
||||
|
||||
uint32_t warn_interval_multiplier; // limits output of a given op warning
|
||||
|
||||
TrackedOp(Message *req, OpTracker *_tracker) :
|
||||
xitem(this),
|
||||
request(req),
|
||||
tracker(_tracker),
|
||||
lock("TrackedOp::lock"),
|
||||
seq(0),
|
||||
warn_interval_multiplier(1)
|
||||
{
|
||||
tracker->register_inflight_op(&xitem);
|
||||
}
|
||||
|
||||
virtual void init_from_message() {}
|
||||
/// output any type-specific data you want to get when dump() is called
|
||||
virtual void _dump(utime_t now, Formatter *f) const {}
|
||||
/// if you want something else to happen when events are marked, implement
|
||||
virtual void _event_marked() {}
|
||||
|
||||
public:
|
||||
virtual ~TrackedOp() { assert(request); request->put(); }
|
||||
|
||||
utime_t get_arrived() const {
|
||||
return request->get_recv_stamp();
|
||||
}
|
||||
// This function maybe needs some work; assumes last event is completion time
|
||||
double get_duration() const {
|
||||
return events.size() ?
|
||||
(events.rbegin()->first - get_arrived()) :
|
||||
0.0;
|
||||
}
|
||||
Message *get_req() const { return request; }
|
||||
|
||||
void mark_event(const string &event);
|
||||
virtual const char *state_string() const {
|
||||
return events.rbegin()->second.c_str();
|
||||
}
|
||||
void dump(utime_t now, Formatter *f) const;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -43,6 +43,7 @@ noinst_HEADERS += \
|
||||
include/filepath.h \
|
||||
include/frag.h \
|
||||
include/hash.h \
|
||||
include/histogram.h \
|
||||
include/intarith.h \
|
||||
include/interval_set.h \
|
||||
include/int_types.h \
|
||||
|
76
src/include/histogram.h
Normal file
76
src/include/histogram.h
Normal file
@ -0,0 +1,76 @@
|
||||
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
|
||||
// vim: ts=8 sw=2 smarttab
|
||||
/*
|
||||
* Ceph - scalable distributed file system
|
||||
*
|
||||
* This is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU Lesser General Public
|
||||
* License version 2.1, as published by the Free Software
|
||||
* Foundation. See file COPYING.
|
||||
* Copyright 2013 Inktank
|
||||
*/
|
||||
|
||||
#ifndef HISTOGRAM_H_
|
||||
#define HISTOGRAM_H_
|
||||
|
||||
/**
|
||||
* power of 2 histogram
|
||||
*/
|
||||
struct pow2_hist_t { //
|
||||
/**
|
||||
* histogram
|
||||
*
|
||||
* bin size is 2^index
|
||||
* value is count of elements that are <= the current bin but > the previous bin.
|
||||
*/
|
||||
vector<int32_t> h;
|
||||
|
||||
private:
|
||||
/// expand to at least another's size
|
||||
void _expand_to(unsigned s) {
|
||||
if (s > h.size())
|
||||
h.resize(s, 0);
|
||||
}
|
||||
/// drop useless trailing 0's
|
||||
void _contract() {
|
||||
unsigned p = h.size();
|
||||
while (p > 0 && h[p-1] == 0)
|
||||
--p;
|
||||
h.resize(p);
|
||||
}
|
||||
|
||||
public:
|
||||
void clear() {
|
||||
h.clear();
|
||||
}
|
||||
void set(int bin, int32_t v) {
|
||||
_expand_to(bin + 1);
|
||||
h[bin] = v;
|
||||
_contract();
|
||||
}
|
||||
|
||||
void add(const pow2_hist_t& o) {
|
||||
_expand_to(o.h.size());
|
||||
for (unsigned p = 0; p < o.h.size(); ++p)
|
||||
h[p] += o.h[p];
|
||||
_contract();
|
||||
}
|
||||
void sub(const pow2_hist_t& o) {
|
||||
_expand_to(o.h.size());
|
||||
for (unsigned p = 0; p < o.h.size(); ++p)
|
||||
h[p] -= o.h[p];
|
||||
_contract();
|
||||
}
|
||||
|
||||
int32_t upper_bound() const {
|
||||
return 1 << h.size();
|
||||
}
|
||||
|
||||
void dump(Formatter *f) const;
|
||||
void encode(bufferlist &bl) const;
|
||||
void decode(bufferlist::iterator &bl);
|
||||
static void generate_test_instances(std::list<pow2_hist_t*>& o);
|
||||
};
|
||||
WRITE_CLASS_ENCODER(pow2_hist_t)
|
||||
|
||||
#endif /* HISTOGRAM_H_ */
|
@ -28,6 +28,7 @@ using namespace std;
|
||||
#include "PaxosService.h"
|
||||
#include "include/types.h"
|
||||
#include "include/utime.h"
|
||||
#include "include/histogram.h"
|
||||
#include "msg/Messenger.h"
|
||||
#include "common/config.h"
|
||||
#include "mon/MonitorDBStore.h"
|
||||
|
@ -177,7 +177,7 @@ int cls_read(cls_method_context_t hctx, int ofs, int len,
|
||||
int cls_get_request_origin(cls_method_context_t hctx, entity_inst_t *origin)
|
||||
{
|
||||
ReplicatedPG::OpContext **pctx = static_cast<ReplicatedPG::OpContext **>(hctx);
|
||||
*origin = (*pctx)->op->request->get_orig_source_inst();
|
||||
*origin = (*pctx)->op->get_req()->get_orig_source_inst();
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -13,7 +13,8 @@ libos_la_SOURCES = \
|
||||
os/WBThrottle.cc \
|
||||
os/BtrfsFileStoreBackend.cc \
|
||||
os/GenericFileStoreBackend.cc \
|
||||
os/ZFSFileStoreBackend.cc
|
||||
os/ZFSFileStoreBackend.cc \
|
||||
common/TrackedOp.cc
|
||||
noinst_LTLIBRARIES += libos.la
|
||||
|
||||
noinst_HEADERS += \
|
||||
|
@ -16,6 +16,7 @@ libosd_la_SOURCES = \
|
||||
osd/Watch.cc \
|
||||
osd/ClassHandler.cc \
|
||||
osd/OpRequest.cc \
|
||||
common/TrackedOp.cc \
|
||||
osd/SnapMapper.cc \
|
||||
osd/osd_types.cc \
|
||||
objclass/class_api.cc
|
||||
|
103
src/osd/OSD.cc
103
src/osd/OSD.cc
@ -907,6 +907,10 @@ OSD::OSD(CephContext *cct_, int id, Messenger *internal_messenger, Messenger *ex
|
||||
service(this)
|
||||
{
|
||||
monc->set_messenger(client_messenger);
|
||||
op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
|
||||
cct->_conf->osd_op_log_threshold);
|
||||
op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
|
||||
cct->_conf->osd_op_history_duration);
|
||||
}
|
||||
|
||||
OSD::~OSD()
|
||||
@ -4539,7 +4543,7 @@ void OSD::do_waiters()
|
||||
|
||||
void OSD::dispatch_op(OpRequestRef op)
|
||||
{
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
|
||||
case MSG_OSD_PG_CREATE:
|
||||
handle_pg_create(op);
|
||||
@ -4665,7 +4669,7 @@ void OSD::_dispatch(Message *m)
|
||||
|
||||
default:
|
||||
{
|
||||
OpRequestRef op = op_tracker.create_request(m);
|
||||
OpRequestRef op = op_tracker.create_request<OpRequest>(m);
|
||||
op->mark_event("waiting_for_osdmap");
|
||||
// no map? starting up?
|
||||
if (!osdmap) {
|
||||
@ -5711,9 +5715,9 @@ bool OSD::require_mon_peer(Message *m)
|
||||
|
||||
bool OSD::require_osd_peer(OpRequestRef op)
|
||||
{
|
||||
if (!op->request->get_connection()->peer_is_osd()) {
|
||||
dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
|
||||
<< " " << *op->request << dendl;
|
||||
if (!op->get_req()->get_connection()->peer_is_osd()) {
|
||||
dout(0) << "require_osd_peer received from non-osd " << op->get_req()->get_connection()->get_peer_addr()
|
||||
<< " " << *op->get_req() << dendl;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@ -5725,7 +5729,7 @@ bool OSD::require_osd_peer(OpRequestRef op)
|
||||
*/
|
||||
bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
|
||||
{
|
||||
Message *m = op->request;
|
||||
Message *m = op->get_req();
|
||||
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
|
||||
|
||||
assert(osd_lock.is_locked());
|
||||
@ -5837,7 +5841,7 @@ void OSD::split_pgs(
|
||||
*/
|
||||
void OSD::handle_pg_create(OpRequestRef op)
|
||||
{
|
||||
MOSDPGCreate *m = (MOSDPGCreate*)op->request;
|
||||
MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_CREATE);
|
||||
|
||||
dout(10) << "handle_pg_create " << *m << dendl;
|
||||
@ -5857,11 +5861,16 @@ void OSD::handle_pg_create(OpRequestRef op)
|
||||
}
|
||||
}
|
||||
|
||||
if (!require_mon_peer(op->request)) {
|
||||
// we have to hack around require_mon_peer's interface limits
|
||||
op->request = NULL;
|
||||
/* we have to hack around require_mon_peer's interface limits, so
|
||||
* grab an extra reference before going in. If the peer isn't
|
||||
* a Monitor, the reference is put for us (and then cleared
|
||||
* up automatically by our OpTracker infrastructure). Otherwise,
|
||||
* we put the extra ref ourself.
|
||||
*/
|
||||
if (!require_mon_peer(op->get_req()->get())) {
|
||||
return;
|
||||
}
|
||||
op->get_req()->put();
|
||||
|
||||
if (!require_same_or_newer_map(op, m->epoch)) return;
|
||||
|
||||
@ -6166,7 +6175,7 @@ void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info
|
||||
*/
|
||||
void OSD::handle_pg_notify(OpRequestRef op)
|
||||
{
|
||||
MOSDPGNotify *m = (MOSDPGNotify*)op->request;
|
||||
MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
|
||||
|
||||
dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
|
||||
@ -6201,7 +6210,7 @@ void OSD::handle_pg_notify(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_log(OpRequestRef op)
|
||||
{
|
||||
MOSDPGLog *m = (MOSDPGLog*) op->request;
|
||||
MOSDPGLog *m = (MOSDPGLog*) op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_LOG);
|
||||
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
|
||||
|
||||
@ -6229,7 +6238,7 @@ void OSD::handle_pg_log(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_info(OpRequestRef op)
|
||||
{
|
||||
MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->request);
|
||||
MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_INFO);
|
||||
dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
|
||||
|
||||
@ -6262,7 +6271,7 @@ void OSD::handle_pg_info(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_trim(OpRequestRef op)
|
||||
{
|
||||
MOSDPGTrim *m = (MOSDPGTrim *)op->request;
|
||||
MOSDPGTrim *m = (MOSDPGTrim *)op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_TRIM);
|
||||
|
||||
dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
|
||||
@ -6315,7 +6324,7 @@ void OSD::handle_pg_trim(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_scan(OpRequestRef op)
|
||||
{
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_SCAN);
|
||||
dout(10) << "handle_pg_scan " << *m << " from " << m->get_source() << dendl;
|
||||
|
||||
@ -6343,7 +6352,7 @@ void OSD::handle_pg_scan(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_backfill(OpRequestRef op)
|
||||
{
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
|
||||
dout(10) << "handle_pg_backfill " << *m << " from " << m->get_source() << dendl;
|
||||
|
||||
@ -6371,7 +6380,7 @@ void OSD::handle_pg_backfill(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_backfill_reserve(OpRequestRef op)
|
||||
{
|
||||
MBackfillReserve *m = static_cast<MBackfillReserve*>(op->request);
|
||||
MBackfillReserve *m = static_cast<MBackfillReserve*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_BACKFILL_RESERVE);
|
||||
|
||||
if (!require_osd_peer(op))
|
||||
@ -6415,7 +6424,7 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_recovery_reserve(OpRequestRef op)
|
||||
{
|
||||
MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->request);
|
||||
MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_RECOVERY_RESERVE);
|
||||
|
||||
if (!require_osd_peer(op))
|
||||
@ -6467,7 +6476,7 @@ void OSD::handle_pg_query(OpRequestRef op)
|
||||
{
|
||||
assert(osd_lock.is_locked());
|
||||
|
||||
MOSDPGQuery *m = (MOSDPGQuery*)op->request;
|
||||
MOSDPGQuery *m = (MOSDPGQuery*)op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_QUERY);
|
||||
|
||||
if (!require_osd_peer(op))
|
||||
@ -6554,7 +6563,7 @@ void OSD::handle_pg_query(OpRequestRef op)
|
||||
|
||||
void OSD::handle_pg_remove(OpRequestRef op)
|
||||
{
|
||||
MOSDPGRemove *m = (MOSDPGRemove *)op->request;
|
||||
MOSDPGRemove *m = (MOSDPGRemove *)op->get_req();
|
||||
assert(m->get_header().type == MSG_OSD_PG_REMOVE);
|
||||
assert(osd_lock.is_locked());
|
||||
|
||||
@ -6827,7 +6836,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err)
|
||||
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
|
||||
version_t uv)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
assert(m->get_header().type == CEPH_MSG_OSD_OP);
|
||||
int flags;
|
||||
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
|
||||
@ -6839,7 +6848,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
|
||||
|
||||
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
assert(m->get_header().type == CEPH_MSG_OSD_OP);
|
||||
|
||||
if (m->get_map_epoch() < pg->info.history.same_primary_since) {
|
||||
@ -6858,7 +6867,7 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
|
||||
|
||||
void OSD::handle_op(OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
assert(m->get_header().type == CEPH_MSG_OSD_OP);
|
||||
if (op_is_discardable(m)) {
|
||||
dout(10) << " discardable " << *m << dendl;
|
||||
@ -6993,7 +7002,7 @@ void OSD::handle_op(OpRequestRef op)
|
||||
template<typename T, int MSGTYPE>
|
||||
void OSD::handle_replica_op(OpRequestRef op)
|
||||
{
|
||||
T *m = static_cast<T *>(op->request);
|
||||
T *m = static_cast<T *>(op->get_req());
|
||||
assert(m->get_header().type == MSGTYPE);
|
||||
|
||||
dout(10) << __func__ << *m << " epoch " << m->map_epoch << dendl;
|
||||
@ -7047,24 +7056,24 @@ bool OSD::op_is_discardable(MOSDOp *op)
|
||||
*/
|
||||
void OSD::enqueue_op(PG *pg, OpRequestRef op)
|
||||
{
|
||||
utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
|
||||
dout(15) << "enqueue_op " << op << " prio " << op->request->get_priority()
|
||||
<< " cost " << op->request->get_cost()
|
||||
utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
|
||||
dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
|
||||
<< " cost " << op->get_req()->get_cost()
|
||||
<< " latency " << latency
|
||||
<< " " << *(op->request) << dendl;
|
||||
<< " " << *(op->get_req()) << dendl;
|
||||
pg->queue_op(op);
|
||||
}
|
||||
|
||||
void OSD::OpWQ::_enqueue(pair<PGRef, OpRequestRef> item)
|
||||
{
|
||||
unsigned priority = item.second->request->get_priority();
|
||||
unsigned cost = item.second->request->get_cost();
|
||||
unsigned priority = item.second->get_req()->get_priority();
|
||||
unsigned cost = item.second->get_req()->get_cost();
|
||||
if (priority >= CEPH_MSG_PRIO_LOW)
|
||||
pqueue.enqueue_strict(
|
||||
item.second->request->get_source_inst(),
|
||||
item.second->get_req()->get_source_inst(),
|
||||
priority, item);
|
||||
else
|
||||
pqueue.enqueue(item.second->request->get_source_inst(),
|
||||
pqueue.enqueue(item.second->get_req()->get_source_inst(),
|
||||
priority, cost, item);
|
||||
osd->logger->set(l_osd_opq, pqueue.length());
|
||||
}
|
||||
@ -7079,14 +7088,14 @@ void OSD::OpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item)
|
||||
pg_for_processing[&*(item.first)].pop_back();
|
||||
}
|
||||
}
|
||||
unsigned priority = item.second->request->get_priority();
|
||||
unsigned cost = item.second->request->get_cost();
|
||||
unsigned priority = item.second->get_req()->get_priority();
|
||||
unsigned cost = item.second->get_req()->get_cost();
|
||||
if (priority >= CEPH_MSG_PRIO_LOW)
|
||||
pqueue.enqueue_strict_front(
|
||||
item.second->request->get_source_inst(),
|
||||
item.second->get_req()->get_source_inst(),
|
||||
priority, item);
|
||||
else
|
||||
pqueue.enqueue_front(item.second->request->get_source_inst(),
|
||||
pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
|
||||
priority, cost, item);
|
||||
osd->logger->set(l_osd_opq, pqueue.length());
|
||||
}
|
||||
@ -7138,11 +7147,11 @@ void OSD::dequeue_op(
|
||||
PGRef pg, OpRequestRef op,
|
||||
ThreadPool::TPHandle &handle)
|
||||
{
|
||||
utime_t latency = ceph_clock_now(cct) - op->request->get_recv_stamp();
|
||||
dout(10) << "dequeue_op " << op << " prio " << op->request->get_priority()
|
||||
<< " cost " << op->request->get_cost()
|
||||
utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
|
||||
dout(10) << "dequeue_op " << op << " prio " << op->get_req()->get_priority()
|
||||
<< " cost " << op->get_req()->get_cost()
|
||||
<< " latency " << latency
|
||||
<< " " << *(op->request)
|
||||
<< " " << *(op->get_req())
|
||||
<< " pg " << *pg << dendl;
|
||||
if (pg->deleting)
|
||||
return;
|
||||
@ -7243,6 +7252,8 @@ const char** OSD::get_tracked_conf_keys() const
|
||||
{
|
||||
static const char* KEYS[] = {
|
||||
"osd_max_backfills",
|
||||
"osd_op_complaint_time", "osd_op_log_threshold",
|
||||
"osd_op_history_size", "osd_op_history_duration",
|
||||
NULL
|
||||
};
|
||||
return KEYS;
|
||||
@ -7255,13 +7266,23 @@ void OSD::handle_conf_change(const struct md_config_t *conf,
|
||||
service.local_reserver.set_max(cct->_conf->osd_max_backfills);
|
||||
service.remote_reserver.set_max(cct->_conf->osd_max_backfills);
|
||||
}
|
||||
if (changed.count("osd_op_complaint_time") ||
|
||||
changed.count("osd_op_log_threshold")) {
|
||||
op_tracker.set_complaint_and_threshold(cct->_conf->osd_op_complaint_time,
|
||||
cct->_conf->osd_op_log_threshold);
|
||||
}
|
||||
if (changed.count("osd_op_history_size") ||
|
||||
changed.count("osd_op_history_duration")) {
|
||||
op_tracker.set_history_size_and_duration(cct->_conf->osd_op_history_size,
|
||||
cct->_conf->osd_op_history_duration);
|
||||
}
|
||||
}
|
||||
|
||||
// --------------------------------
|
||||
|
||||
int OSD::init_op_flags(OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
vector<OSDOp>::iterator iter;
|
||||
|
||||
// client flags have no bearing on whether an op is a read, write, etc.
|
||||
|
@ -11,229 +11,21 @@
|
||||
#include "messages/MOSDSubOp.h"
|
||||
#include "include/assert.h"
|
||||
|
||||
#define dout_subsys ceph_subsys_optracker
|
||||
#undef dout_prefix
|
||||
#define dout_prefix _prefix(_dout)
|
||||
|
||||
static ostream& _prefix(std::ostream* _dout)
|
||||
{
|
||||
return *_dout << "--OSD::tracker-- ";
|
||||
}
|
||||
|
||||
OpRequest::OpRequest(Message *req, OpTracker *tracker) :
|
||||
request(req), xitem(this),
|
||||
TrackedOp(req, tracker),
|
||||
rmw_flags(0),
|
||||
warn_interval_multiplier(1),
|
||||
lock("OpRequest::lock"),
|
||||
tracker(tracker),
|
||||
hit_flag_points(0), latest_flag_point(0),
|
||||
seq(0) {
|
||||
received_time = request->get_recv_stamp();
|
||||
tracker->register_inflight_op(&xitem);
|
||||
hit_flag_points(0), latest_flag_point(0) {
|
||||
if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
|
||||
// don't warn as quickly for low priority ops
|
||||
warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;
|
||||
}
|
||||
}
|
||||
|
||||
void OpHistory::on_shutdown()
|
||||
{
|
||||
arrived.clear();
|
||||
duration.clear();
|
||||
shutdown = true;
|
||||
}
|
||||
|
||||
void OpHistory::insert(utime_t now, OpRequestRef op)
|
||||
{
|
||||
if (shutdown)
|
||||
return;
|
||||
duration.insert(make_pair(op->get_duration(), op));
|
||||
arrived.insert(make_pair(op->get_arrived(), op));
|
||||
cleanup(now);
|
||||
}
|
||||
|
||||
void OpHistory::cleanup(utime_t now)
|
||||
{
|
||||
while (arrived.size() &&
|
||||
(now - arrived.begin()->first >
|
||||
(double)(tracker->cct->_conf->osd_op_history_duration))) {
|
||||
duration.erase(make_pair(
|
||||
arrived.begin()->second->get_duration(),
|
||||
arrived.begin()->second));
|
||||
arrived.erase(arrived.begin());
|
||||
}
|
||||
|
||||
while (duration.size() > tracker->cct->_conf->osd_op_history_size) {
|
||||
arrived.erase(make_pair(
|
||||
duration.begin()->second->get_arrived(),
|
||||
duration.begin()->second));
|
||||
duration.erase(duration.begin());
|
||||
}
|
||||
}
|
||||
|
||||
void OpHistory::dump_ops(utime_t now, Formatter *f)
|
||||
{
|
||||
cleanup(now);
|
||||
f->open_object_section("OpHistory");
|
||||
f->dump_int("num to keep", tracker->cct->_conf->osd_op_history_size);
|
||||
f->dump_int("duration to keep", tracker->cct->_conf->osd_op_history_duration);
|
||||
{
|
||||
f->open_array_section("Ops");
|
||||
for (set<pair<utime_t, OpRequestRef> >::const_iterator i =
|
||||
arrived.begin();
|
||||
i != arrived.end();
|
||||
++i) {
|
||||
f->open_object_section("Op");
|
||||
i->second->dump(now, f);
|
||||
f->close_section();
|
||||
}
|
||||
f->close_section();
|
||||
}
|
||||
f->close_section();
|
||||
}
|
||||
|
||||
void OpTracker::dump_historic_ops(Formatter *f)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
history.dump_ops(now, f);
|
||||
}
|
||||
|
||||
void OpTracker::dump_ops_in_flight(Formatter *f)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
f->open_object_section("ops_in_flight"); // overall dump
|
||||
f->dump_int("num_ops", ops_in_flight.size());
|
||||
f->open_array_section("ops"); // list of OpRequests
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
|
||||
f->open_object_section("op");
|
||||
(*p)->dump(now, f);
|
||||
f->close_section(); // this OpRequest
|
||||
}
|
||||
f->close_section(); // list of OpRequests
|
||||
f->close_section(); // overall dump
|
||||
}
|
||||
|
||||
void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
ops_in_flight.push_back(i);
|
||||
ops_in_flight.back()->seq = seq++;
|
||||
}
|
||||
|
||||
void OpTracker::unregister_inflight_op(OpRequest *i)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
assert(i->xitem.get_list() == &ops_in_flight);
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
i->xitem.remove_myself();
|
||||
i->request->clear_data();
|
||||
history.insert(now, OpRequestRef(i));
|
||||
}
|
||||
|
||||
bool OpTracker::check_ops_in_flight(std::vector<string> &warning_vector)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
if (!ops_in_flight.size())
|
||||
return false;
|
||||
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
utime_t too_old = now;
|
||||
too_old -= cct->_conf->osd_op_complaint_time;
|
||||
|
||||
utime_t oldest_secs = now - ops_in_flight.front()->received_time;
|
||||
|
||||
dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
|
||||
<< "; oldest is " << oldest_secs
|
||||
<< " seconds old" << dendl;
|
||||
|
||||
if (oldest_secs < cct->_conf->osd_op_complaint_time)
|
||||
return false;
|
||||
|
||||
xlist<OpRequest*>::iterator i = ops_in_flight.begin();
|
||||
warning_vector.reserve(cct->_conf->osd_op_log_threshold + 1);
|
||||
|
||||
int slow = 0; // total slow
|
||||
int warned = 0; // total logged
|
||||
while (!i.end() && (*i)->received_time < too_old) {
|
||||
slow++;
|
||||
|
||||
// exponential backoff of warning intervals
|
||||
if (((*i)->received_time +
|
||||
(cct->_conf->osd_op_complaint_time *
|
||||
(*i)->warn_interval_multiplier)) < now) {
|
||||
// will warn
|
||||
if (warning_vector.empty())
|
||||
warning_vector.push_back("");
|
||||
warned++;
|
||||
if (warned > cct->_conf->osd_op_log_threshold)
|
||||
break;
|
||||
|
||||
utime_t age = now - (*i)->received_time;
|
||||
stringstream ss;
|
||||
ss << "slow request " << age << " seconds old, received at " << (*i)->received_time
|
||||
<< ": " << *((*i)->request) << " currently "
|
||||
<< ((*i)->current.size() ? (*i)->current : (*i)->state_string());
|
||||
warning_vector.push_back(ss.str());
|
||||
|
||||
// only those that have been shown will backoff
|
||||
(*i)->warn_interval_multiplier *= 2;
|
||||
}
|
||||
++i;
|
||||
}
|
||||
|
||||
// only summarize if we warn about any. if everything has backed
|
||||
// off, we will stay silent.
|
||||
if (warned > 0) {
|
||||
stringstream ss;
|
||||
ss << slow << " slow requests, " << warned << " included below; oldest blocked for > "
|
||||
<< oldest_secs << " secs";
|
||||
warning_vector[0] = ss.str();
|
||||
}
|
||||
|
||||
return warning_vector.size();
|
||||
}
|
||||
|
||||
void OpTracker::get_age_ms_histogram(pow2_hist_t *h)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
|
||||
h->clear();
|
||||
|
||||
utime_t now = ceph_clock_now(NULL);
|
||||
unsigned bin = 30;
|
||||
uint32_t lb = 1 << (bin-1); // lower bound for this bin
|
||||
int count = 0;
|
||||
for (xlist<OpRequest*>::iterator i = ops_in_flight.begin(); !i.end(); ++i) {
|
||||
utime_t age = now - (*i)->received_time;
|
||||
uint32_t ms = (long)(age * 1000.0);
|
||||
if (ms >= lb) {
|
||||
count++;
|
||||
continue;
|
||||
}
|
||||
if (count)
|
||||
h->set(bin, count);
|
||||
while (lb > ms) {
|
||||
bin--;
|
||||
lb >>= 1;
|
||||
}
|
||||
count = 1;
|
||||
}
|
||||
if (count)
|
||||
h->set(bin, count);
|
||||
}
|
||||
|
||||
void OpRequest::dump(utime_t now, Formatter *f) const
|
||||
void OpRequest::_dump(utime_t now, Formatter *f) const
|
||||
{
|
||||
Message *m = request;
|
||||
stringstream name;
|
||||
m->print(name);
|
||||
f->dump_string("description", name.str().c_str()); // this OpRequest
|
||||
f->dump_unsigned("rmw_flags", rmw_flags);
|
||||
f->dump_stream("received_at") << received_time;
|
||||
f->dump_float("age", now - received_time);
|
||||
f->dump_float("duration", get_duration());
|
||||
f->dump_string("flag_point", state_string());
|
||||
if (m->get_orig_source().is_client()) {
|
||||
f->open_object_section("client_info");
|
||||
@ -257,50 +49,11 @@ void OpRequest::dump(utime_t now, Formatter *f) const
|
||||
}
|
||||
}
|
||||
|
||||
void OpTracker::mark_event(OpRequest *op, const string &dest)
|
||||
void OpRequest::init_from_message()
|
||||
{
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
return _mark_event(op, dest, now);
|
||||
}
|
||||
|
||||
void OpTracker::_mark_event(OpRequest *op, const string &evt,
|
||||
utime_t time)
|
||||
{
|
||||
Mutex::Locker locker(ops_in_flight_lock);
|
||||
dout(5) << "reqid: " << op->get_reqid() << ", seq: " << op->seq
|
||||
<< ", time: " << time << ", event: " << evt
|
||||
<< ", request: " << *op->request << dendl;
|
||||
}
|
||||
|
||||
void OpTracker::RemoveOnDelete::operator()(OpRequest *op) {
|
||||
op->mark_event("done");
|
||||
tracker->unregister_inflight_op(op);
|
||||
// Do not delete op, unregister_inflight_op took control
|
||||
}
|
||||
|
||||
OpRequestRef OpTracker::create_request(Message *ref)
|
||||
{
|
||||
OpRequestRef retval(new OpRequest(ref, this),
|
||||
RemoveOnDelete(this));
|
||||
|
||||
if (ref->get_type() == CEPH_MSG_OSD_OP) {
|
||||
retval->reqid = static_cast<MOSDOp*>(ref)->get_reqid();
|
||||
} else if (ref->get_type() == MSG_OSD_SUBOP) {
|
||||
retval->reqid = static_cast<MOSDSubOp*>(ref)->reqid;
|
||||
if (request->get_type() == CEPH_MSG_OSD_OP) {
|
||||
reqid = static_cast<MOSDOp*>(request)->get_reqid();
|
||||
} else if (request->get_type() == MSG_OSD_SUBOP) {
|
||||
reqid = static_cast<MOSDSubOp*>(request)->reqid;
|
||||
}
|
||||
_mark_event(retval.get(), "header_read", ref->get_recv_stamp());
|
||||
_mark_event(retval.get(), "throttled", ref->get_throttle_stamp());
|
||||
_mark_event(retval.get(), "all_read", ref->get_recv_complete_stamp());
|
||||
_mark_event(retval.get(), "dispatched", ref->get_dispatch_stamp());
|
||||
return retval;
|
||||
}
|
||||
|
||||
void OpRequest::mark_event(const string &event)
|
||||
{
|
||||
utime_t now = ceph_clock_now(tracker->cct);
|
||||
{
|
||||
Mutex::Locker l(lock);
|
||||
events.push_back(make_pair(now, event));
|
||||
}
|
||||
tracker->mark_event(this, event);
|
||||
}
|
||||
|
@ -25,87 +25,12 @@
|
||||
#include "common/TrackedOp.h"
|
||||
#include "osd/osd_types.h"
|
||||
|
||||
struct OpRequest;
|
||||
class OpTracker;
|
||||
typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
|
||||
class OpHistory {
|
||||
set<pair<utime_t, OpRequestRef> > arrived;
|
||||
set<pair<double, OpRequestRef> > duration;
|
||||
void cleanup(utime_t now);
|
||||
bool shutdown;
|
||||
OpTracker *tracker;
|
||||
|
||||
public:
|
||||
OpHistory(OpTracker *tracker_) : shutdown(false), tracker(tracker_) {}
|
||||
~OpHistory() {
|
||||
assert(arrived.empty());
|
||||
assert(duration.empty());
|
||||
}
|
||||
void insert(utime_t now, OpRequestRef op);
|
||||
void dump_ops(utime_t now, Formatter *f);
|
||||
void on_shutdown();
|
||||
};
|
||||
|
||||
class OpTracker {
|
||||
class RemoveOnDelete {
|
||||
OpTracker *tracker;
|
||||
public:
|
||||
RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
|
||||
void operator()(OpRequest *op);
|
||||
};
|
||||
friend class RemoveOnDelete;
|
||||
friend class OpRequest;
|
||||
friend class OpHistory;
|
||||
uint64_t seq;
|
||||
Mutex ops_in_flight_lock;
|
||||
xlist<OpRequest *> ops_in_flight;
|
||||
OpHistory history;
|
||||
|
||||
protected:
|
||||
CephContext *cct;
|
||||
|
||||
public:
|
||||
OpTracker(CephContext *cct_) : seq(0), ops_in_flight_lock("OpTracker mutex"), history(this), cct(cct_) {}
|
||||
void dump_ops_in_flight(Formatter *f);
|
||||
void dump_historic_ops(Formatter *f);
|
||||
void register_inflight_op(xlist<OpRequest*>::item *i);
|
||||
void unregister_inflight_op(OpRequest *i);
|
||||
|
||||
void get_age_ms_histogram(pow2_hist_t *h);
|
||||
|
||||
/**
|
||||
* Look for Ops which are too old, and insert warning
|
||||
* strings for each Op that is too old.
|
||||
*
|
||||
* @param warning_strings A vector<string> reference which is filled
|
||||
* with a warning string for each old Op.
|
||||
* @return True if there are any Ops to warn on, false otherwise.
|
||||
*/
|
||||
bool check_ops_in_flight(std::vector<string> &warning_strings);
|
||||
void mark_event(OpRequest *op, const string &evt);
|
||||
void _mark_event(OpRequest *op, const string &evt, utime_t now);
|
||||
OpRequestRef create_request(Message *req);
|
||||
void on_shutdown() {
|
||||
Mutex::Locker l(ops_in_flight_lock);
|
||||
history.on_shutdown();
|
||||
}
|
||||
~OpTracker() {
|
||||
assert(ops_in_flight.empty());
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* The OpRequest takes in a Message* and takes over a single reference
|
||||
* to it, which it puts() when destroyed.
|
||||
* OpRequest is itself ref-counted. The expectation is that you get a Message
|
||||
* you want to track, create an OpRequest with it, and then pass around that OpRequest
|
||||
* the way you used to pass around the Message.
|
||||
*/
|
||||
struct OpRequest : public TrackedOp {
|
||||
friend class OpTracker;
|
||||
friend class OpHistory;
|
||||
Message *request;
|
||||
xlist<OpRequest*>::item xitem;
|
||||
|
||||
// rmw flags
|
||||
int rmw_flags;
|
||||
@ -134,28 +59,12 @@ struct OpRequest : public TrackedOp {
|
||||
void set_class_write() { rmw_flags |= CEPH_OSD_RMW_FLAG_CLASS_WRITE; }
|
||||
void set_pg_op() { rmw_flags |= CEPH_OSD_RMW_FLAG_PGOP; }
|
||||
|
||||
utime_t received_time;
|
||||
uint32_t warn_interval_multiplier;
|
||||
utime_t get_arrived() const {
|
||||
return received_time;
|
||||
}
|
||||
double get_duration() const {
|
||||
return events.size() ?
|
||||
(events.rbegin()->first - received_time) :
|
||||
0.0;
|
||||
}
|
||||
|
||||
void dump(utime_t now, Formatter *f) const;
|
||||
void _dump(utime_t now, Formatter *f) const;
|
||||
|
||||
private:
|
||||
list<pair<utime_t, string> > events;
|
||||
string current;
|
||||
Mutex lock;
|
||||
OpTracker *tracker;
|
||||
osd_reqid_t reqid;
|
||||
uint8_t hit_flag_points;
|
||||
uint8_t latest_flag_point;
|
||||
uint64_t seq;
|
||||
static const uint8_t flag_queued_for_pg=1 << 0;
|
||||
static const uint8_t flag_reached_pg = 1 << 1;
|
||||
static const uint8_t flag_delayed = 1 << 2;
|
||||
@ -164,12 +73,8 @@ private:
|
||||
static const uint8_t flag_commit_sent = 1 << 5;
|
||||
|
||||
OpRequest(Message *req, OpTracker *tracker);
|
||||
public:
|
||||
~OpRequest() {
|
||||
assert(request);
|
||||
request->put();
|
||||
}
|
||||
|
||||
public:
|
||||
bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; }
|
||||
bool been_reached_pg() { return hit_flag_points & flag_reached_pg; }
|
||||
bool been_delayed() { return hit_flag_points & flag_delayed; }
|
||||
@ -233,10 +138,15 @@ public:
|
||||
latest_flag_point = flag_commit_sent;
|
||||
}
|
||||
|
||||
void mark_event(const string &event);
|
||||
osd_reqid_t get_reqid() const {
|
||||
return reqid;
|
||||
}
|
||||
|
||||
void init_from_message();
|
||||
|
||||
typedef std::tr1::shared_ptr<OpRequest> Ref;
|
||||
};
|
||||
|
||||
typedef OpRequest::Ref OpRequestRef;
|
||||
|
||||
#endif /* OPREQUEST_H_ */
|
||||
|
@ -1332,10 +1332,10 @@ void PG::do_pending_flush()
|
||||
bool PG::op_has_sufficient_caps(OpRequestRef op)
|
||||
{
|
||||
// only check MOSDOp
|
||||
if (op->request->get_type() != CEPH_MSG_OSD_OP)
|
||||
if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
|
||||
return true;
|
||||
|
||||
MOSDOp *req = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *req = static_cast<MOSDOp*>(op->get_req());
|
||||
|
||||
OSD::Session *session = (OSD::Session *)req->get_connection()->get_priv();
|
||||
if (!session) {
|
||||
@ -1417,7 +1417,7 @@ void PG::replay_queued_ops()
|
||||
c = p->first;
|
||||
}
|
||||
dout(10) << "activate replay " << p->first << " "
|
||||
<< *p->second->request << dendl;
|
||||
<< *p->second->get_req() << dendl;
|
||||
replay.push_back(p->second);
|
||||
}
|
||||
replay_queue.clear();
|
||||
@ -2618,7 +2618,7 @@ void PG::unreg_next_scrub()
|
||||
|
||||
void PG::sub_op_scrub_map(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(7) << "sub_op_scrub_map" << dendl;
|
||||
|
||||
@ -2804,7 +2804,7 @@ void PG::_request_scrub_map(int replica, eversion_t version,
|
||||
|
||||
void PG::sub_op_scrub_reserve(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(7) << "sub_op_scrub_reserve" << dendl;
|
||||
|
||||
@ -2824,7 +2824,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op)
|
||||
|
||||
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
|
||||
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
|
||||
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
|
||||
dout(7) << "sub_op_scrub_reserve_reply" << dendl;
|
||||
|
||||
@ -2857,7 +2857,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
|
||||
|
||||
void PG::sub_op_scrub_unreserve(OpRequestRef op)
|
||||
{
|
||||
assert(op->request->get_header().type == MSG_OSD_SUBOP);
|
||||
assert(op->get_req()->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(7) << "sub_op_scrub_unreserve" << dendl;
|
||||
|
||||
op->mark_started();
|
||||
@ -2869,7 +2869,7 @@ void PG::sub_op_scrub_stop(OpRequestRef op)
|
||||
{
|
||||
op->mark_started();
|
||||
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(7) << "sub_op_scrub_stop" << dendl;
|
||||
|
||||
@ -4732,7 +4732,7 @@ ostream& operator<<(ostream& out, const PG& pg)
|
||||
|
||||
bool PG::can_discard_op(OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
if (OSD::op_is_discardable(m)) {
|
||||
dout(20) << " discard " << *m << dendl;
|
||||
return true;
|
||||
@ -4760,7 +4760,7 @@ bool PG::can_discard_op(OpRequestRef op)
|
||||
template<typename T, int MSGTYPE>
|
||||
bool PG::can_discard_replica_op(OpRequestRef op)
|
||||
{
|
||||
T *m = static_cast<T *>(op->request);
|
||||
T *m = static_cast<T *>(op->get_req());
|
||||
assert(m->get_header().type == MSGTYPE);
|
||||
|
||||
// same pg?
|
||||
@ -4776,7 +4776,7 @@ bool PG::can_discard_replica_op(OpRequestRef op)
|
||||
|
||||
bool PG::can_discard_scan(OpRequestRef op)
|
||||
{
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan *>(op->request);
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_SCAN);
|
||||
|
||||
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
|
||||
@ -4788,7 +4788,7 @@ bool PG::can_discard_scan(OpRequestRef op)
|
||||
|
||||
bool PG::can_discard_backfill(OpRequestRef op)
|
||||
{
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->request);
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
|
||||
|
||||
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
|
||||
@ -4802,7 +4802,7 @@ bool PG::can_discard_backfill(OpRequestRef op)
|
||||
|
||||
bool PG::can_discard_request(OpRequestRef op)
|
||||
{
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
case CEPH_MSG_OSD_OP:
|
||||
return can_discard_op(op);
|
||||
case MSG_OSD_SUBOP:
|
||||
@ -4827,55 +4827,55 @@ bool PG::can_discard_request(OpRequestRef op)
|
||||
bool PG::split_request(OpRequestRef op, unsigned match, unsigned bits)
|
||||
{
|
||||
unsigned mask = ~((~0)<<bits);
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
case CEPH_MSG_OSD_OP:
|
||||
return (static_cast<MOSDOp*>(op->request)->get_pg().m_seed & mask) == match;
|
||||
return (static_cast<MOSDOp*>(op->get_req())->get_pg().m_seed & mask) == match;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op)
|
||||
{
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
case CEPH_MSG_OSD_OP:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDOp*>(op->request)->get_map_epoch());
|
||||
static_cast<MOSDOp*>(op->get_req())->get_map_epoch());
|
||||
|
||||
case MSG_OSD_SUBOP:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDSubOp*>(op->request)->map_epoch);
|
||||
static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_SUBOPREPLY:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDSubOpReply*>(op->request)->map_epoch);
|
||||
static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_PG_SCAN:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDPGScan*>(op->request)->map_epoch);
|
||||
static_cast<MOSDPGScan*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_PG_BACKFILL:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
|
||||
static_cast<MOSDPGBackfill*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_PG_PUSH:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDPGPush*>(op->request)->map_epoch);
|
||||
static_cast<MOSDPGPush*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_PG_PULL:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDPGPull*>(op->request)->map_epoch);
|
||||
static_cast<MOSDPGPull*>(op->get_req())->map_epoch);
|
||||
|
||||
case MSG_OSD_PG_PUSH_REPLY:
|
||||
return !have_same_or_newer_map(
|
||||
curmap,
|
||||
static_cast<MOSDPGPushReply*>(op->request)->map_epoch);
|
||||
static_cast<MOSDPGPushReply*>(op->get_req())->map_epoch);
|
||||
}
|
||||
assert(0);
|
||||
return false;
|
||||
|
@ -96,7 +96,7 @@ bool ReplicatedBackend::handle_message(
|
||||
)
|
||||
{
|
||||
dout(10) << __func__ << ": " << op << dendl;
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
case MSG_OSD_PG_PUSH:
|
||||
// TODOXXX: needs to be active possibly
|
||||
do_push(op);
|
||||
@ -111,7 +111,7 @@ bool ReplicatedBackend::handle_message(
|
||||
return true;
|
||||
|
||||
case MSG_OSD_SUBOP: {
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
if (m->ops.size() >= 1) {
|
||||
OSDOp *first = &m->ops[0];
|
||||
switch (first->op.op) {
|
||||
@ -130,7 +130,7 @@ bool ReplicatedBackend::handle_message(
|
||||
}
|
||||
|
||||
case MSG_OSD_SUBOPREPLY: {
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
|
||||
if (r->ops.size() >= 1) {
|
||||
OSDOp &first = r->ops[0];
|
||||
switch (first.op.op) {
|
||||
|
@ -86,9 +86,9 @@ static void log_subop_stats(
|
||||
{
|
||||
utime_t now = ceph_clock_now(g_ceph_context);
|
||||
utime_t latency = now;
|
||||
latency -= op->request->get_recv_stamp();
|
||||
latency -= op->get_req()->get_recv_stamp();
|
||||
|
||||
uint64_t inb = op->request->get_data().length();
|
||||
uint64_t inb = op->get_req()->get_data().length();
|
||||
|
||||
osd->logger->inc(l_osd_sop);
|
||||
|
||||
@ -583,7 +583,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
|
||||
|
||||
void ReplicatedPG::do_pg_op(OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp *>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp *>(op->get_req());
|
||||
assert(m->get_header().type == CEPH_MSG_OSD_OP);
|
||||
dout(10) << "do_pg_op " << *m << dendl;
|
||||
|
||||
@ -828,7 +828,7 @@ void ReplicatedPG::do_request(
|
||||
if (pgbackend->handle_message(op))
|
||||
return;
|
||||
|
||||
switch (op->request->get_type()) {
|
||||
switch (op->get_req()->get_type()) {
|
||||
case CEPH_MSG_OSD_OP:
|
||||
if (is_replay() || !is_active()) {
|
||||
dout(20) << " replay, waiting for active on " << op << dendl;
|
||||
@ -866,7 +866,7 @@ void ReplicatedPG::do_request(
|
||||
*/
|
||||
void ReplicatedPG::do_op(OpRequestRef op)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
assert(m->get_header().type == CEPH_MSG_OSD_OP);
|
||||
if (op->includes_pg_op()) {
|
||||
if (pg_op_must_wait(m)) {
|
||||
@ -1172,7 +1172,7 @@ bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
|
||||
|
||||
void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
|
||||
{
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
|
||||
MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
|
||||
get_osdmap()->get_epoch(), flags);
|
||||
@ -1188,7 +1188,7 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
|
||||
{
|
||||
dout(10) << __func__ << " " << ctx << dendl;
|
||||
OpRequestRef op = ctx->op;
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
ObjectContextRef obc = ctx->obc;
|
||||
const hobject_t& soid = obc->obs.oi.soid;
|
||||
map<hobject_t,ObjectContextRef>& src_obc = ctx->src_obc;
|
||||
@ -1412,16 +1412,16 @@ void ReplicatedPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
|
||||
void ReplicatedPG::log_op_stats(OpContext *ctx)
|
||||
{
|
||||
OpRequestRef op = ctx->op;
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
|
||||
|
||||
utime_t now = ceph_clock_now(cct);
|
||||
utime_t latency = now;
|
||||
latency -= ctx->op->request->get_recv_stamp();
|
||||
latency -= ctx->op->get_req()->get_recv_stamp();
|
||||
|
||||
utime_t rlatency;
|
||||
if (ctx->readable_stamp != utime_t()) {
|
||||
rlatency = ctx->readable_stamp;
|
||||
rlatency -= ctx->op->request->get_recv_stamp();
|
||||
rlatency -= ctx->op->get_req()->get_recv_stamp();
|
||||
}
|
||||
|
||||
uint64_t inb = ctx->bytes_written;
|
||||
@ -1460,10 +1460,10 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
|
||||
|
||||
void ReplicatedPG::do_sub_op(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(have_same_or_newer_map(m->map_epoch));
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(15) << "do_sub_op " << *op->request << dendl;
|
||||
dout(15) << "do_sub_op " << *op->get_req() << dendl;
|
||||
|
||||
OSDOp *first = NULL;
|
||||
if (m->ops.size() >= 1) {
|
||||
@ -1501,7 +1501,7 @@ void ReplicatedPG::do_sub_op(OpRequestRef op)
|
||||
|
||||
void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->request);
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->get_req());
|
||||
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
|
||||
if (r->ops.size() >= 1) {
|
||||
OSDOp& first = r->ops[0];
|
||||
@ -1519,7 +1519,7 @@ void ReplicatedPG::do_scan(
|
||||
OpRequestRef op,
|
||||
ThreadPool::TPHandle &handle)
|
||||
{
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->request);
|
||||
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_SCAN);
|
||||
dout(10) << "do_scan " << *m << dendl;
|
||||
|
||||
@ -1594,7 +1594,7 @@ void ReplicatedPG::do_scan(
|
||||
|
||||
void ReplicatedBackend::_do_push(OpRequestRef op)
|
||||
{
|
||||
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
|
||||
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_PUSH);
|
||||
int from = m->get_source().num();
|
||||
|
||||
@ -1646,7 +1646,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
|
||||
|
||||
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
|
||||
{
|
||||
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
|
||||
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_PUSH);
|
||||
int from = m->get_source().num();
|
||||
|
||||
@ -1691,7 +1691,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
|
||||
|
||||
void ReplicatedBackend::do_pull(OpRequestRef op)
|
||||
{
|
||||
MOSDPGPull *m = static_cast<MOSDPGPull *>(op->request);
|
||||
MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_PULL);
|
||||
int from = m->get_source().num();
|
||||
|
||||
@ -1707,7 +1707,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op)
|
||||
|
||||
void ReplicatedBackend::do_push_reply(OpRequestRef op)
|
||||
{
|
||||
MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->request);
|
||||
MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_PUSH_REPLY);
|
||||
int from = m->get_source().num();
|
||||
|
||||
@ -1728,7 +1728,7 @@ void ReplicatedBackend::do_push_reply(OpRequestRef op)
|
||||
|
||||
void ReplicatedPG::do_backfill(OpRequestRef op)
|
||||
{
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->request);
|
||||
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
|
||||
dout(10) << "do_backfill " << *m << dendl;
|
||||
|
||||
@ -2392,7 +2392,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
|
||||
ObjectContextRef src_obc;
|
||||
if (ceph_osd_op_type_multi(op.op)) {
|
||||
MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request);
|
||||
MOSDOp *m = static_cast<MOSDOp *>(ctx->op->get_req());
|
||||
object_locator_t src_oloc;
|
||||
get_src_oloc(soid.oid, m->get_object_locator(), src_oloc);
|
||||
hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash,
|
||||
@ -3190,10 +3190,10 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
<< " oi.version=" << oi.version.version << " ctx->at_version=" << ctx->at_version << dendl;
|
||||
dout(10) << "watch: oi.user_version=" << oi.user_version<< dendl;
|
||||
dout(10) << "watch: peer_addr="
|
||||
<< ctx->op->request->get_connection()->get_peer_addr() << dendl;
|
||||
<< ctx->op->get_req()->get_connection()->get_peer_addr() << dendl;
|
||||
|
||||
watch_info_t w(cookie, cct->_conf->osd_client_watch_timeout,
|
||||
ctx->op->request->get_connection()->get_peer_addr());
|
||||
ctx->op->get_req()->get_connection()->get_peer_addr());
|
||||
if (do_watch) {
|
||||
if (oi.watchers.count(make_pair(cookie, entity))) {
|
||||
dout(10) << " found existing watch " << w << " by " << entity << dendl;
|
||||
@ -4038,7 +4038,7 @@ void ReplicatedPG::add_interval_usage(interval_set<uint64_t>& s, object_stat_sum
|
||||
|
||||
void ReplicatedPG::do_osd_op_effects(OpContext *ctx)
|
||||
{
|
||||
ConnectionRef conn(ctx->op->request->get_connection());
|
||||
ConnectionRef conn(ctx->op->get_req()->get_connection());
|
||||
boost::intrusive_ptr<OSD::Session> session(
|
||||
(OSD::Session *)conn->get_priv());
|
||||
session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
|
||||
@ -4698,7 +4698,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
|
||||
{
|
||||
MOSDOp *m = NULL;
|
||||
if (repop->ctx->op)
|
||||
m = static_cast<MOSDOp *>(repop->ctx->op->request);
|
||||
m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
|
||||
|
||||
if (m)
|
||||
dout(10) << "eval_repop " << *repop
|
||||
@ -4774,7 +4774,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
|
||||
for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
|
||||
i != waiting_for_ack[repop->v].end();
|
||||
++i) {
|
||||
MOSDOp *m = (MOSDOp*)(*i)->request;
|
||||
MOSDOp *m = (MOSDOp*)(*i)->get_req();
|
||||
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
|
||||
reply->set_reply_versions(repop->ctx->at_version,
|
||||
repop->ctx->user_at_version);
|
||||
@ -4870,7 +4870,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
|
||||
get_osdmap()->get_epoch(),
|
||||
repop->rep_tid, repop->ctx->at_version);
|
||||
if (ctx->op &&
|
||||
((static_cast<MOSDOp *>(ctx->op->request))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
|
||||
((static_cast<MOSDOp *>(ctx->op->get_req()))->get_flags() & CEPH_OSD_FLAG_PARALLELEXEC)) {
|
||||
// replicate original op for parallel execution on replica
|
||||
assert(0 == "broken implementation, do not use");
|
||||
}
|
||||
@ -4911,7 +4911,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRe
|
||||
tid_t rep_tid)
|
||||
{
|
||||
if (ctx->op)
|
||||
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->request << dendl;
|
||||
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl;
|
||||
else
|
||||
dout(10) << "new_repop rep_tid " << rep_tid << " (no op)" << dendl;
|
||||
|
||||
@ -4942,7 +4942,7 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
|
||||
MOSDOp *m = NULL;
|
||||
|
||||
if (repop->ctx->op)
|
||||
m = static_cast<MOSDOp *>(repop->ctx->op->request);
|
||||
m = static_cast<MOSDOp *>(repop->ctx->op->get_req());
|
||||
|
||||
if (m)
|
||||
dout(7) << "repop_ack rep_tid " << repop->rep_tid << " op " << *m
|
||||
@ -5488,7 +5488,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
|
||||
|
||||
void ReplicatedPG::sub_op_modify(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
|
||||
const hobject_t& soid = m->poid;
|
||||
@ -5607,8 +5607,8 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
|
||||
rm->applied = true;
|
||||
|
||||
if (!pg_has_reset_since(rm->epoch_started)) {
|
||||
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->request);
|
||||
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req() << dendl;
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(rm->op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
|
||||
if (!rm->committed) {
|
||||
@ -5630,7 +5630,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request
|
||||
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->get_req()
|
||||
<< " from epoch " << rm->epoch_started << " < last_peering_reset "
|
||||
<< last_peering_reset << dendl;
|
||||
}
|
||||
@ -5652,19 +5652,19 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
|
||||
|
||||
if (!pg_has_reset_since(rm->epoch_started)) {
|
||||
// send commit.
|
||||
dout(10) << "sub_op_modify_commit on op " << *rm->op->request
|
||||
dout(10) << "sub_op_modify_commit on op " << *rm->op->get_req()
|
||||
<< ", sending commit to osd." << rm->ackerosd
|
||||
<< dendl;
|
||||
|
||||
if (get_osdmap()->is_up(rm->ackerosd)) {
|
||||
last_complete_ondisk = rm->last_complete;
|
||||
MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->request), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
|
||||
MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
|
||||
commit->set_last_complete_ondisk(rm->last_complete);
|
||||
commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
|
||||
osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
|
||||
}
|
||||
} else {
|
||||
dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->request
|
||||
dout(10) << "sub_op_modify_commit " << rm << " op " << *rm->op->get_req()
|
||||
<< " from epoch " << rm->epoch_started << " < last_peering_reset "
|
||||
<< last_peering_reset << dendl;
|
||||
}
|
||||
@ -5681,7 +5681,7 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
|
||||
|
||||
void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->request);
|
||||
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
|
||||
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
|
||||
|
||||
op->mark_started();
|
||||
@ -6631,7 +6631,7 @@ void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
|
||||
|
||||
void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->request);
|
||||
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
|
||||
const hobject_t& soid = reply->get_poid();
|
||||
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
|
||||
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
|
||||
@ -6644,7 +6644,7 @@ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
|
||||
PushOp pop;
|
||||
bool more = handle_push_reply(peer, rop, &pop);
|
||||
if (more)
|
||||
send_push_op_legacy(op->request->get_priority(), peer, pop);
|
||||
send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
|
||||
}
|
||||
|
||||
bool ReplicatedBackend::handle_push_reply(int peer, PushReplyOp &op, PushOp *reply)
|
||||
@ -6725,7 +6725,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
|
||||
*/
|
||||
void ReplicatedBackend::sub_op_pull(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
|
||||
op->mark_started();
|
||||
@ -6918,7 +6918,7 @@ void ReplicatedBackend::trim_pushed_data(
|
||||
void ReplicatedBackend::sub_op_push(OpRequestRef op)
|
||||
{
|
||||
op->mark_started();
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
|
||||
|
||||
PushOp pop;
|
||||
pop.soid = m->recovery_info.soid;
|
||||
@ -6950,14 +6950,14 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
|
||||
C_ReplicatedBackend_OnPullComplete *c =
|
||||
new C_ReplicatedBackend_OnPullComplete(
|
||||
this,
|
||||
op->request->get_priority());
|
||||
op->get_req()->get_priority());
|
||||
c->to_continue.swap(to_continue);
|
||||
t->register_on_complete(
|
||||
new C_QueueInWQ(
|
||||
&osd->push_wq,
|
||||
get_parent()->bless_gencontext(c)));
|
||||
}
|
||||
run_recovery_op(h, op->request->get_priority());
|
||||
run_recovery_op(h, op->get_req()->get_priority());
|
||||
} else {
|
||||
PushReplyOp resp;
|
||||
MOSDSubOpReply *reply = new MOSDSubOpReply(
|
||||
@ -7002,7 +7002,7 @@ void ReplicatedBackend::_failed_push(int from, const hobject_t &soid)
|
||||
|
||||
void ReplicatedPG::sub_op_remove(OpRequestRef op)
|
||||
{
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->request);
|
||||
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
|
||||
assert(m->get_header().type == MSG_OSD_SUBOP);
|
||||
dout(7) << "sub_op_remove " << m->poid << dendl;
|
||||
|
||||
@ -7225,7 +7225,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
|
||||
|
||||
if (requeue) {
|
||||
if (repop->ctx->op) {
|
||||
dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
|
||||
dout(10) << " requeuing " << *repop->ctx->op->get_req() << dendl;
|
||||
rq.push_back(repop->ctx->op);
|
||||
repop->ctx->op = OpRequestRef();
|
||||
}
|
||||
|
@ -993,7 +993,7 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
|
||||
//<< " wfnvram=" << repop.waitfor_nvram
|
||||
<< " wfdisk=" << repop.waitfor_disk;
|
||||
if (repop.ctx->op)
|
||||
out << " op=" << *(repop.ctx->op->request);
|
||||
out << " op=" << *(repop.ctx->op->get_req());
|
||||
out << ")";
|
||||
return out;
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "include/types.h"
|
||||
#include "include/utime.h"
|
||||
#include "include/CompatSet.h"
|
||||
#include "include/histogram.h"
|
||||
#include "include/interval_set.h"
|
||||
#include "common/snap_types.h"
|
||||
#include "common/Formatter.h"
|
||||
@ -555,67 +556,6 @@ inline ostream& operator<<(ostream& out, const eversion_t e) {
|
||||
return out << e.epoch << "'" << e.version;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* power of 2 histogram
|
||||
*/
|
||||
struct pow2_hist_t {
|
||||
/**
|
||||
* histogram
|
||||
*
|
||||
* bin size is 2^index
|
||||
* value is count of elements that are <= the current bin but > the previous bin.
|
||||
*/
|
||||
vector<int32_t> h;
|
||||
|
||||
private:
|
||||
/// expand to at least another's size
|
||||
void _expand_to(unsigned s) {
|
||||
if (s > h.size())
|
||||
h.resize(s, 0);
|
||||
}
|
||||
/// drop useless trailing 0's
|
||||
void _contract() {
|
||||
unsigned p = h.size();
|
||||
while (p > 0 && h[p-1] == 0)
|
||||
--p;
|
||||
h.resize(p);
|
||||
}
|
||||
|
||||
public:
|
||||
void clear() {
|
||||
h.clear();
|
||||
}
|
||||
void set(int bin, int32_t v) {
|
||||
_expand_to(bin + 1);
|
||||
h[bin] = v;
|
||||
_contract();
|
||||
}
|
||||
|
||||
void add(const pow2_hist_t& o) {
|
||||
_expand_to(o.h.size());
|
||||
for (unsigned p = 0; p < o.h.size(); ++p)
|
||||
h[p] += o.h[p];
|
||||
_contract();
|
||||
}
|
||||
void sub(const pow2_hist_t& o) {
|
||||
_expand_to(o.h.size());
|
||||
for (unsigned p = 0; p < o.h.size(); ++p)
|
||||
h[p] -= o.h[p];
|
||||
_contract();
|
||||
}
|
||||
|
||||
int32_t upper_bound() const {
|
||||
return 1 << h.size();
|
||||
}
|
||||
|
||||
void dump(Formatter *f) const;
|
||||
void encode(bufferlist &bl) const;
|
||||
void decode(bufferlist::iterator &bl);
|
||||
static void generate_test_instances(std::list<pow2_hist_t*>& o);
|
||||
};
|
||||
WRITE_CLASS_ENCODER(pow2_hist_t)
|
||||
|
||||
/**
|
||||
* filestore_perf_stat_t
|
||||
*
|
||||
|
@ -36,13 +36,15 @@ TYPEWITHSTRAYDATA(OSDMap::Incremental)
|
||||
#include "crush/CrushWrapper.h"
|
||||
TYPE(CrushWrapper)
|
||||
|
||||
#include "include/histogram.h"
|
||||
TYPE(pow2_hist_t)
|
||||
|
||||
#include "osd/osd_types.h"
|
||||
TYPE(osd_reqid_t)
|
||||
TYPE(object_locator_t)
|
||||
TYPE(request_redirect_t)
|
||||
TYPE(pg_t)
|
||||
TYPE(coll_t)
|
||||
TYPE(pow2_hist_t)
|
||||
TYPE(filestore_perf_stat_t)
|
||||
TYPE(osd_stat_t)
|
||||
TYPE(OSDSuperblock)
|
||||
|
Loading…
Reference in New Issue
Block a user