
Merge remote branch 'upstream/wip_latency'

Samuel Just 2012-03-29 13:15:39 -07:00
commit 41a09bea47
19 changed files with 445 additions and 356 deletions

View File

@ -1071,7 +1071,8 @@ libosd_la_SOURCES = \
osd/OSD.cc \
osd/OSDCaps.cc \
osd/Watch.cc \
osd/ClassHandler.cc
osd/ClassHandler.cc \
osd/OpRequest.cc
libosd_la_CXXFLAGS= ${CRYPTO_CXXFLAGS} ${AM_CXXFLAGS}
libosd_la_LIBADD = libglobal.la
noinst_LTLIBRARIES += libosd.la

src/common/TrackedOp.h (new file, 31 lines)
View File

@ -0,0 +1,31 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2012 New Dream Network/Sage Weil <sage@newdream.net>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*/
#ifndef TRACKEDREQUEST_H_
#define TRACKEDREQUEST_H_
#include <sstream>
#include <stdint.h>
#include <include/utime.h>
#include "common/Mutex.h"
#include "include/xlist.h"
#include "msg/Message.h"
#include <tr1/memory>
class TrackedOp {
public:
virtual void mark_event(const string &event) = 0;
virtual ~TrackedOp() {}
};
typedef std::tr1::shared_ptr<TrackedOp> TrackedOpRef;
#endif
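
TrackedOp.h is deliberately tiny: anything whose progress should be traced only has to implement mark_event(), and callers hold it through the reference-counted TrackedOpRef, so the journal and object store never need to know the concrete type. A hedged, self-contained sketch of how the interface is meant to be consumed (LoggedOp and main are illustrative only; the commit's real implementation is OpRequest, further down):

// Illustrative only: a throwaway TrackedOp that just records and prints
// its events. The TrackedOp declaration is repeated here so the sketch
// compiles on its own, and std::shared_ptr stands in for the tree's
// std::tr1::shared_ptr.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

class TrackedOp {
public:
  virtual void mark_event(const std::string &event) = 0;
  virtual ~TrackedOp() {}
};
typedef std::shared_ptr<TrackedOp> TrackedOpRef;

class LoggedOp : public TrackedOp {          // hypothetical subclass
  std::vector<std::string> events;
public:
  void mark_event(const std::string &event) {
    events.push_back(event);                 // keep the milestone trail
    std::cout << "event: " << event << std::endl;
  }
};

int main() {
  TrackedOpRef op(new LoggedOp);
  op->mark_event("commit_queued_for_journal_write");
  op->mark_event("journaled_completion_queued");
  return 0;
}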

View File

@ -57,6 +57,7 @@ OPTION(debug_journaler, OPT_INT, 0)
OPTION(debug_objectcacher, OPT_INT, 0)
OPTION(debug_client, OPT_INT, 0)
OPTION(debug_osd, OPT_INT, 0)
OPTION(debug_optracker, OPT_INT, 10)
OPTION(debug_objclass, OPT_INT, 0)
OPTION(debug_filestore, OPT_INT, 1)
OPTION(debug_journal, OPT_INT, 1)
@ -355,6 +356,7 @@ OPTION(journal_queue_max_ops, OPT_INT, 500)
OPTION(journal_queue_max_bytes, OPT_INT, 100 << 20)
OPTION(journal_align_min_size, OPT_INT, 64 << 10) // align data payloads >= this.
OPTION(journal_replay_from, OPT_INT, 0)
OPTION(journal_zero_on_create, OPT_BOOL, false)
OPTION(bdev_lock, OPT_BOOL, true)
OPTION(bdev_iothreads, OPT_INT, 1) // number of ios to queue with kernel
OPTION(bdev_idle_kick_after_ms, OPT_INT, 100) // ms

View File

@ -58,7 +58,7 @@ int FileJournal::_open(bool forwrite, bool create)
if (forwrite) {
flags = O_RDWR;
if (directio)
flags |= O_DIRECT | O_SYNC;
flags |= O_DIRECT | O_DSYNC;
} else {
flags = O_RDONLY;
}
@ -334,6 +334,20 @@ int FileJournal::_open_file(int64_t oldsize, blksize_t blksize,
}
block_size = MAX(blksize, (blksize_t)CEPH_PAGE_SIZE);
if (create && g_conf->journal_zero_on_create) {
derr << "FileJournal::_open_file : zeroing journal" << dendl;
uint64_t write_size = 1 << 20;
char *buf = new char[write_size];
memset(static_cast<void*>(buf), 0, write_size);
uint64_t i = 0;
for (; (i + write_size) <= (unsigned)max_size; i += write_size)
::pwrite(fd, static_cast<void*>(buf), write_size, i);
if (i < (unsigned)max_size)
::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
delete [] buf;
}
dout(10) << "_open journal is not a block device, NOT checking disk "
<< "write cache on '" << fn << "'" << dendl;
@ -798,6 +812,10 @@ void FileJournal::queue_completions_thru(uint64_t seq)
}
if (completions.front().finish)
finisher->queue(completions.front().finish);
assert(pending_ops.count(completions.front().seq));
if (pending_ops[completions.front().seq])
pending_ops[completions.front().seq]->mark_event("journaled_completion_queued");
pending_ops.erase(completions.front().seq);
completions.pop_front();
}
}
@ -864,6 +882,9 @@ int FileJournal::prepare_single_write(bufferlist& bl, off64_t& queue_pos, uint64
if (queue_pos > header.max_size)
queue_pos = queue_pos + get_top() - header.max_size;
assert(pending_ops.count(seq));
if (pending_ops[seq])
pending_ops[seq]->mark_event("write_thread_in_journal_buffer");
return 0;
}
@ -1336,7 +1357,8 @@ void FileJournal::check_aio_completion()
}
#endif
void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Context *oncommit)
void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment,
Context *oncommit, TrackedOpRef osd_op)
{
Mutex::Locker locker(write_lock); // ** lock **
@ -1347,8 +1369,11 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte
assert(e.length() > 0);
completions.push_back(completion_item(seq, oncommit, ceph_clock_now(g_ceph_context)));
pending_ops[seq] = osd_op;
if (full_state == FULL_NOTFULL) {
if (osd_op)
osd_op->mark_event("commit_queued_for_journal_write");
// queue and kick writer thread
dout(30) << "XXX throttle take " << e.length() << dendl;
throttle_ops.take(1);
@ -1364,6 +1389,8 @@ void FileJournal::submit_entry(uint64_t seq, bufferlist& e, int alignment, Conte
writeq.push_back(write_item(seq, e, alignment));
write_cond.Signal();
} else {
if (osd_op)
osd_op->mark_event("commit_blocked_by_journal_full");
// not journaling this. restart writing no sooner than seq + 1.
dout(10) << " journal is/was full" << dendl;
}
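
The FileJournal changes follow one pattern: submit_entry() parks the op in pending_ops keyed by journal sequence number and stamps "commit_queued_for_journal_write", the write thread stamps "write_thread_in_journal_buffer" in prepare_single_write(), and queue_completions_thru() stamps "journaled_completion_queued" and drops the reference. A self-contained sketch of that bookkeeping, with hypothetical class names but the event strings taken from this diff:

// Sketch of the pending_ops map added to FileJournal: ops are parked by
// journal sequence number and get events stamped as the entry moves
// through the journal. MiniJournal and PrintOp are stand-ins.
#include <cassert>
#include <iostream>
#include <map>
#include <memory>
#include <string>

struct TrackedOp {
  virtual void mark_event(const std::string &e) = 0;
  virtual ~TrackedOp() {}
};
typedef std::shared_ptr<TrackedOp> TrackedOpRef;

struct PrintOp : TrackedOp {                       // hypothetical
  void mark_event(const std::string &e) { std::cout << e << std::endl; }
};

class MiniJournal {                                // hypothetical
  std::map<uint64_t, TrackedOpRef> pending_ops;
public:
  void submit_entry(uint64_t seq, TrackedOpRef op) {
    pending_ops[seq] = op;                         // may be a null ref
    if (op)
      op->mark_event("commit_queued_for_journal_write");
  }
  void complete_thru(uint64_t seq) {
    assert(pending_ops.count(seq));
    if (pending_ops[seq])
      pending_ops[seq]->mark_event("journaled_completion_queued");
    pending_ops.erase(seq);                        // drop our reference
  }
};

int main() {
  MiniJournal j;
  j.submit_entry(1, TrackedOpRef(new PrintOp));
  j.complete_thru(1);
  return 0;
}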

View File

@ -185,6 +185,7 @@ private:
: seq(o), finish(c), start(s) {}
};
deque<completion_item> completions; // queued, writing, waiting for commit.
map<uint64_t, TrackedOpRef> pending_ops;
uint64_t writing_seq, journaled_seq;
bool plug_journal_completions;
@ -306,7 +307,8 @@ private:
void make_writeable();
// writes
void submit_entry(uint64_t seq, bufferlist& bl, int alignment, Context *oncommit); // submit an item
void submit_entry(uint64_t seq, bufferlist& bl, int alignment, Context *oncommit,
TrackedOpRef osd_op = TrackedOpRef()); // submit an item
void commit_start();
void committed_thru(uint64_t seq);
bool should_commit_now() {

View File

@ -2017,7 +2017,9 @@ int FileStore::get_max_object_name_length()
/// -----------------------------
FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
Context *onreadable, Context *onreadable_sync)
Context *onreadable,
Context *onreadable_sync,
TrackedOpRef osd_op)
{
uint64_t bytes = 0, ops = 0;
for (list<Transaction*>::iterator p = tls.begin();
@ -2034,6 +2036,7 @@ FileStore::Op *FileStore::build_op(list<Transaction*>& tls,
o->onreadable_sync = onreadable_sync;
o->ops = ops;
o->bytes = bytes;
o->osd_op = osd_op;
return o;
}
@ -2170,7 +2173,8 @@ int FileStore::queue_transaction(Sequencer *osr, Transaction *t)
int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
Context *onreadable, Context *ondisk,
Context *onreadable_sync)
Context *onreadable_sync,
TrackedOpRef osd_op)
{
if (g_conf->filestore_blackhole) {
dout(0) << "queue_transactions filestore_blackhole = TRUE, dropping transaction" << dendl;
@ -2192,14 +2196,14 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
}
if (journal && journal->is_writeable() && !m_filestore_journal_trailing) {
Op *o = build_op(tls, onreadable, onreadable_sync);
Op *o = build_op(tls, onreadable, onreadable_sync, osd_op);
op_queue_reserve_throttle(o);
journal->throttle();
o->op = op_submit_start();
if (m_filestore_journal_parallel) {
dout(5) << "queue_transactions (parallel) " << o->op << " " << o->tls << dendl;
_op_journal_transactions(o->tls, o->op, ondisk);
_op_journal_transactions(o->tls, o->op, ondisk, osd_op);
// queue inside journal lock, to preserve ordering
queue_op(osr, o);
@ -2208,7 +2212,9 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
osr->queue_journal(o->op);
_op_journal_transactions(o->tls, o->op, new C_JournaledAhead(this, osr, o, ondisk));
_op_journal_transactions(o->tls, o->op,
new C_JournaledAhead(this, osr, o, ondisk),
osd_op);
} else {
assert(0);
}
@ -2223,7 +2229,7 @@ int FileStore::queue_transactions(Sequencer *posr, list<Transaction*> &tls,
int r = do_transactions(tls, op);
if (r >= 0) {
_op_journal_transactions(tls, op, ondisk);
_op_journal_transactions(tls, op, ondisk, osd_op);
} else {
delete ondisk;
}

View File

@ -116,6 +116,7 @@ class FileStore : public JournalingObjectStore,
list<Transaction*> tls;
Context *onreadable, *onreadable_sync;
uint64_t ops, bytes;
TrackedOpRef osd_op;
};
class OpSequencer : public Sequencer_impl {
Mutex qlock; // to protect q, for benefit of flush (peek/dequeue also protected by lock)
@ -222,7 +223,8 @@ class FileStore : public JournalingObjectStore,
void _do_op(OpSequencer *o);
void _finish_op(OpSequencer *o);
Op *build_op(list<Transaction*>& tls,
Context *onreadable, Context *onreadable_sync);
Context *onreadable, Context *onreadable_sync,
TrackedOpRef osd_op);
void queue_op(OpSequencer *osr, Op *o);
void op_queue_reserve_throttle(Op *o);
void _op_queue_reserve_throttle(Op *o, const char *caller = 0);
@ -297,8 +299,10 @@ public:
unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num);
int queue_transaction(Sequencer *osr, Transaction* t);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0);
int queue_transactions(Sequencer *osr, list<Transaction*>& tls,
Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0,
TrackedOpRef op = TrackedOpRef());
/**
* set replay guard xattr on given file

View File

@ -21,6 +21,7 @@
#include "include/buffer.h"
#include "include/Context.h"
#include "common/Finisher.h"
#include "common/TrackedOp.h"
class PerfCounters;
@ -55,7 +56,9 @@ public:
// writes
virtual bool is_writeable() = 0;
virtual void make_writeable() = 0;
virtual void submit_entry(uint64_t seq, bufferlist& e, int alignment, Context *oncommit) = 0;
virtual void submit_entry(uint64_t seq, bufferlist& e, int alignment,
Context *oncommit,
TrackedOpRef osd_op = TrackedOpRef()) = 0;
virtual void commit_start() = 0;
virtual void committed_thru(uint64_t seq) = 0;
virtual bool read_entry(bufferlist& bl, uint64_t &seq) = 0;

View File

@ -280,15 +280,8 @@ void JournalingObjectStore::commit_finish()
}
}
void JournalingObjectStore::op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal)
{
Mutex::Locker l(journal_lock);
_op_journal_transactions(tls, op, onjournal);
}
void JournalingObjectStore::_op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal)
Context *onjournal, TrackedOpRef osd_op)
{
assert(journal_lock.is_locked());
dout(10) << "op_journal_transactions " << op << " " << tls << dendl;
@ -305,7 +298,7 @@ void JournalingObjectStore::_op_journal_transactions(list<ObjectStore::Transacti
}
::encode(*t, tbl);
}
journal->submit_entry(op, tbl, data_align, onjournal);
journal->submit_entry(op, tbl, data_align, onjournal, osd_op);
} else if (onjournal)
commit_waiters[op].push_back(onjournal);
}

View File

@ -56,8 +56,8 @@ protected:
uint64_t _op_apply_start(uint64_t op);
void op_apply_finish(uint64_t op);
void op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op, Context *onjournal);
void _op_journal_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op,
Context *onjournal, TrackedOpRef osd_op);
virtual int do_transactions(list<ObjectStore::Transaction*>& tls, uint64_t op_seq) = 0;

View File

@ -20,6 +20,7 @@
#include "include/buffer.h"
#include "include/types.h"
#include "osd/osd_types.h"
#include "common/TrackedOp.h"
#include "ObjectMap.h"
#include <errno.h>
@ -579,15 +580,15 @@ public:
virtual int queue_transaction(Sequencer *osr, Transaction* t) = 0;
virtual int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0) {
Context *onreadable_sync=0,
TrackedOpRef op = TrackedOpRef()) {
list<Transaction*> tls;
tls.push_back(t);
return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync);
return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op);
}
virtual int queue_transactions(Sequencer *osr, list<Transaction*>& tls, Context *onreadable, Context *ondisk=0,
Context *onreadable_sync=0) = 0;
Context *onreadable_sync=0,
TrackedOpRef op = TrackedOpRef()) = 0;
public:
ObjectStore() : logger(NULL) {}
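
Because the new parameter defaults to an empty TrackedOpRef, every existing queue_transaction()/queue_transactions() caller keeps compiling unchanged and simply forgoes tracking; only the client I/O path, which actually holds an OpRequest, needs to pass one. A minimal sketch of that default-argument pattern (MiniStore, EchoOp, and the event string are hypothetical):

// Illustrative default-argument pattern: old call sites omit the op,
// new call sites thread it through so events can be stamped.
#include <iostream>
#include <memory>
#include <string>

struct TrackedOp {
  virtual void mark_event(const std::string &e) = 0;
  virtual ~TrackedOp() {}
};
typedef std::shared_ptr<TrackedOp> TrackedOpRef;

struct EchoOp : TrackedOp {                          // hypothetical
  void mark_event(const std::string &e) { std::cout << e << std::endl; }
};

struct MiniStore {                                   // hypothetical stand-in
  int queue_transaction(const std::string &t,
                        TrackedOpRef op = TrackedOpRef()) {
    if (op)
      op->mark_event("queued_by_store");             // illustrative event name
    std::cout << "queued " << t << std::endl;
    return 0;
  }
};

int main() {
  MiniStore store;
  store.queue_transaction("untracked write");        // old call site: no op
  store.queue_transaction("client write",
                          TrackedOpRef(new EchoOp)); // new call site: tracked
  return 0;
}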

View File

@ -553,7 +553,6 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger,
heartbeat_dispatcher(this),
stat_lock("OSD::stat_lock"),
finished_lock("OSD::finished_lock"),
ops_in_flight_lock("OSD::ops_in_flight_lock"),
admin_ops_hook(NULL),
op_queue_len(0),
op_wq(this, g_conf->osd_op_thread_timeout, &op_tp),
@ -1872,77 +1871,15 @@ void OSD::tick()
void OSD::check_ops_in_flight()
{
ops_in_flight_lock.Lock();
if (ops_in_flight.size()) {
utime_t now = ceph_clock_now(g_ceph_context);
utime_t too_old = now;
too_old -= g_conf->osd_op_complaint_time;
dout(1) << "ops_in_flight.size: " << ops_in_flight.size()
<< "; oldest is " << now - ops_in_flight.front()->received_time
<< " seconds old" << dendl;
xlist<OpRequest*>::iterator i = ops_in_flight.begin();
while (!i.end() && (*i)->received_time < too_old) {
// exponential backoff of warning intervals
if ( ( (*i)->received_time +
(g_conf->osd_op_complaint_time *
(*i)->warn_interval_multiplier) )< now) {
stringstream ss;
ss << "old request " << *((*i)->request) << " received at "
<< (*i)->received_time << " currently " << (*i)->state_string();
clog.warn(ss);
(*i)->warn_interval_multiplier *= 2;
}
++i;
}
}
ops_in_flight_lock.Unlock();
stringstream ss;
if (op_tracker.check_ops_in_flight(ss))
clog.warn(ss);
return;
}
void OSD::dump_ops_in_flight(ostream& ss)
{
JSONFormatter jf(true);
Mutex::Locker locker(ops_in_flight_lock);
jf.open_object_section("ops_in_flight"); // overall dump
jf.dump_int("num_ops", ops_in_flight.size());
jf.open_array_section("ops"); // list of OpRequests
utime_t now = ceph_clock_now(g_ceph_context);
for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
stringstream name;
Message *m = (*p)->request;
m->print(name);
jf.open_object_section("op");
jf.dump_string("description", name.str().c_str()); // this OpRequest
jf.dump_stream("received_at") << (*p)->received_time;
jf.dump_float("age", now - (*p)->received_time);
jf.dump_string("flag_point", (*p)->state_string());
if (m->get_orig_source().is_client()) {
jf.open_object_section("client_info");
stringstream client_name;
client_name << m->get_orig_source();
jf.dump_string("client", client_name.str());
jf.dump_int("tid", m->get_tid());
jf.close_section(); // client_info
}
jf.close_section(); // this OpRequest
}
jf.close_section(); // list of OpRequests
jf.close_section(); // overall dump
jf.flush(ss);
}
void OSD::register_inflight_op(xlist<OpRequest*>::item *i)
{
ops_in_flight_lock.Lock();
ops_in_flight.push_back(i);
ops_in_flight_lock.Unlock();
}
void OSD::unregister_inflight_op(xlist<OpRequest*>::item *i)
{
ops_in_flight_lock.Lock();
assert(i->get_list() == &ops_in_flight);
i->remove_myself();
ops_in_flight_lock.Unlock();
op_tracker.dump_ops_in_flight(ss);
}
// =========================================
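
The complaint logic moves from OSD::check_ops_in_flight() into OpTracker::check_ops_in_flight() (src/osd/OpRequest.cc, below) but keeps the same exponential backoff: an op is first warned about once it is older than osd_op_complaint_time, and each warning doubles warn_interval_multiplier, so later warnings fire at 2x, 4x, 8x the complaint time. A small worked sketch of that arithmetic, with illustrative values rather than the real tick interval or config default:

// Worked sketch of the complaint backoff: warn when age exceeds
// complaint_time * warn_interval_multiplier, then double the multiplier.
#include <iostream>

int main() {
  const double complaint_time = 30.0;      // seconds, example value
  double warn_interval_multiplier = 1.0;
  double age = 0.0;
  for (int tick = 0; tick < 10; ++tick) {  // pretend 30-second check ticks
    age += 30.0;
    if (age > complaint_time * warn_interval_multiplier) {
      std::cout << "warn: op is " << age << "s old" << std::endl;
      warn_interval_multiplier *= 2;       // exponential backoff
    }
  }
  return 0;
}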
@ -2828,13 +2765,13 @@ void OSD::do_waiters()
if (finished.empty()) {
finished_lock.Unlock();
} else {
list<OpRequest*> waiting;
list<OpRequestRef> waiting;
waiting.splice(waiting.begin(), finished);
finished_lock.Unlock();
dout(2) << "do_waiters -- start" << dendl;
for (list<OpRequest*>::iterator it = waiting.begin();
for (list<OpRequestRef>::iterator it = waiting.begin();
it != waiting.end();
it++)
dispatch_op(*it);
@ -2842,7 +2779,7 @@ void OSD::do_waiters()
}
}
void OSD::dispatch_op(OpRequest *op)
void OSD::dispatch_op(OpRequestRef op)
{
switch (op->request->get_type()) {
@ -2960,8 +2897,8 @@ void OSD::_dispatch(Message *m)
default:
{
OpRequest *op = new OpRequest(m, this);
register_inflight_op(&op->xitem);
OpRequestRef op = op_tracker.create_request(m);
op->mark_event("waiting_for_osdmap");
// no map? starting up?
if (!osdmap) {
dout(7) << "no OSDMap, not booted" << dendl;
@ -3152,7 +3089,7 @@ void OSD::dec_scrubs_active()
// =====================================================
// MAP
void OSD::wait_for_new_map(OpRequest *op)
void OSD::wait_for_new_map(OpRequestRef op)
{
// ask?
if (waiting_for_osdmap.empty()) {
@ -3274,7 +3211,7 @@ void OSD::handle_osd_map(MOSDMap *m)
op_wq.lock();
list<OpRequest*> rq;
list<OpRequestRef> rq;
while (true) {
PG *pg = op_wq._dequeue();
if (!pg)
@ -3289,7 +3226,7 @@ void OSD::handle_osd_map(MOSDMap *m)
// thread did something very strange :/
assert(!pg->op_queue.empty());
OpRequest *op = pg->op_queue.front();
OpRequestRef op = pg->op_queue.front();
pg->op_queue.pop_front();
pg->unlock();
pg->put();
@ -3702,7 +3639,7 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
}
// scan pgs with waiters
map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
while (p != waiting_for_pg.end()) {
pg_t pgid = p->first;
@ -3715,7 +3652,6 @@ void OSD::advance_map(ObjectStore::Transaction& t, C_Contexts *tfin)
} else {
dout(10) << " discarding waiting ops for " << pgid << dendl;
while (!p->second.empty()) {
p->second.front()->put();
p->second.pop_front();
}
waiting_for_pg.erase(p++);
@ -3980,12 +3916,11 @@ bool OSD::require_mon_peer(Message *m)
return true;
}
bool OSD::require_osd_peer(OpRequest *op)
bool OSD::require_osd_peer(OpRequestRef op)
{
if (!op->request->get_connection()->peer_is_osd()) {
dout(0) << "require_osd_peer received from non-osd " << op->request->get_connection()->get_peer_addr()
<< " " << *op->request << dendl;
op->put();
return false;
}
return true;
@ -3995,7 +3930,7 @@ bool OSD::require_osd_peer(OpRequest *op)
* require that we have same (or newer) map, and that
* the source is the pg primary.
*/
bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch)
{
Message *m = op->request;
dout(15) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ") " << m << dendl;
@ -4009,7 +3944,6 @@ bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
if (epoch < up_epoch) {
dout(7) << "from pre-up epoch " << epoch << " < " << up_epoch << dendl;
op->put();
return false;
}
@ -4026,7 +3960,6 @@ bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
cluster_messenger->mark_down_on_empty(con);
cluster_messenger->mark_disposable(con);
op->put();
return false;
}
}
@ -4034,7 +3967,6 @@ bool OSD::require_same_or_newer_map(OpRequest *op, epoch_t epoch)
// ok, we have at least as new a map as they do. are we (re)booting?
if (!is_active()) {
dout(7) << "still in boot state, dropping message " << *m << dendl;
op->put();
return false;
}
@ -4225,7 +4157,7 @@ void OSD::split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction
/*
* holding osd_lock
*/
void OSD::handle_pg_create(OpRequest *op)
void OSD::handle_pg_create(OpRequestRef op)
{
MOSDPGCreate *m = (MOSDPGCreate*)op->request;
assert(m->get_header().type == MSG_OSD_PG_CREATE);
@ -4235,7 +4167,6 @@ void OSD::handle_pg_create(OpRequest *op)
if (!require_mon_peer(op->request)) {
// we have to hack around require_mon_peer's interface limits
op->request = NULL;
op->put();
return;
}
@ -4342,8 +4273,6 @@ void OSD::handle_pg_create(OpRequest *op)
do_infos(info_map);
maybe_update_heartbeat_peers();
op->put();
}
@ -4415,7 +4344,7 @@ void OSD::do_infos(map<int,MOSDPGInfo*>& info_map)
* includes pg_info_t.
* NOTE: called with opqueue active.
*/
void OSD::handle_pg_notify(OpRequest *op)
void OSD::handle_pg_notify(OpRequestRef op)
{
MOSDPGNotify *m = (MOSDPGNotify*)op->request;
assert(m->get_header().type == MSG_OSD_PG_NOTIFY);
@ -4466,11 +4395,9 @@ void OSD::handle_pg_notify(OpRequest *op)
do_infos(info_map);
maybe_update_heartbeat_peers();
op->put();
}
void OSD::handle_pg_log(OpRequest *op)
void OSD::handle_pg_log(OpRequestRef op)
{
MOSDPGLog *m = (MOSDPGLog*) op->request;
assert(m->get_header().type == MSG_OSD_PG_LOG);
@ -4488,7 +4415,6 @@ void OSD::handle_pg_log(OpRequest *op)
PG *pg = get_or_create_pg(m->info, m->get_epoch(),
from, created, false, &t, &fin);
if (!pg) {
op->put();
return;
}
@ -4514,11 +4440,9 @@ void OSD::handle_pg_log(OpRequest *op)
assert(!tr);
maybe_update_heartbeat_peers();
op->put();
}
void OSD::handle_pg_info(OpRequest *op)
void OSD::handle_pg_info(OpRequestRef op)
{
MOSDPGInfo *m = (MOSDPGInfo *)op->request;
assert(m->get_header().type == MSG_OSD_PG_INFO);
@ -4567,11 +4491,9 @@ void OSD::handle_pg_info(OpRequest *op)
do_infos(info_map);
maybe_update_heartbeat_peers();
op->put();
}
void OSD::handle_pg_trim(OpRequest *op)
void OSD::handle_pg_trim(OpRequestRef op)
{
MOSDPGTrim *m = (MOSDPGTrim *)op->request;
assert(m->get_header().type == MSG_OSD_PG_TRIM);
@ -4593,7 +4515,7 @@ void OSD::handle_pg_trim(OpRequest *op)
if (m->epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old trim to " << m->trim_to << ", ignoring" << dendl;
pg->unlock();
goto out;
return;
}
assert(pg);
@ -4615,12 +4537,9 @@ void OSD::handle_pg_trim(OpRequest *op)
}
pg->unlock();
}
out:
op->put();
}
void OSD::handle_pg_scan(OpRequest *op)
void OSD::handle_pg_scan(OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan*)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
@ -4634,7 +4553,6 @@ void OSD::handle_pg_scan(OpRequest *op)
PG *pg;
if (!_have_pg(m->pgid)) {
op->put();
return;
}
@ -4647,7 +4565,7 @@ void OSD::handle_pg_scan(OpRequest *op)
pg->put();
}
bool OSD::scan_is_queueable(PG *pg, OpRequest *op)
bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan *)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
@ -4655,14 +4573,13 @@ bool OSD::scan_is_queueable(PG *pg, OpRequest *op)
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old scan, ignoring" << dendl;
op->put();
return false;
}
return true;
}
void OSD::handle_pg_backfill(OpRequest *op)
void OSD::handle_pg_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@ -4676,7 +4593,6 @@ void OSD::handle_pg_backfill(OpRequest *op)
PG *pg;
if (!_have_pg(m->pgid)) {
op->put();
return;
}
@ -4689,7 +4605,7 @@ void OSD::handle_pg_backfill(OpRequest *op)
pg->put();
}
bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@ -4697,7 +4613,6 @@ bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old backfill, ignoring" << dendl;
op->put();
return false;
}
@ -4706,7 +4621,7 @@ bool OSD::backfill_is_queueable(PG *pg, OpRequest *op)
void OSD::handle_pg_missing(OpRequest *op)
void OSD::handle_pg_missing(OpRequestRef op)
{
MOSDPGMissing *m = (MOSDPGMissing *)op->request;
assert(m->get_header().type == MSG_OSD_PG_MISSING);
@ -4731,8 +4646,6 @@ void OSD::handle_pg_missing(OpRequest *op)
do_queries(query_map);
maybe_update_heartbeat_peers();
op->put();
#endif
}
@ -4740,7 +4653,7 @@ void OSD::handle_pg_missing(OpRequest *op)
* from primary to replica | stray
* NOTE: called with opqueue active.
*/
void OSD::handle_pg_query(OpRequest *op)
void OSD::handle_pg_query(OpRequestRef op)
{
assert(osd_lock.is_locked());
@ -4837,12 +4750,10 @@ void OSD::handle_pg_query(OpRequest *op)
}
do_notifies(notify_list, m->get_epoch());
op->put();
}
void OSD::handle_pg_remove(OpRequest *op)
void OSD::handle_pg_remove(OpRequestRef op)
{
MOSDPGRemove *m = (MOSDPGRemove *)op->request;
assert(m->get_header().type == MSG_OSD_PG_REMOVE);
@ -4883,7 +4794,6 @@ void OSD::handle_pg_remove(OpRequest *op)
}
pg->unlock();
}
op->put();
}
@ -5232,12 +5142,12 @@ void OSD::defer_recovery(PG *pg)
// =========================================================
// OPS
void OSD::reply_op_error(OpRequest *op, int err)
void OSD::reply_op_error(OpRequestRef op, int err)
{
reply_op_error(op, err, eversion_t());
}
void OSD::reply_op_error(OpRequest *op, int err, eversion_t v)
void OSD::reply_op_error(OpRequestRef op, int err, eversion_t v)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@ -5250,17 +5160,15 @@ void OSD::reply_op_error(OpRequest *op, int err, eversion_t v)
if (m->get_source().is_osd())
msgr = cluster_messenger;
msgr->send_message(reply, m->get_connection());
op->put();
}
void OSD::handle_misdirected_op(PG *pg, OpRequest *op)
void OSD::handle_misdirected_op(PG *pg, OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (pg) {
if (m->get_map_epoch() < pg->info.history.same_primary_since) {
dout(7) << *pg << " changed after " << m->get_map_epoch() << ", dropping" << dendl;
op->put();
return;
} else {
dout(7) << *pg << " misdirected op in " << m->get_map_epoch() << dendl;
@ -5281,12 +5189,11 @@ void OSD::handle_misdirected_op(PG *pg, OpRequest *op)
reply_op_error(op, -ENXIO);
}
void OSD::handle_op(OpRequest *op)
void OSD::handle_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (op_is_discardable(m)) {
op->put();
return;
}
@ -5367,7 +5274,6 @@ void OSD::handle_op(OpRequest *op)
// okay, we aren't valid now; check send epoch
if (m->get_map_epoch() >= superblock.oldest_map) {
dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
op->put();
return;
}
OSDMapRef send_map = get_map(m->get_map_epoch());
@ -5380,7 +5286,6 @@ void OSD::handle_op(OpRequest *op)
if (send_map->get_pg_role(m->get_pg(), whoami) >= 0) {
dout(7) << "dropping request; client will resend when they get new map" << dendl;
op->put();
} else {
dout(7) << "we are invalid target" << dendl;
handle_misdirected_op(NULL, op);
@ -5425,7 +5330,7 @@ bool OSD::op_has_sufficient_caps(PG *pg, MOSDOp *op)
return true;
}
void OSD::handle_sub_op(OpRequest *op)
void OSD::handle_sub_op(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -5433,7 +5338,6 @@ void OSD::handle_sub_op(OpRequest *op)
dout(10) << "handle_sub_op " << *m << " epoch " << m->map_epoch << dendl;
if (m->map_epoch < up_epoch) {
dout(3) << "replica op from before up" << dendl;
op->put();
return;
}
@ -5456,7 +5360,6 @@ void OSD::handle_sub_op(OpRequest *op)
PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid) : NULL;
if (!pg) {
op->put();
return;
}
pg->get();
@ -5465,13 +5368,12 @@ void OSD::handle_sub_op(OpRequest *op)
pg->put();
}
void OSD::handle_sub_op_reply(OpRequest *op)
void OSD::handle_sub_op_reply(OpRequestRef op)
{
MOSDSubOpReply *m = (MOSDSubOpReply*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOPREPLY);
if (m->get_map_epoch() < up_epoch) {
dout(3) << "replica op reply from before up" << dendl;
op->put();
return;
}
@ -5493,7 +5395,6 @@ void OSD::handle_sub_op_reply(OpRequest *op)
PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid) : NULL;
if (!pg) {
op->put();
return;
}
pg->get();
@ -5524,7 +5425,7 @@ bool OSD::op_is_discardable(MOSDOp *op)
*
* @return true if the op is queueable; false otherwise.
*/
bool OSD::op_is_queueable(PG *pg, OpRequest *op)
bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
{
assert(pg->is_locked());
MOSDOp *m = (MOSDOp*)op->request;
@ -5536,7 +5437,6 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op)
}
if (op_is_discardable(m)) {
op->put();
return false;
}
@ -5580,7 +5480,7 @@ bool OSD::op_is_queueable(PG *pg, OpRequest *op)
/*
* discard operation, or return true. no side-effects.
*/
bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -5592,7 +5492,6 @@ bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
dout(10) << "handle_sub_op pg changed " << pg->info.history
<< " after " << m->map_epoch
<< ", dropping" << dendl;
op->put();
return false;
}
@ -5602,7 +5501,7 @@ bool OSD::subop_is_queueable(PG *pg, OpRequest *op)
/*
* enqueue called with osd_lock held
*/
void OSD::enqueue_op(PG *pg, OpRequest *op)
void OSD::enqueue_op(PG *pg, OpRequestRef op)
{
dout(15) << *pg << " enqueue_op " << op->request << " "
<< *(op->request) << dendl;
@ -5671,7 +5570,7 @@ PG *OSD::OpWQ::_dequeue()
* thread is currently chewing on so as not to violate ordering from
* the clients' perspective.
*/
void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
void OSD::requeue_ops(PG *pg, list<OpRequestRef>& ls)
{
dout(15) << *pg << " requeue_ops " << ls << dendl;
assert(pg->is_locked());
@ -5680,17 +5579,17 @@ void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
assert(&ls != &pg->op_queue);
// set current queue contents aside..
list<OpRequest*> orig_queue;
list<OpRequestRef> orig_queue;
orig_queue.swap(pg->op_queue);
// grab whole list at once, in case methods we call below start adding things
// back on the list reference we were passed!
list<OpRequest*> q;
list<OpRequestRef> q;
q.swap(ls);
// requeue old items, now at front.
while (!q.empty()) {
OpRequest *op = q.front();
OpRequestRef op = q.front();
q.pop_front();
enqueue_op(pg, op);
}
@ -5704,7 +5603,7 @@ void OSD::requeue_ops(PG *pg, list<OpRequest*>& ls)
*/
void OSD::dequeue_op(PG *pg)
{
OpRequest *op = 0;
OpRequestRef op;
osd_lock.Lock();
{


View File

@ -35,6 +35,7 @@
#include "auth/KeyRing.h"
#include "messages/MOSDRepScrub.h"
#include "OpRequest.h"
#include <map>
#include <memory>
@ -45,6 +46,7 @@ using namespace std;
#include <ext/hash_set>
using namespace __gnu_cxx;
#include "OpRequest.h"
#define CEPH_OSD_PROTOCOL 10 /* cluster internal */
@ -120,7 +122,6 @@ class ReplicatedPG;
class AuthAuthorizeHandlerRegistry;
class OpRequest;
class OpsFlightSocketHook;
extern const coll_t meta_coll;
@ -163,7 +164,7 @@ protected:
void create_logger();
void tick();
void _dispatch(Message *m);
void dispatch_op(OpRequest *op);
void dispatch_op(OpRequestRef op);
public:
ClassHandler *class_handler;
@ -309,20 +310,20 @@ private:
void update_osd_stat();
// -- waiters --
list<OpRequest*> finished;
list<OpRequestRef> finished;
Mutex finished_lock;
void take_waiters(list<class OpRequest*>& ls) {
void take_waiters(list<OpRequestRef>& ls) {
finished_lock.Lock();
finished.splice(finished.end(), ls);
finished_lock.Unlock();
}
void take_waiter(OpRequest *op) {
void take_waiter(OpRequestRef op) {
finished_lock.Lock();
finished.push_back(op);
finished_lock.Unlock();
}
void push_waiters(list<OpRequest*>& ls) {
void push_waiters(list<OpRequestRef>& ls) {
assert(osd_lock.is_locked()); // currently, at least. be careful if we change this (see #743)
finished_lock.Lock();
finished.splice(finished.begin(), ls);
@ -331,16 +332,9 @@ private:
void do_waiters();
// -- op tracking --
xlist<OpRequest*> ops_in_flight;
/** This is an inner lock that is taken by the following three
* functions without regard for what locks the callers hold. It
* protects the xlist, but not the OpRequests. */
Mutex ops_in_flight_lock;
void register_inflight_op(xlist<OpRequest*>::item *i);
OpTracker op_tracker;
void check_ops_in_flight();
void unregister_inflight_op(xlist<OpRequest*>::item *i);
void dump_ops_in_flight(ostream& ss);
friend struct OpRequest;
friend class OpsFlightSocketHook;
OpsFlightSocketHook *admin_ops_hook;
@ -369,8 +363,8 @@ private:
}
} op_wq;
void enqueue_op(PG *pg, OpRequest *op);
void requeue_ops(PG *pg, list<OpRequest*>& ls);
void enqueue_op(PG *pg, OpRequestRef op);
void requeue_ops(PG *pg, list<OpRequestRef>& ls);
void dequeue_op(PG *pg);
static void static_dequeueop(OSD *o, PG *pg) {
o->dequeue_op(pg);
@ -387,7 +381,7 @@ private:
OSDMapRef osdmap;
utime_t had_map_since;
RWLock map_lock;
list<OpRequest*> waiting_for_osdmap;
list<OpRequestRef> waiting_for_osdmap;
Mutex peer_map_epoch_lock;
map<int, epoch_t> peer_map_epoch;
@ -400,7 +394,7 @@ private:
Session *session = 0);
void _share_map_outgoing(const entity_inst_t& inst);
void wait_for_new_map(OpRequest *op);
void wait_for_new_map(OpRequestRef op);
void handle_osd_map(class MOSDMap *m);
void note_down_osd(int osd);
void note_up_osd(int osd);
@ -439,7 +433,7 @@ protected:
// -- placement groups --
map<int, PGPool*> pool_map;
hash_map<pg_t, PG*> pg_map;
map<pg_t, list<OpRequest*> > waiting_for_pg;
map<pg_t, list<OpRequestRef> > waiting_for_pg;
PGRecoveryStats pg_recovery_stats;
PGPool *_get_pool(int id);
@ -471,7 +465,7 @@ protected:
}
}
void wake_all_pg_waiters() {
for (map<pg_t, list<OpRequest*> >::iterator p = waiting_for_pg.begin();
for (map<pg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
p != waiting_for_pg.end();
p++)
take_waiters(p->second);
@ -490,7 +484,7 @@ protected:
hash_map<pg_t, create_pg_info> creating_pgs;
bool can_create_pg(pg_t pgid);
void handle_pg_create(OpRequest *op);
void handle_pg_create(OpRequestRef op);
void do_split(PG *parent, set<pg_t>& children, ObjectStore::Transaction &t, C_Contexts *tfin);
void split_pg(PG *parent, map<pg_t,PG*>& children, ObjectStore::Transaction &t);
@ -604,24 +598,24 @@ protected:
void repeer(PG *pg, map< int, map<pg_t,pg_query_t> >& query_map);
bool require_mon_peer(Message *m);
bool require_osd_peer(OpRequest *op);
bool require_osd_peer(OpRequestRef op);
bool require_same_or_newer_map(OpRequest *op, epoch_t e);
bool require_same_or_newer_map(OpRequestRef op, epoch_t e);
void handle_pg_query(OpRequest *op);
void handle_pg_missing(OpRequest *op);
void handle_pg_notify(OpRequest *op);
void handle_pg_log(OpRequest *op);
void handle_pg_info(OpRequest *op);
void handle_pg_trim(OpRequest *op);
void handle_pg_query(OpRequestRef op);
void handle_pg_missing(OpRequestRef op);
void handle_pg_notify(OpRequestRef op);
void handle_pg_log(OpRequestRef op);
void handle_pg_info(OpRequestRef op);
void handle_pg_trim(OpRequestRef op);
void handle_pg_scan(OpRequest *op);
bool scan_is_queueable(PG *pg, OpRequest *op);
void handle_pg_scan(OpRequestRef op);
bool scan_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_backfill(OpRequest *op);
bool backfill_is_queueable(PG *pg, OpRequest *op);
void handle_pg_backfill(OpRequestRef op);
bool backfill_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_remove(OpRequest *op);
void handle_pg_remove(OpRequestRef op);
void queue_pg_for_deletion(PG *pg);
void _remove_pg(PG *pg);
@ -1061,16 +1055,16 @@ public:
void handle_signal(int signum);
void reply_op_error(OpRequest *op, int r);
void reply_op_error(OpRequest *op, int r, eversion_t v);
void handle_misdirected_op(PG *pg, OpRequest *op);
void reply_op_error(OpRequestRef op, int r);
void reply_op_error(OpRequestRef op, int r, eversion_t v);
void handle_misdirected_op(PG *pg, OpRequestRef op);
void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(class MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
void handle_op(OpRequest *op);
void handle_sub_op(OpRequest *op);
void handle_sub_op_reply(OpRequest *op);
void handle_op(OpRequestRef op);
void handle_sub_op(OpRequestRef op);
void handle_sub_op_reply(OpRequestRef op);
private:
/// check if we can throw out op from a disconnected client
@ -1078,9 +1072,9 @@ private:
/// check if op has sufficient caps
bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
/// check if op should be (re)queued for processing
bool op_is_queueable(PG *pg, OpRequest *op);
bool op_is_queueable(PG *pg, OpRequestRef op);
/// check if subop should be (re)queued for processing
bool subop_is_queueable(PG *pg, OpRequest *op);
bool subop_is_queueable(PG *pg, OpRequestRef op);
public:
void force_remount();

src/osd/OpRequest.cc (new file, 114 lines)
View File

@ -0,0 +1,114 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
#include "OpRequest.h"
#include "common/Formatter.h"
#include <iostream>
#include "common/debug.h"
#include "common/config.h"
#define DOUT_SUBSYS optracker
#undef dout_prefix
#define dout_prefix _prefix(_dout)
static ostream& _prefix(std::ostream* _dout)
{
return *_dout << "--OSD::tracker-- ";
}
void OpTracker::dump_ops_in_flight(ostream &ss)
{
JSONFormatter jf(true);
Mutex::Locker locker(ops_in_flight_lock);
jf.open_object_section("ops_in_flight"); // overall dump
jf.dump_int("num_ops", ops_in_flight.size());
jf.open_array_section("ops"); // list of OpRequests
utime_t now = ceph_clock_now(g_ceph_context);
for (xlist<OpRequest*>::iterator p = ops_in_flight.begin(); !p.end(); ++p) {
stringstream name;
Message *m = (*p)->request;
m->print(name);
jf.open_object_section("op");
jf.dump_string("description", name.str().c_str()); // this OpRequest
jf.dump_stream("received_at") << (*p)->received_time;
jf.dump_float("age", now - (*p)->received_time);
jf.dump_string("flag_point", (*p)->state_string());
if (m->get_orig_source().is_client()) {
jf.open_object_section("client_info");
stringstream client_name;
client_name << m->get_orig_source();
jf.dump_string("client", client_name.str());
jf.dump_int("tid", m->get_tid());
jf.close_section(); // client_info
}
jf.close_section(); // this OpRequest
}
jf.close_section(); // list of OpRequests
jf.close_section(); // overall dump
jf.flush(ss);
}
void OpTracker::register_inflight_op(xlist<OpRequest*>::item *i)
{
Mutex::Locker locker(ops_in_flight_lock);
ops_in_flight.push_back(i);
ops_in_flight.back()->seq = seq++;
}
void OpTracker::unregister_inflight_op(xlist<OpRequest*>::item *i)
{
Mutex::Locker locker(ops_in_flight_lock);
assert(i->get_list() == &ops_in_flight);
i->remove_myself();
}
bool OpTracker::check_ops_in_flight(ostream &out)
{
Mutex::Locker locker(ops_in_flight_lock);
if (!ops_in_flight.size())
return false;
utime_t now = ceph_clock_now(g_ceph_context);
utime_t too_old = now;
too_old -= g_conf->osd_op_complaint_time;
dout(10) << "ops_in_flight.size: " << ops_in_flight.size()
<< "; oldest is " << now - ops_in_flight.front()->received_time
<< " seconds old" << dendl;
xlist<OpRequest*>::iterator i = ops_in_flight.begin();
while (!i.end() && (*i)->received_time < too_old) {
// exponential backoff of warning intervals
if ( ( (*i)->received_time +
(g_conf->osd_op_complaint_time *
(*i)->warn_interval_multiplier) )< now) {
out << "old request " << *((*i)->request) << " received at "
<< (*i)->received_time << " currently " << (*i)->state_string();
(*i)->warn_interval_multiplier *= 2;
}
++i;
}
return !i.end();
}
void OpTracker::mark_event(OpRequest *op, const string &dest)
{
Mutex::Locker locker(ops_in_flight_lock);
utime_t now = ceph_clock_now(g_ceph_context);
dout(1) << "seq: " << op->seq << ", time: " << now << ", event: " << dest
<< " " << *op->request << dendl;
}
void OpTracker::RemoveOnDelete::operator()(OpRequest *op) {
tracker->unregister_inflight_op(&(op->xitem));
delete op;
}
OpRequestRef OpTracker::create_request(Message *ref)
{
return OpRequestRef(new OpRequest(ref, this),
RemoveOnDelete(this));
}
void OpRequest::mark_event(const string &event)
{
tracker->mark_event(this, event);
}
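
create_request() is what makes the manual op->put() calls removed throughout OSD.cc, PG.cc, and ReplicatedPG.cc unnecessary: the OpRequest is handed out as a shared_ptr whose custom deleter (RemoveOnDelete) unregisters it from the tracker and deletes it when the last reference goes away. A reduced sketch of that deleter pattern, with hypothetical names (Tracker, Op, OpRef) apart from the idea itself:

// Sketch of the shared_ptr-with-custom-deleter lifetime used by
// OpTracker::create_request(); std::shared_ptr stands in for
// std::tr1::shared_ptr.
#include <cassert>
#include <iostream>
#include <memory>
#include <set>

struct Op;

struct Tracker {
  std::set<Op*> in_flight;
  void register_op(Op *o)   { in_flight.insert(o); }
  void unregister_op(Op *o) { assert(in_flight.count(o)); in_flight.erase(o); }
};

struct Op {
  Tracker *tracker;
  explicit Op(Tracker *t) : tracker(t) { tracker->register_op(this); }
};

struct RemoveOnDelete {                  // custom deleter, as in the commit
  Tracker *tracker;
  explicit RemoveOnDelete(Tracker *t) : tracker(t) {}
  void operator()(Op *op) {
    tracker->unregister_op(op);          // drop from the in-flight list
    delete op;                           // then free the op itself
  }
};

typedef std::shared_ptr<Op> OpRef;

OpRef create_request(Tracker *t) {
  return OpRef(new Op(t), RemoveOnDelete(t));
}

int main() {
  Tracker tracker;
  {
    OpRef op = create_request(&tracker);   // registered on creation
    OpRef copy = op;                       // refcounted, no manual put()
    std::cout << "in flight: " << tracker.in_flight.size() << std::endl; // 1
  }                                        // last reference dropped here
  std::cout << "in flight: " << tracker.in_flight.size() << std::endl;   // 0
  return 0;
}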

View File

@ -13,6 +13,38 @@
#ifndef OPREQUEST_H_
#define OPREQUEST_H_
#include <sstream>
#include <stdint.h>
#include <include/utime.h>
#include "common/Mutex.h"
#include "include/xlist.h"
#include "msg/Message.h"
#include <tr1/memory>
#include "common/TrackedOp.h"
class OpRequest;
typedef std::tr1::shared_ptr<OpRequest> OpRequestRef;
class OpTracker {
class RemoveOnDelete {
OpTracker *tracker;
public:
RemoveOnDelete(OpTracker *tracker) : tracker(tracker) {}
void operator()(OpRequest *op);
};
friend class RemoveOnDelete;
uint64_t seq;
Mutex ops_in_flight_lock;
xlist<OpRequest *> ops_in_flight;
public:
OpTracker() : seq(0), ops_in_flight_lock("OpTracker mutex") {}
void dump_ops_in_flight(std::ostream& ss);
void register_inflight_op(xlist<OpRequest*>::item *i);
void unregister_inflight_op(xlist<OpRequest*>::item *i);
bool check_ops_in_flight(std::ostream &out);
void mark_event(OpRequest *op, const string &evt);
OpRequestRef create_request(Message *req);
};
/**
* The OpRequest takes in a Message* and takes over a single reference
@ -21,33 +53,35 @@
* you want to track, create an OpRequest with it, and then pass around that OpRequest
* the way you used to pass around the Message.
*/
struct OpRequest : public RefCountedObject {
struct OpRequest : public TrackedOp {
friend class OpTracker;
Message *request;
xlist<OpRequest*>::item xitem;
utime_t received_time;
uint8_t warn_interval_multiplier;
private:
OSD *osd;
OpTracker *tracker;
uint8_t hit_flag_points;
uint8_t latest_flag_point;
uint64_t seq;
static const uint8_t flag_queued_for_pg=1 << 0;
static const uint8_t flag_reached_pg = 1 << 1;
static const uint8_t flag_delayed = 1 << 2;
static const uint8_t flag_started = 1 << 3;
static const uint8_t flag_sub_op_sent = 1 << 4;
public:
OpRequest() : request(NULL), xitem(this) {}
OpRequest(Message *req, OSD *o) : request(req), xitem(this),
warn_interval_multiplier(1),
osd(o) {
OpRequest(Message *req, OpTracker *tracker) :
request(req), xitem(this),
warn_interval_multiplier(1),
tracker(tracker),
seq(0) {
received_time = request->get_recv_stamp();
tracker->register_inflight_op(&xitem);
}
public:
~OpRequest() {
osd->unregister_inflight_op(&xitem);
if (request) {
request->put();
}
assert(request);
request->put();
}
bool been_queued_for_pg() { return hit_flag_points & flag_queued_for_pg; }
@ -74,10 +108,12 @@ public:
}
void mark_queued_for_pg() {
mark_event("queued_for_pg");
hit_flag_points |= flag_queued_for_pg;
latest_flag_point = flag_queued_for_pg;
}
void mark_reached_pg() {
mark_event("reached_pg");
hit_flag_points |= flag_reached_pg;
latest_flag_point = flag_reached_pg;
}
@ -86,13 +122,17 @@ public:
latest_flag_point = flag_delayed;
}
void mark_started() {
mark_event("started");
hit_flag_points |= flag_started;
latest_flag_point = flag_started;
}
void mark_sub_op_sent() {
mark_event("sub_op_sent");
hit_flag_points |= flag_sub_op_sent;
latest_flag_point = flag_sub_op_sent;
}
void mark_event(const string &event);
};
#endif /* OPREQUEST_H_ */
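
As the class comment says, the intended usage is to wrap a Message in an OpRequest via the tracker and pass the OpRequestRef around instead of the raw Message*, marking milestones as the op progresses; the flag points double as a compact record of how far a stuck op got, which is what state_string() reports. A hedged sketch of that flag bookkeeping (MiniOp and its state_string() are stand-ins; only the flag scheme follows the header above):

// Illustrative reduction of OpRequest's flag-point bookkeeping: each
// milestone sets a bit in hit_flag_points and records the latest one.
#include <iostream>
#include <stdint.h>
#include <string>

struct MiniOp {
  uint8_t hit_flag_points;
  uint8_t latest_flag_point;
  static const uint8_t flag_queued_for_pg = 1 << 0;
  static const uint8_t flag_reached_pg    = 1 << 1;
  static const uint8_t flag_started       = 1 << 3;

  MiniOp() : hit_flag_points(0), latest_flag_point(0) {}

  void mark_queued_for_pg() {
    hit_flag_points |= flag_queued_for_pg;
    latest_flag_point = flag_queued_for_pg;
  }
  void mark_reached_pg() {
    hit_flag_points |= flag_reached_pg;
    latest_flag_point = flag_reached_pg;
  }
  void mark_started() {
    hit_flag_points |= flag_started;
    latest_flag_point = flag_started;
  }
  std::string state_string() const {          // simplified reporting
    switch (latest_flag_point) {
    case flag_queued_for_pg: return "queued for pg";
    case flag_reached_pg:    return "reached pg";
    case flag_started:       return "started";
    default:                 return "no flag points reached";
    }
  }
};

int main() {
  MiniOp op;
  op.mark_queued_for_pg();
  op.mark_reached_pg();
  std::cout << op.state_string() << std::endl;  // prints "reached pg"
  return 0;
}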

View File

@ -549,7 +549,7 @@ bool PG::search_for_missing(const pg_info_t &oinfo, const pg_missing_t *omissing
map<hobject_t, set<int> >::iterator ml = missing_loc.find(soid);
if (ml == missing_loc.end()) {
map<hobject_t, list<class OpRequest*> >::iterator wmo =
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(soid);
if (wmo != waiting_for_missing_object.end()) {
osd->requeue_ops(this, wmo->second);
@ -1395,16 +1395,14 @@ void PG::do_pending_flush()
}
}
void PG::do_request(OpRequest *op)
void PG::do_request(OpRequestRef op)
{
// do any pending flush
do_pending_flush();
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
if (osd->op_is_discardable((MOSDOp*)op->request))
op->put();
else
if (!osd->op_is_discardable((MOSDOp*)op->request))
do_op(op); // do it now
break;
@ -1434,11 +1432,11 @@ void PG::replay_queued_ops()
{
assert(is_replay() && is_active());
eversion_t c = info.last_update;
list<OpRequest*> replay;
list<OpRequestRef> replay;
dout(10) << "replay_queued_ops" << dendl;
state_clear(PG_STATE_REPLAY);
for (map<eversion_t,OpRequest*>::iterator p = replay_queue.begin();
for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
p != replay_queue.end();
p++) {
if (p->first.version != c.version+1) {
@ -2434,9 +2432,9 @@ void PG::adjust_local_snaps()
}
}
void PG::requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m)
void PG::requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m)
{
for (map<hobject_t, list<OpRequest*> >::iterator it = m.begin();
for (map<hobject_t, list<OpRequestRef> >::iterator it = m.begin();
it != m.end();
it++)
osd->requeue_ops(this, it->second);
@ -2514,7 +2512,7 @@ bool PG::sched_scrub()
}
void PG::sub_op_scrub_map(OpRequest *op)
void PG::sub_op_scrub_map(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -2523,7 +2521,6 @@ void PG::sub_op_scrub_map(OpRequest *op)
if (m->map_epoch < info.history.same_interval_since) {
dout(10) << "sub_op_scrub discarding old sub_op from "
<< m->map_epoch << " < " << info.history.same_interval_since << dendl;
op->put();
return;
}
@ -2547,8 +2544,6 @@ void PG::sub_op_scrub_map(OpRequest *op)
assert(last_update_applied == info.last_update);
osd->scrub_finalize_wq.queue(this);
}
op->put();
}
/*
@ -2588,7 +2583,7 @@ void PG::_request_scrub_map(int replica, eversion_t version)
get_osdmap()->get_cluster_inst(replica));
}
void PG::sub_op_scrub_reserve(OpRequest *op)
void PG::sub_op_scrub_reserve(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -2596,7 +2591,6 @@ void PG::sub_op_scrub_reserve(OpRequest *op)
if (scrub_reserved) {
dout(10) << "Ignoring reserve request: Already reserved" << dendl;
op->put();
return;
}
@ -2607,11 +2601,9 @@ void PG::sub_op_scrub_reserve(OpRequest *op)
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(scrub_reserved, reply->get_data());
osd->cluster_messenger->send_message(reply, m->get_connection());
op->put();
}
void PG::sub_op_scrub_reserve_reply(OpRequest *op)
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
@ -2619,7 +2611,6 @@ void PG::sub_op_scrub_reserve_reply(OpRequest *op)
if (!scrub_reserved) {
dout(10) << "ignoring obsolete scrub reserve reply" << dendl;
op->put();
return;
}
@ -2643,11 +2634,9 @@ void PG::sub_op_scrub_reserve_reply(OpRequest *op)
}
sched_scrub();
}
op->put();
}
void PG::sub_op_scrub_unreserve(OpRequest *op)
void PG::sub_op_scrub_unreserve(OpRequestRef op)
{
assert(op->request->get_header().type == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_unreserve" << dendl;
@ -2655,11 +2644,9 @@ void PG::sub_op_scrub_unreserve(OpRequest *op)
op->mark_started();
clear_scrub_reserved();
op->put();
}
void PG::sub_op_scrub_stop(OpRequest *op)
void PG::sub_op_scrub_stop(OpRequestRef op)
{
op->mark_started();
@ -2672,8 +2659,6 @@ void PG::sub_op_scrub_stop(OpRequest *op)
MOSDSubOpReply *reply = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
osd->cluster_messenger->send_message(reply, m->get_connection());
op->put();
}
void PG::clear_scrub_reserved()
@ -3579,8 +3564,8 @@ void PG::start_peering_interval(const OSDMapRef lastmap,
clear_stats();
// take replay queue waiters
list<OpRequest*> ls;
for (map<eversion_t,OpRequest*>::iterator it = replay_queue.begin();
list<OpRequestRef> ls;
for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
it != replay_queue.end();
it++)
ls.push_back(it->second);

View File

@ -31,6 +31,7 @@
#include "include/xlist.h"
#include "include/atomic.h"
#include "OpRequest.h"
#include "OSDMap.h"
#include "os/ObjectStore.h"
#include "msg/Messenger.h"
@ -52,7 +53,6 @@ using namespace __gnu_cxx;
class OSD;
class OpRequest;
class MOSDOp;
class MOSDSubOp;
class MOSDSubOpReply;
@ -394,7 +394,7 @@ public:
}
list<OpRequest*> op_queue; // op queue
list<OpRequestRef> op_queue; // op queue
bool dirty_info, dirty_log;
@ -648,14 +648,14 @@ protected:
// pg waiters
list<OpRequest*> waiting_for_active;
list<OpRequest*> waiting_for_all_missing;
map<hobject_t, list<OpRequest*> > waiting_for_missing_object,
list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,
waiting_for_degraded_object;
map<eversion_t,list<OpRequest*> > waiting_for_ondisk;
map<eversion_t,OpRequest*> replay_queue;
map<eversion_t,list<OpRequestRef> > waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void requeue_object_waiters(map<hobject_t, list<OpRequest*> >& m);
void requeue_object_waiters(map<hobject_t, list<OpRequestRef> >& m);
// stats
Mutex pg_stats_lock;
@ -813,11 +813,11 @@ public:
bool sched_scrub();
void replica_scrub(class MOSDRepScrub *op);
void sub_op_scrub_map(OpRequest *op);
void sub_op_scrub_reserve(OpRequest *op);
void sub_op_scrub_reserve_reply(OpRequest *op);
void sub_op_scrub_unreserve(OpRequest *op);
void sub_op_scrub_stop(OpRequest *op);
void sub_op_scrub_map(OpRequestRef op);
void sub_op_scrub_reserve(OpRequestRef op);
void sub_op_scrub_reserve_reply(OpRequestRef op);
void sub_op_scrub_unreserve(OpRequestRef op);
void sub_op_scrub_stop(OpRequestRef op);
// -- recovery state --
@ -1423,13 +1423,13 @@ public:
// abstract bits
void do_request(OpRequest *op);
void do_request(OpRequestRef op);
virtual void do_op(OpRequest *op) = 0;
virtual void do_sub_op(OpRequest *op) = 0;
virtual void do_sub_op_reply(OpRequest *op) = 0;
virtual void do_scan(OpRequest *op) = 0;
virtual void do_backfill(OpRequest *op) = 0;
virtual void do_op(OpRequestRef op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
virtual void do_sub_op_reply(OpRequestRef op) = 0;
virtual void do_scan(OpRequestRef op) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
virtual bool snap_trimmer() = 0;
virtual int do_command(vector<string>& cmd, ostream& ss,

View File

@ -101,7 +101,7 @@ bool ReplicatedPG::is_missing_object(const hobject_t& soid)
return missing.missing.count(soid);
}
void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequestRef op)
{
assert(is_missing_object(soid));
@ -125,7 +125,7 @@ void ReplicatedPG::wait_for_missing_object(const hobject_t& soid, OpRequest *op)
op->mark_delayed();
}
void ReplicatedPG::wait_for_all_missing(OpRequest *op)
void ReplicatedPG::wait_for_all_missing(OpRequestRef op)
{
waiting_for_all_missing.push_back(op);
}
@ -157,7 +157,7 @@ bool ReplicatedPG::is_degraded_object(const hobject_t& soid)
return false;
}
void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequest *op)
void ReplicatedPG::wait_for_degraded_object(const hobject_t& soid, OpRequestRef op)
{
assert(is_degraded_object(soid));
@ -398,7 +398,7 @@ bool ReplicatedPG::pg_op_must_wait(MOSDOp *op)
return false;
}
void ReplicatedPG::do_pg_op(OpRequest *op)
void ReplicatedPG::do_pg_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp *)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@ -541,7 +541,6 @@ void ReplicatedPG::do_pg_op(OpRequest *op)
reply->set_data(outdata);
reply->set_result(result);
osd->client_messenger->send_message(reply, m->get_connection());
op->put();
delete filter;
}
@ -591,7 +590,7 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
*/
void ReplicatedPG::do_op(OpRequest *op)
void ReplicatedPG::do_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@ -786,7 +785,6 @@ void ReplicatedPG::do_op(OpRequest *op)
} else {
dout(10) << "no src oid specified for multi op " << osd_op << dendl;
osd->reply_op_error(op, -EINVAL);
op->put();
}
put_object_contexts(src_obc);
put_object_context(obc);
@ -933,7 +931,6 @@ void ReplicatedPG::do_op(OpRequest *op)
ctx->reply = NULL;
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
osd->client_messenger->send_message(reply, m->get_connection());
op->put();
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
@ -1012,7 +1009,7 @@ void ReplicatedPG::log_op_stats(OpContext *ctx)
<< " lat " << latency << dendl;
}
void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
void ReplicatedPG::log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat)
{
utime_t now = ceph_clock_now(g_ceph_context);
utime_t latency = now;
@ -1034,7 +1031,7 @@ void ReplicatedPG::log_subop_stats(OpRequest *op, int tag_inb, int tag_lat)
void ReplicatedPG::do_sub_op(OpRequest *op)
void ReplicatedPG::do_sub_op(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -1070,7 +1067,7 @@ void ReplicatedPG::do_sub_op(OpRequest *op)
sub_op_modify(op);
}
void ReplicatedPG::do_sub_op_reply(OpRequest *op)
void ReplicatedPG::do_sub_op_reply(OpRequestRef op)
{
MOSDSubOpReply *r = (MOSDSubOpReply *)op->request;
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
@ -1091,7 +1088,7 @@ void ReplicatedPG::do_sub_op_reply(OpRequest *op)
sub_op_modify_reply(op);
}
void ReplicatedPG::do_scan(OpRequest *op)
void ReplicatedPG::do_scan(OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan*)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
@ -1133,11 +1130,9 @@ void ReplicatedPG::do_scan(OpRequest *op)
}
break;
}
op->put();
}
void ReplicatedPG::do_backfill(OpRequest *op)
void ReplicatedPG::do_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
@ -1181,8 +1176,6 @@ void ReplicatedPG::do_backfill(OpRequest *op)
}
break;
}
op->put();
}
/* Returns head of snap_trimq as snap_to_trim and the relevant objects as
@ -1252,7 +1245,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, ssc, this);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, &obc->obs, ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
@ -3273,7 +3266,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
Context *onapplied = new C_OSD_OpApplied(this, repop);
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(repop->obc,
repop->ctx->clone_obc);
int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync);
int r = osd->store->queue_transactions(&osr, repop->tls, onapplied, oncommit, onapplied_sync, repop->ctx->op);
if (r) {
derr << "apply_repop queue_transactions returned " << r << " on " << *repop << dendl;
assert(0);
@ -3284,6 +3277,8 @@ void ReplicatedPG::op_applied(RepGather *repop)
{
lock();
dout(10) << "op_applied " << *repop << dendl;
if (repop->ctx->op)
repop->ctx->op->mark_event("op_applied");
// discard my reference to the buffer
if (repop->ctx->op)
@ -3337,6 +3332,8 @@ void ReplicatedPG::op_applied(RepGather *repop)
void ReplicatedPG::op_commit(RepGather *repop)
{
lock();
if (repop->ctx->op)
repop->ctx->op->mark_event("op_commit");
if (repop->aborted) {
dout(10) << "op_commit " << *repop << " -- aborted" << dendl;
@ -3599,6 +3596,8 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
<< dendl;
if (ack_type & CEPH_OSD_FLAG_ONDISK) {
if (repop->ctx->op)
repop->ctx->op->mark_event("sub_op_commit_rec");
// disk
if (repop->waitfor_disk.count(fromosd)) {
repop->waitfor_disk.erase(fromosd);
@ -3612,6 +3611,8 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
repop->waitfor_ack.erase(fromosd);*/
} else {
// ack
if (repop->ctx->op)
repop->ctx->op->mark_event("sub_op_applied_rec");
repop->waitfor_ack.erase(fromosd);
}
@ -3695,7 +3696,8 @@ void ReplicatedPG::handle_watch_timeout(void *_obc,
vector<OSDOp> ops;
tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->cluster_messenger->get_myname(), 0, rep_tid);
OpContext *ctx = new OpContext(NULL, reqid, ops, &obc->obs, obc->ssc, this);
OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops,
&obc->obs, obc->ssc, this);
ctx->mtime = ceph_clock_now(g_ceph_context);
ctx->at_version.epoch = get_osdmap()->get_epoch();
@ -4046,7 +4048,7 @@ void ReplicatedPG::put_snapset_context(SnapSetContext *ssc)
// sub op modify
void ReplicatedPG::sub_op_modify(OpRequest *op)
void ReplicatedPG::sub_op_modify(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -4148,7 +4150,7 @@ void ReplicatedPG::sub_op_modify(OpRequest *op)
Context *oncommit = new C_OSD_RepModifyCommit(rm);
Context *onapply = new C_OSD_RepModifyApply(rm);
int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit);
int r = osd->store->queue_transactions(&osr, rm->tls, onapply, oncommit, 0, op);
if (r) {
dout(0) << "error applying transaction: r = " << r << dendl;
assert(0);
@ -4159,6 +4161,7 @@ void ReplicatedPG::sub_op_modify(OpRequest *op)
void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
{
lock();
rm->op->mark_event("sub_op_applied");
dout(10) << "sub_op_modify_applied on " << rm << " op " << *rm->op->request << dendl;
MOSDSubOp *m = (MOSDSubOp*)rm->op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -4188,7 +4191,6 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
unlock();
if (done) {
delete rm->ctx;
rm->op->put();
delete rm;
put();
}
@ -4197,6 +4199,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
{
lock();
rm->op->mark_event("sub_op_commit");
// send commit.
dout(10) << "sub_op_modify_commit on op " << *rm->op->request
@ -4219,13 +4222,12 @@ void ReplicatedPG::sub_op_modify_commit(RepModify *rm)
unlock();
if (done) {
delete rm->ctx;
rm->op->put();
delete rm;
put();
}
}
void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
void ReplicatedPG::sub_op_modify_reply(OpRequestRef op)
{
MOSDSubOpReply *r = (MOSDSubOpReply*)op->request;
assert(r->get_header().type == MSG_OSD_SUBOPREPLY);
@ -4243,8 +4245,6 @@ void ReplicatedPG::sub_op_modify_reply(OpRequest *op)
fromosd,
r->get_last_complete_ondisk());
}
op->put();
}
@ -4737,7 +4737,7 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(ObjectRecoveryInfo recovery_info
return new_info;
}
void ReplicatedPG::handle_pull_response(OpRequest *op)
void ReplicatedPG::handle_pull_response(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
bufferlist data;
@ -4869,7 +4869,7 @@ void ReplicatedPG::handle_pull_response(OpRequest *op)
}
}
void ReplicatedPG::handle_push(OpRequest *op)
void ReplicatedPG::handle_push(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
dout(10) << "handle_push "
@ -5030,7 +5030,7 @@ void ReplicatedPG::send_push_op_blank(const hobject_t& soid, int peer)
osd->cluster_messenger->send_message(subop, get_osdmap()->get_cluster_inst(peer));
}
void ReplicatedPG::sub_op_push_reply(OpRequest *op)
void ReplicatedPG::sub_op_push_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = (MOSDSubOpReply*)op->request;
assert(reply->get_header().type == MSG_OSD_SUBOPREPLY);
@ -5086,7 +5086,6 @@ void ReplicatedPG::sub_op_push_reply(OpRequest *op)
}
}
}
op->put();
}
void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
@ -5111,7 +5110,7 @@ void ReplicatedPG::finish_degraded_object(const hobject_t& oid)
* process request to pull an entire object.
* NOTE: called from opqueue.
*/
void ReplicatedPG::sub_op_pull(OpRequest *op)
void ReplicatedPG::sub_op_pull(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -5150,11 +5149,10 @@ void ReplicatedPG::sub_op_pull(OpRequest *op)
}
log_subop_stats(op, 0, l_osd_sop_pull_lat);
op->put();
}
void ReplicatedPG::_committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t last_complete)
void ReplicatedPG::_committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t last_complete)
{
lock();
if (same_since == info.history.same_interval_since) {
@ -5275,7 +5273,7 @@ void ReplicatedPG::trim_pushed_data(
/** op_push
* NOTE: called from opqueue.
*/
void ReplicatedPG::sub_op_push(OpRequest *op)
void ReplicatedPG::sub_op_push(OpRequestRef op)
{
op->mark_started();
if (is_primary()) {
@ -5283,11 +5281,10 @@ void ReplicatedPG::sub_op_push(OpRequest *op)
} else {
handle_push(op);
}
op->put();
return;
}
void ReplicatedPG::_failed_push(OpRequest *op)
void ReplicatedPG::_failed_push(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -5309,11 +5306,9 @@ void ReplicatedPG::_failed_push(OpRequest *op)
finish_recovery_op(soid); // close out this attempt,
pull_from_peer[from].erase(soid);
pulling.erase(soid);
op->put();
}
void ReplicatedPG::sub_op_remove(OpRequest *op)
void ReplicatedPG::sub_op_remove(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
@ -5325,8 +5320,6 @@ void ReplicatedPG::sub_op_remove(OpRequest *op)
remove_object_with_snap_hardlinks(*t, m->poid);
int r = osd->store->queue_transaction(&osr, t);
assert(r == 0);
op->put();
}
@ -5363,7 +5356,7 @@ ReplicatedPG::ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transac
{
// Wake anyone waiting for this object. Now that it's been marked as lost,
// we will just return an error code.
map<hobject_t, list<OpRequest*> >::iterator wmo =
map<hobject_t, list<OpRequestRef> >::iterator wmo =
waiting_for_missing_object.find(oid);
if (wmo != waiting_for_missing_object.end()) {
osd->requeue_ops(this, wmo->second);
@ -5524,7 +5517,7 @@ void ReplicatedPG::_finish_mark_all_unfound_lost(list<ObjectContext*>& obcs)
void ReplicatedPG::apply_and_flush_repops(bool requeue)
{
list<OpRequest*> rq;
list<OpRequestRef> rq;
// apply all repops
while (!repop_queue.empty()) {
@ -5538,7 +5531,7 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
if (requeue && repop->ctx->op) {
dout(10) << " requeuing " << *repop->ctx->op->request << dendl;
rq.push_back(repop->ctx->op);
repop->ctx->op = 0;
repop->ctx->op = OpRequestRef();
}
remove_repop(repop);
@ -5590,7 +5583,7 @@ void ReplicatedPG::on_change()
// take object waiters
requeue_object_waiters(waiting_for_missing_object);
for (map<hobject_t,list<OpRequest*> >::iterator p = waiting_for_degraded_object.begin();
for (map<hobject_t,list<OpRequestRef> >::iterator p = waiting_for_degraded_object.begin();
p != waiting_for_degraded_object.end();
waiting_for_degraded_object.erase(p++)) {
osd->requeue_ops(this, p->second);
@ -5613,7 +5606,7 @@ void ReplicatedPG::on_role_change()
dout(10) << "on_role_change" << dendl;
// take commit waiters
for (map<eversion_t, list<OpRequest*> >::iterator p = waiting_for_ondisk.begin();
for (map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.begin();
p != waiting_for_ondisk.end();
p++)
osd->requeue_ops(this, p->second);
@ -5804,7 +5797,7 @@ int ReplicatedPG::recover_primary(int max)
osd->store->queue_transaction(&osr, t,
new C_OSD_AppliedRecoveredObject(this, t, obc),
new C_OSD_CommittedPushedObject(this, NULL,
new C_OSD_CommittedPushedObject(this, OpRequestRef(),
info.history.same_interval_since,
info.last_complete),
new C_OSD_OndiskWriteUnlock(obc));
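
The recurring change in the hunks above, signatures moving from OpRequest* to OpRequestRef while the matching op->get()/op->put() calls disappear, replaces manual reference counting with a shared-pointer handle, so a request stays alive exactly as long as some queue, context, or PG still holds a copy. A minimal sketch of that ownership shift, assuming the handle behaves like std::tr1::shared_ptr (the names here are illustrative, not the real types):

#include <tr1/memory>

struct Request {
  // message pointer, receive timestamp, event trail, ...
};

// Before: raw pointer plus manual refcounting; every path had to end in put(),
// and an early return that skipped it leaked the request.
//
//   void sub_op_example(Request *op) {
//     // ... do work ...
//     op->put();
//   }

// After: a shared handle; the request is released automatically when the last
// copy goes away, including copies captured by completion callbacks.
typedef std::tr1::shared_ptr<Request> RequestRef;

void sub_op_example(RequestRef op) {
  // ... do work; no explicit release needed ...
}
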


@ -133,7 +133,7 @@ public:
}
state_t state;
int num_wr;
list<OpRequest*> waiting;
list<OpRequestRef> waiting;
list<Cond*> waiting_cond;
bool wake;
@ -333,7 +333,7 @@ public:
* Capture all object state associated with an in-progress read or write.
*/
struct OpContext {
OpRequest *op;
OpRequestRef op;
osd_reqid_t reqid;
vector<OSDOp>& ops;
@ -380,7 +380,7 @@ public:
OpContext(const OpContext& other);
const OpContext& operator=(const OpContext& other);
OpContext(OpRequest *_op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
OpContext(OpRequestRef _op, osd_reqid_t _reqid, vector<OSDOp>& _ops,
ObjectState *_obs, SnapSetContext *_ssc,
ReplicatedPG *_pg) :
op(_op), reqid(_reqid), ops(_ops), obs(_obs),
@ -454,8 +454,6 @@ public:
if (--nref == 0) {
assert(!obc);
assert(src_obc.empty());
if (ctx->op)
ctx->op->put();
delete ctx;
delete this;
//generic_dout(0) << "deleting " << this << dendl;
@ -573,8 +571,8 @@ protected:
bufferlist data_received,
interval_set<uint64_t> *intervals_usable,
bufferlist *data_usable);
void handle_pull_response(OpRequest *op);
void handle_push(OpRequest *op);
void handle_pull_response(OpRequestRef op);
void handle_push(OpRequestRef op);
int send_push(int peer,
ObjectRecoveryInfo recovery_info,
ObjectRecoveryProgress progress,
@ -690,7 +688,7 @@ protected:
struct RepModify {
ReplicatedPG *pg;
OpRequest *op;
OpRequestRef op;
OpContext *ctx;
bool applied, committed;
int ackerosd;
@ -701,7 +699,7 @@ protected:
ObjectStore::Transaction opt, localt;
list<ObjectStore::Transaction*> tls;
RepModify() : pg(NULL), op(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
bytes_written(0) {}
};
@ -748,37 +746,33 @@ protected:
};
struct C_OSD_CommittedPushedObject : public Context {
ReplicatedPG *pg;
OpRequest *op;
OpRequestRef op;
epoch_t same_since;
eversion_t last_complete;
C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequest *o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
if (op)
op->get();
C_OSD_CommittedPushedObject(ReplicatedPG *p, OpRequestRef o, epoch_t ss, eversion_t lc) : pg(p), op(o), same_since(ss), last_complete(lc) {
pg->get();
}
void finish(int r) {
pg->_committed_pushed_object(op, same_since, last_complete);
if (op)
op->put();
}
};
void sub_op_remove(OpRequest *op);
void sub_op_remove(OpRequestRef op);
void sub_op_modify(OpRequest *op);
void sub_op_modify(OpRequestRef op);
void sub_op_modify_applied(RepModify *rm);
void sub_op_modify_commit(RepModify *rm);
void sub_op_modify_reply(OpRequest *op);
void sub_op_modify_reply(OpRequestRef op);
void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
void _committed_pushed_object(OpRequest *op, epoch_t same_since, eversion_t lc);
void _committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
void sub_op_push(OpRequest *op);
void _failed_push(OpRequest *op);
void sub_op_push_reply(OpRequest *op);
void sub_op_pull(OpRequest *op);
void sub_op_push(OpRequestRef op);
void _failed_push(OpRequestRef op);
void sub_op_push_reply(OpRequestRef op);
void sub_op_pull(OpRequestRef op);
void log_subop_stats(OpRequest *op, int tag_inb, int tag_lat);
void log_subop_stats(OpRequestRef op, int tag_inb, int tag_lat);
// -- scrub --
@ -799,13 +793,13 @@ public:
int do_command(vector<string>& cmd, ostream& ss, bufferlist& idata, bufferlist& odata);
void do_op(OpRequest *op);
void do_op(OpRequestRef op);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequest *op);
void do_sub_op(OpRequest *op);
void do_sub_op_reply(OpRequest *op);
void do_scan(OpRequest *op);
void do_backfill(OpRequest *op);
void do_pg_op(OpRequestRef op);
void do_sub_op(OpRequestRef op);
void do_sub_op_reply(OpRequestRef op);
void do_scan(OpRequestRef op);
void do_backfill(OpRequestRef op);
bool get_obs_to_trim(snapid_t &snap_to_trim,
coll_t &col_to_trim,
vector<hobject_t> &obs_to_trim);
@ -888,11 +882,11 @@ public:
bool same_for_rep_modify_since(epoch_t e);
bool is_missing_object(const hobject_t& oid);
void wait_for_missing_object(const hobject_t& oid, OpRequest *op);
void wait_for_all_missing(OpRequest *op);
void wait_for_missing_object(const hobject_t& oid, OpRequestRef op);
void wait_for_all_missing(OpRequestRef op);
bool is_degraded_object(const hobject_t& oid);
void wait_for_degraded_object(const hobject_t& oid, OpRequest *op);
void wait_for_degraded_object(const hobject_t& oid, OpRequestRef op);
void mark_all_unfound_lost(int what);
eversion_t pick_newest_available(const hobject_t& oid);