mirror of
https://github.com/ceph/ceph
synced 2025-01-03 09:32:43 +00:00
osd/: plumb rollback_trim_to through interfaces and messages
We simply trim the roll back info up to min_last_complete_ondisk since we cannot correctly rollback past that point anyway! ReplicatedPG -> PGBackend::submit_transaction -> Backend messages -> PGBackend::Listener::log_operation -> append_log Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
parent
ac11ca40b4
commit
953c33edb6
@ -25,7 +25,7 @@
|
||||
|
||||
class MOSDSubOp : public Message {
|
||||
|
||||
static const int HEAD_VERSION = 10;
|
||||
static const int HEAD_VERSION = 11;
|
||||
static const int COMPAT_VERSION = 1;
|
||||
|
||||
public:
|
||||
@ -63,6 +63,8 @@ public:
|
||||
|
||||
// piggybacked osd/og state
|
||||
eversion_t pg_trim_to; // primary->replica: trim to here
|
||||
eversion_t pg_trim_rollback_to; // primary->replica: trim rollback
|
||||
// info to here
|
||||
osd_peer_stat_t peer_stat;
|
||||
|
||||
map<string,bufferlist> attrset;
|
||||
@ -175,6 +177,11 @@ public:
|
||||
if (header.version >= 10) {
|
||||
::decode(updated_hit_set_history, p);
|
||||
}
|
||||
if (header.version >= 11) {
|
||||
::decode(pg_trim_rollback_to, p);
|
||||
} else {
|
||||
pg_trim_rollback_to = pg_trim_to;
|
||||
}
|
||||
}
|
||||
|
||||
virtual void encode_payload(uint64_t features) {
|
||||
@ -224,6 +231,7 @@ public:
|
||||
::encode(from, payload);
|
||||
::encode(pgid.shard, payload);
|
||||
::encode(updated_hit_set_history, payload);
|
||||
::encode(pg_trim_rollback_to, payload);
|
||||
}
|
||||
|
||||
MOSDSubOp()
|
||||
|
@ -829,6 +829,7 @@ void ECBackend::handle_sub_write(
|
||||
op.log_entries,
|
||||
op.updated_hit_set_history,
|
||||
op.trim_to,
|
||||
op.trim_rollback_to,
|
||||
!(op.t.empty()),
|
||||
localt);
|
||||
localt->append(op.t);
|
||||
@ -1210,6 +1211,7 @@ void ECBackend::submit_transaction(
|
||||
const eversion_t &at_version,
|
||||
PGTransaction *_t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_local_applied_sync,
|
||||
@ -1225,6 +1227,7 @@ void ECBackend::submit_transaction(
|
||||
op->hoid = hoid;
|
||||
op->version = at_version;
|
||||
op->trim_to = trim_to;
|
||||
op->trim_rollback_to = trim_rollback_to;
|
||||
op->log_entries.swap(log_entries);
|
||||
std::swap(op->updated_hit_set_history, hset_history);
|
||||
op->on_local_applied_sync = on_local_applied_sync;
|
||||
@ -1531,6 +1534,7 @@ void ECBackend::start_write(Op *op) {
|
||||
should_send ? iter->second : ObjectStore::Transaction(),
|
||||
op->version,
|
||||
op->trim_to,
|
||||
op->trim_rollback_to,
|
||||
op->log_entries,
|
||||
op->updated_hit_set_history,
|
||||
op->temp_added,
|
||||
|
@ -97,6 +97,7 @@ public:
|
||||
const eversion_t &at_version,
|
||||
PGTransaction *t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_local_applied_sync,
|
||||
@ -326,6 +327,7 @@ public:
|
||||
hobject_t hoid;
|
||||
eversion_t version;
|
||||
eversion_t trim_to;
|
||||
eversion_t trim_rollback_to;
|
||||
vector<pg_log_entry_t> log_entries;
|
||||
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
|
||||
Context *on_local_applied_sync;
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
void ECSubWrite::encode(bufferlist &bl) const
|
||||
{
|
||||
ENCODE_START(2, 1, bl);
|
||||
ENCODE_START(3, 1, bl);
|
||||
::encode(from, bl);
|
||||
::encode(tid, bl);
|
||||
::encode(reqid, bl);
|
||||
@ -29,12 +29,13 @@ void ECSubWrite::encode(bufferlist &bl) const
|
||||
::encode(temp_added, bl);
|
||||
::encode(temp_removed, bl);
|
||||
::encode(updated_hit_set_history, bl);
|
||||
::encode(trim_rollback_to, bl);
|
||||
ENCODE_FINISH(bl);
|
||||
}
|
||||
|
||||
void ECSubWrite::decode(bufferlist::iterator &bl)
|
||||
{
|
||||
DECODE_START(2, bl);
|
||||
DECODE_START(3, bl);
|
||||
::decode(from, bl);
|
||||
::decode(tid, bl);
|
||||
::decode(reqid, bl);
|
||||
@ -49,6 +50,11 @@ void ECSubWrite::decode(bufferlist::iterator &bl)
|
||||
if (struct_v >= 2) {
|
||||
::decode(updated_hit_set_history, bl);
|
||||
}
|
||||
if (struct_v >= 3) {
|
||||
::decode(trim_rollback_to, bl);
|
||||
} else {
|
||||
trim_rollback_to = trim_to;
|
||||
}
|
||||
DECODE_FINISH(bl);
|
||||
}
|
||||
|
||||
@ -58,7 +64,8 @@ std::ostream &operator<<(
|
||||
lhs << "ECSubWrite(tid=" << rhs.tid
|
||||
<< ", reqid=" << rhs.reqid
|
||||
<< ", at_version=" << rhs.at_version
|
||||
<< ", trim_to=" << rhs.trim_to;
|
||||
<< ", trim_to=" << rhs.trim_to
|
||||
<< ", trim_rollback_to=" << rhs.trim_rollback_to;
|
||||
if (rhs.updated_hit_set_history)
|
||||
lhs << ", has_updated_hit_set_history";
|
||||
return lhs << ")";
|
||||
@ -70,6 +77,7 @@ void ECSubWrite::dump(Formatter *f) const
|
||||
f->dump_stream("reqid") << reqid;
|
||||
f->dump_stream("at_version") << at_version;
|
||||
f->dump_stream("trim_to") << trim_to;
|
||||
f->dump_stream("trim_rollback_to") << trim_rollback_to;
|
||||
f->dump_stream("has_updated_hit_set_history")
|
||||
<< static_cast<bool>(updated_hit_set_history);
|
||||
}
|
||||
@ -85,6 +93,12 @@ void ECSubWrite::generate_test_instances(list<ECSubWrite*> &o)
|
||||
o.back()->reqid = osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678);
|
||||
o.back()->at_version = eversion_t(10, 300);
|
||||
o.back()->trim_to = eversion_t(5, 42);
|
||||
o.push_back(new ECSubWrite());
|
||||
o.back()->tid = 9;
|
||||
o.back()->reqid = osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678);
|
||||
o.back()->at_version = eversion_t(10, 300);
|
||||
o.back()->trim_to = eversion_t(5, 42);
|
||||
o.back()->trim_rollback_to = eversion_t(8, 250);
|
||||
}
|
||||
|
||||
void ECSubWriteReply::encode(bufferlist &bl) const
|
||||
|
@ -28,6 +28,7 @@ struct ECSubWrite {
|
||||
ObjectStore::Transaction t;
|
||||
eversion_t at_version;
|
||||
eversion_t trim_to;
|
||||
eversion_t trim_rollback_to;
|
||||
vector<pg_log_entry_t> log_entries;
|
||||
set<hobject_t> temp_added;
|
||||
set<hobject_t> temp_removed;
|
||||
@ -42,6 +43,7 @@ struct ECSubWrite {
|
||||
const ObjectStore::Transaction &t,
|
||||
eversion_t at_version,
|
||||
eversion_t trim_to,
|
||||
eversion_t trim_rollback_to,
|
||||
vector<pg_log_entry_t> log_entries,
|
||||
boost::optional<pg_hit_set_history_t> updated_hit_set_history,
|
||||
const set<hobject_t> &temp_added,
|
||||
@ -49,7 +51,8 @@ struct ECSubWrite {
|
||||
: from(from), tid(tid), reqid(reqid),
|
||||
soid(soid), stats(stats), t(t),
|
||||
at_version(at_version),
|
||||
trim_to(trim_to), log_entries(log_entries),
|
||||
trim_to(trim_to), trim_rollback_to(trim_rollback_to),
|
||||
log_entries(log_entries),
|
||||
temp_added(temp_added),
|
||||
temp_removed(temp_removed),
|
||||
updated_hit_set_history(updated_hit_set_history) {}
|
||||
|
@ -2640,7 +2640,10 @@ void PG::add_log_entry(pg_log_entry_t& e, bufferlist& log_bl)
|
||||
|
||||
|
||||
void PG::append_log(
|
||||
vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t,
|
||||
vector<pg_log_entry_t>& logv,
|
||||
eversion_t trim_to,
|
||||
eversion_t trim_rollback_to,
|
||||
ObjectStore::Transaction &t,
|
||||
bool transaction_applied)
|
||||
{
|
||||
if (transaction_applied)
|
||||
@ -2654,13 +2657,23 @@ void PG::append_log(
|
||||
p->offset = 0;
|
||||
add_log_entry(*p, keys[p->get_key_name()]);
|
||||
}
|
||||
if (!transaction_applied)
|
||||
pg_log.clear_can_rollback_to();
|
||||
|
||||
PGLogEntryHandler handler;
|
||||
if (!transaction_applied) {
|
||||
pg_log.clear_can_rollback_to(&handler);
|
||||
} else if (trim_rollback_to > pg_log.get_rollback_trimmed_to()) {
|
||||
pg_log.trim_rollback_info(
|
||||
trim_rollback_to,
|
||||
&handler);
|
||||
}
|
||||
|
||||
dout(10) << "append_log adding " << keys.size() << " keys" << dendl;
|
||||
t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
|
||||
PGLogEntryHandler handler;
|
||||
|
||||
pg_log.trim(&handler, trim_to, info);
|
||||
|
||||
dout(10) << __func__ << ": trimming to " << trim_rollback_to
|
||||
<< " entries " << handler.to_trim << dendl;
|
||||
handler.apply(this, &t);
|
||||
|
||||
// update the local pg, pg log
|
||||
|
@ -1996,7 +1996,10 @@ public:
|
||||
|
||||
void add_log_entry(pg_log_entry_t& e, bufferlist& log_bl);
|
||||
void append_log(
|
||||
vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t,
|
||||
vector<pg_log_entry_t>& logv,
|
||||
eversion_t trim_to,
|
||||
eversion_t trim_rollback_to,
|
||||
ObjectStore::Transaction &t,
|
||||
bool transaction_applied = true);
|
||||
bool check_log_for_corruption(ObjectStore *store);
|
||||
void trim_peers();
|
||||
|
@ -177,6 +177,7 @@
|
||||
vector<pg_log_entry_t> &logv,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
bool transaction_applied,
|
||||
ObjectStore::Transaction *t) = 0;
|
||||
|
||||
@ -492,6 +493,7 @@
|
||||
const eversion_t &at_version, ///< [in] version
|
||||
PGTransaction *t, ///< [in] trans to execute
|
||||
const eversion_t &trim_to, ///< [in] trim log to here
|
||||
const eversion_t &trim_rollback_to, ///< [in] trim rollback info to here
|
||||
vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
|
||||
/// [in] hitset history (if updated with this transaction)
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
|
@ -362,8 +362,21 @@ public:
|
||||
eversion_t trim_to,
|
||||
pg_info_t &info);
|
||||
|
||||
void clear_can_rollback_to() {
|
||||
void trim_rollback_info(
|
||||
eversion_t trim_rollback_to,
|
||||
LogEntryHandler *h) {
|
||||
if (trim_rollback_to > log.can_rollback_to)
|
||||
log.can_rollback_to = trim_rollback_to;
|
||||
log.advance_rollback_info_trimmed_to(
|
||||
trim_rollback_to,
|
||||
h);
|
||||
}
|
||||
|
||||
void clear_can_rollback_to(LogEntryHandler *h) {
|
||||
log.can_rollback_to = log.head;
|
||||
log.advance_rollback_info_trimmed_to(
|
||||
log.head,
|
||||
h);
|
||||
}
|
||||
|
||||
//////////////////// get or set log & missing ////////////////////
|
||||
|
@ -493,6 +493,7 @@ void ReplicatedBackend::submit_transaction(
|
||||
const eversion_t &at_version,
|
||||
PGTransaction *_t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_local_applied_sync,
|
||||
@ -533,6 +534,7 @@ void ReplicatedBackend::submit_transaction(
|
||||
tid,
|
||||
reqid,
|
||||
trim_to,
|
||||
trim_rollback_to,
|
||||
t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(),
|
||||
t->get_temp_cleared().size() ?
|
||||
*(t->get_temp_cleared().begin()) :hobject_t(),
|
||||
@ -548,7 +550,13 @@ void ReplicatedBackend::submit_transaction(
|
||||
}
|
||||
clear_temp_objs(t->get_temp_cleared());
|
||||
|
||||
parent->log_operation(log_entries, hset_history, trim_to, true, &local_t);
|
||||
parent->log_operation(
|
||||
log_entries,
|
||||
hset_history,
|
||||
trim_to,
|
||||
trim_rollback_to,
|
||||
true,
|
||||
&local_t);
|
||||
local_t.append(*op_t);
|
||||
local_t.swap(*op_t);
|
||||
|
||||
|
@ -342,6 +342,7 @@ public:
|
||||
const eversion_t &at_version,
|
||||
PGTransaction *t,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
Context *on_local_applied_sync,
|
||||
@ -359,6 +360,7 @@ private:
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_trim_rollback_to,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
|
@ -6718,6 +6718,7 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
|
||||
repop->ctx->at_version,
|
||||
repop->ctx->op_t,
|
||||
pg_trim_to,
|
||||
min_last_complete_ondisk,
|
||||
repop->ctx->log,
|
||||
repop->ctx->updated_hset_history,
|
||||
onapplied_sync,
|
||||
@ -6735,6 +6736,7 @@ void ReplicatedBackend::issue_op(
|
||||
ceph_tid_t tid,
|
||||
osd_reqid_t reqid,
|
||||
eversion_t pg_trim_to,
|
||||
eversion_t pg_trim_rollback_to,
|
||||
hobject_t new_temp_oid,
|
||||
hobject_t discard_temp_oid,
|
||||
vector<pg_log_entry_t> &log_entries,
|
||||
@ -6790,6 +6792,7 @@ void ReplicatedBackend::issue_op(
|
||||
wr->pg_stats = get_info().stats;
|
||||
|
||||
wr->pg_trim_to = pg_trim_to;
|
||||
wr->pg_trim_rollback_to = pg_trim_rollback_to;
|
||||
|
||||
wr->new_temp_oid = new_temp_oid;
|
||||
wr->discard_temp_oid = discard_temp_oid;
|
||||
@ -7590,6 +7593,7 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op)
|
||||
log,
|
||||
m->updated_hit_set_history,
|
||||
m->pg_trim_to,
|
||||
m->pg_trim_rollback_to,
|
||||
update_snaps,
|
||||
&(rm->localt));
|
||||
|
||||
|
@ -347,13 +347,14 @@ public:
|
||||
vector<pg_log_entry_t> &logv,
|
||||
boost::optional<pg_hit_set_history_t> &hset_history,
|
||||
const eversion_t &trim_to,
|
||||
const eversion_t &trim_rollback_to,
|
||||
bool transaction_applied,
|
||||
ObjectStore::Transaction *t) {
|
||||
if (hset_history) {
|
||||
info.hit_set = *hset_history;
|
||||
dirty_info = true;
|
||||
}
|
||||
append_log(logv, trim_to, *t, transaction_applied);
|
||||
append_log(logv, trim_to, trim_rollback_to, *t, transaction_applied);
|
||||
}
|
||||
|
||||
void op_applied(
|
||||
|
Loading…
Reference in New Issue
Block a user