1
0
mirror of https://github.com/ceph/ceph synced 2024-12-17 00:46:05 +00:00

Merge branch 'wip_push_refactor'

Reviewed-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Samuel Just 2012-02-14 13:02:44 -08:00
commit 34145d5dd2
6 changed files with 746 additions and 538 deletions

View File

@ -25,7 +25,7 @@
class MOSDSubOp : public Message {
static const int HEAD_VERSION = 3;
static const int HEAD_VERSION = 4;
static const int COMPAT_VERSION = 1;
public:
@ -71,6 +71,15 @@ public:
bool first, complete;
interval_set<uint64_t> data_included;
ObjectRecoveryInfo recovery_info;
// reflects result of current push
ObjectRecoveryProgress recovery_progress;
// reflects progress before current push
ObjectRecoveryProgress current_progress;
virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(map_epoch, p);
@ -101,6 +110,7 @@ public:
::decode(pg_trim_to, p);
::decode(peer_stat, p);
::decode(attrset, p);
::decode(data_subset, p);
::decode(clone_subsets, p);
@ -110,6 +120,12 @@ public:
}
if (header.version >= 3)
::decode(oloc, p);
if (header.version >= 4) {
::decode(data_included, p);
::decode(recovery_info, p);
::decode(recovery_progress, p);
::decode(current_progress, p);
}
}
virtual void encode_payload(uint64_t features) {
@ -148,9 +164,12 @@ public:
::encode(first, payload);
::encode(complete, payload);
::encode(oloc, payload);
::encode(data_included, payload);
::encode(recovery_info, payload);
::encode(recovery_progress, payload);
::encode(current_progress, payload);
}
MOSDSubOp()
: Message(MSG_OSD_SUBOP, HEAD_VERSION, COMPAT_VERSION) { }
MOSDSubOp(osd_reqid_t r, pg_t p, const hobject_t& po, bool noop_, int aw,

File diff suppressed because it is too large Load Diff

View File

@ -543,17 +543,47 @@ protected:
}
void put_snapset_context(SnapSetContext *ssc);
// pull
struct pull_info_t {
eversion_t version;
int from;
bool need_size;
interval_set<uint64_t> data_subset, data_subset_pulling;
// push
struct PushInfo {
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
};
map<hobject_t, pull_info_t> pulling;
map<hobject_t, map<int, PushInfo> > pushing;
// pull
struct PullInfo {
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
bool is_complete() const {
return recovery_progress.is_complete(recovery_info);
}
};
map<hobject_t, PullInfo> pulling;
ObjectRecoveryInfo recalc_subsets(ObjectRecoveryInfo recovery_info);
static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
const interval_set<uint64_t> &intervals_received,
bufferlist data_received,
interval_set<uint64_t> *intervals_usable,
bufferlist *data_usable);
void handle_pull_response(OpRequest *op);
void handle_push(OpRequest *op);
int send_push(int peer,
ObjectRecoveryInfo recovery_info,
ObjectRecoveryProgress progress,
ObjectRecoveryProgress *out_progress = 0);
int send_pull(int peer,
ObjectRecoveryInfo recovery_info,
ObjectRecoveryProgress progress);
void submit_push_data(const ObjectRecoveryInfo &recovery_info,
bool first,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
map<string, bufferptr> &attrs,
ObjectStore::Transaction *t);
void submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t);
/*
* Backfill
@ -578,15 +608,6 @@ protected:
// Reverse mapping from osd peer to objects beging pulled from that peer
map<int, set<hobject_t> > pull_from_peer;
// push
struct push_info_t {
uint64_t size;
eversion_t version;
interval_set<uint64_t> data_subset, data_subset_pushing;
map<hobject_t, interval_set<uint64_t> > clone_subsets;
};
map<hobject_t, map<int, push_info_t> > pushing;
int recover_object_replicas(const hobject_t& soid, eversion_t v);
void calc_head_subsets(ObjectContext *obc, SnapSet& snapset, const hobject_t& head,
pg_missing_t& missing,
@ -598,15 +619,13 @@ protected:
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
void push_to_replica(ObjectContext *obc, const hobject_t& oid, int dest);
void push_start(const hobject_t& oid, int dest);
void push_start(const hobject_t& soid, int peer,
uint64_t size, eversion_t version,
void push_start(ObjectContext *obc,
const hobject_t& oid, int dest);
void push_start(ObjectContext *obc,
const hobject_t& soid, int peer,
eversion_t version,
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
int send_push_op(const hobject_t& oid, eversion_t version, int dest,
uint64_t size, bool first, bool complete,
interval_set<uint64_t>& data_subset,
map<hobject_t, interval_set<uint64_t> >& clone_subsets);
void send_push_op_blank(const hobject_t& soid, int peer);
void finish_degraded_object(const hobject_t& oid);
@ -614,8 +633,6 @@ protected:
// Cancels/resets pulls from peer
void check_recovery_op_pulls(const OSDMapRef map);
int pull(const hobject_t& oid, eversion_t v);
void send_pull_op(const hobject_t& soid, eversion_t v, bool first, const interval_set<uint64_t>& data_subset, int fromosd);
// low level ops

View File

@ -2000,6 +2000,138 @@ ostream& operator<<(ostream& out, const object_info_t& oi)
return out;
}
// -- ObjectRecovery --
void ObjectRecoveryProgress::encode(bufferlist &bl) const
{
ENCODE_START(1, 1, bl);
::encode(first, bl);
::encode(data_complete, bl);
::encode(data_recovered_to, bl);
::encode(omap_recovered_to, bl);
::encode(omap_complete, bl);
ENCODE_FINISH(bl);
}
void ObjectRecoveryProgress::decode(bufferlist::iterator &bl)
{
DECODE_START(1, bl);
::decode(first, bl);
::decode(data_complete, bl);
::decode(data_recovered_to, bl);
::decode(omap_recovered_to, bl);
::decode(omap_complete, bl);
DECODE_FINISH(bl);
}
ostream &operator<<(ostream &out, const ObjectRecoveryProgress &prog)
{
return prog.print(out);
}
void ObjectRecoveryProgress::generate_test_instances(
list<ObjectRecoveryProgress*>& o)
{
o.push_back(new ObjectRecoveryProgress);
o.back()->first = false;
o.back()->data_complete = true;
o.back()->omap_complete = true;
o.back()->data_recovered_to = 100;
o.push_back(new ObjectRecoveryProgress);
o.back()->first = true;
o.back()->data_complete = false;
o.back()->omap_complete = false;
o.back()->data_recovered_to = 0;
}
ostream &ObjectRecoveryProgress::print(ostream &out) const
{
return out << "ObjectRecoveryProgress("
<< ( first ? "" : "!" ) << "first, "
<< "data_recovered_to:" << data_recovered_to
<< ", data_complete:" << ( data_complete ? "true" : "false" )
<< ", omap_recovered_to:" << omap_recovered_to
<< ", omap_complete:" << ( omap_complete ? "true" : "false" )
<< ")";
}
void ObjectRecoveryProgress::dump(Formatter *f) const
{
f->dump_int("first?", first);
f->dump_int("data_complete?", data_complete);
f->dump_unsigned("data_recovered_to", data_recovered_to);
f->dump_int("omap_complete?", omap_complete);
f->dump_string("omap_recovered_to", omap_recovered_to);
}
void ObjectRecoveryInfo::encode(bufferlist &bl) const
{
ENCODE_START(1, 1, bl);
::encode(soid, bl);
::encode(version, bl);
::encode(size, bl);
::encode(oi, bl);
::encode(ss, bl);
::encode(copy_subset, bl);
::encode(clone_subset, bl);
ENCODE_FINISH(bl);
}
void ObjectRecoveryInfo::decode(bufferlist::iterator &bl)
{
DECODE_START(1, bl);
::decode(soid, bl);
::decode(version, bl);
::decode(size, bl);
::decode(oi, bl);
::decode(ss, bl);
::decode(copy_subset, bl);
::decode(clone_subset, bl);
DECODE_FINISH(bl);
}
void ObjectRecoveryInfo::generate_test_instances(
list<ObjectRecoveryInfo*>& o)
{
o.push_back(new ObjectRecoveryInfo);
o.back()->soid = hobject_t(sobject_t("key", CEPH_NOSNAP));
o.back()->version = eversion_t(0,0);
o.back()->size = 100;
}
void ObjectRecoveryInfo::dump(Formatter *f) const
{
f->dump_stream("object") << soid;
f->dump_stream("at_version") << version;
f->dump_stream("size") << size;
{
f->open_object_section("object_info");
oi.dump(f);
f->close_section();
}
{
f->open_object_section("snapset");
ss.dump(f);
f->close_section();
}
f->dump_stream("copy_subset") << copy_subset;
f->dump_stream("clone_subset") << clone_subset;
}
ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf)
{
return inf.print(out);
}
ostream &ObjectRecoveryInfo::print(ostream &out) const
{
return out << "ObjectRecoveryInfo("
<< soid << "@" << version
<< ", copy_subset: " << copy_subset
<< ", clone_subset: " << clone_subset
<< ")";
}
// -- ScrubMap --

View File

@ -1614,6 +1614,53 @@ ostream& operator<<(ostream& out, const object_info_t& oi);
// Object recovery
struct ObjectRecoveryInfo {
hobject_t soid;
eversion_t version;
uint64_t size;
object_info_t oi;
SnapSet ss;
interval_set<uint64_t> copy_subset;
map<hobject_t, interval_set<uint64_t> > clone_subset;
ObjectRecoveryInfo() : size(0) { }
static void generate_test_instances(list<ObjectRecoveryInfo*>& o);
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &bl);
ostream &print(ostream &out) const;
void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(ObjectRecoveryInfo)
ostream& operator<<(ostream& out, const ObjectRecoveryInfo &inf);
struct ObjectRecoveryProgress {
bool first;
uint64_t data_recovered_to;
bool data_complete;
string omap_recovered_to;
bool omap_complete;
ObjectRecoveryProgress()
: first(true),
data_recovered_to(0),
data_complete(false), omap_complete(false) { }
bool is_complete(const ObjectRecoveryInfo& info) const {
return data_recovered_to >= (info.copy_subset.empty() ? 0 : info.copy_subset.range_end());
}
static void generate_test_instances(list<ObjectRecoveryProgress*>& o);
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &bl);
ostream &print(ostream &out) const;
void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(ObjectRecoveryProgress)
ostream& operator<<(ostream& out, const ObjectRecoveryProgress &prog);
/*
* summarize pg contents for purposes of a scrub
*/

View File

@ -50,6 +50,8 @@ TYPE(pg_create_t)
TYPE(watch_info_t)
TYPE(object_info_t)
TYPE(SnapSet)
TYPE(ObjectRecoveryInfo)
TYPE(ObjectRecoveryProgress)
TYPE(ScrubMap::object)
TYPE(ScrubMap)
TYPE(osd_peer_stat_t)