mon: accumulate a single pending transaction and propose it all at once

Previous we would queue lots of distinct encoded Transactions from various
callers, usually one per PaxosService.  These would be sent through paxos
one at a time.

If there is a completed transaction there is no reason to delay; it is
more efficient to push it through immediately.  Since we will propose
anything pending right when we finish, there is minimal opportunity for
other work to get done.

Instead, accumulate everything in a single MonitorDBStore::Transaction and
propose all pending changes all at once.  Encode at propose time and
expose the Transaction to the callers so they can add their changes.

Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-01-13 14:51:04 -08:00
parent 5461368968
commit 67a90dd75c
5 changed files with 140 additions and 130 deletions

View File

@ -48,19 +48,21 @@ int ConfigKeyService::store_get(string key, bufferlist &bl)
void ConfigKeyService::store_put(string key, bufferlist &bl, Context *cb)
{
bufferlist proposal_bl;
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
t->put(STORE_PREFIX, key, bl);
t->encode(proposal_bl);
paxos->propose_new_value(proposal_bl, cb);
if (cb)
paxos->queue_pending_finisher(cb);
paxos->trigger_propose();
}
void ConfigKeyService::store_delete(string key, Context *cb)
{
bufferlist proposal_bl;
MonitorDBStore::Transaction t;
t.erase(STORE_PREFIX, key);
t.encode(proposal_bl);
paxos->propose_new_value(proposal_bl, cb);
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
t->erase(STORE_PREFIX, key);
if (cb)
paxos->queue_pending_finisher(cb);
paxos->trigger_propose();
}
bool ConfigKeyService::store_exists(string key)

View File

@ -4144,11 +4144,9 @@ void Monitor::tick()
if (is_leader() && paxos->is_active() && fingerprint.is_zero()) {
// this is only necessary on upgraded clusters.
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
prepare_new_fingerprint(t);
bufferlist tbl;
t->encode(tbl);
paxos->propose_new_value(tbl, new C_NoopContext);
paxos->trigger_propose();
}
new_tick();

View File

@ -573,7 +573,7 @@ void Paxos::handle_last(MMonPaxos *last)
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
queue_next();
maybe_propose_pending();
}
}
}
@ -923,7 +923,7 @@ void Paxos::commit_finish()
finish_contexts(g_ceph_context, waiting_for_readable);
finish_contexts(g_ceph_context, waiting_for_writeable);
queue_next();
maybe_propose_pending();
}
}
@ -1039,20 +1039,9 @@ void Paxos::commit_proposal()
assert(mon->is_leader());
assert(is_refresh());
if (proposals.empty()) {
return; // must have been updating previous
}
C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
if (proposal->proposed) {
dout(10) << __func__ << " proposal " << proposal << " took "
<< (ceph_clock_now(NULL) - proposal->proposal_time)
<< " to finish" << dendl;
proposals.pop_front();
proposal->complete(0);
} else {
// must have been updating previous.
}
list<Context*> ls;
ls.swap(committing_finishers);
finish_contexts(g_ceph_context, ls);
}
void Paxos::finish_round()
@ -1064,17 +1053,16 @@ void Paxos::finish_round()
state = STATE_ACTIVE;
}
void Paxos::queue_next()
void Paxos::maybe_propose_pending()
{
dout(10) << __func__ << " state " << state
<< " proposals left " << proposals.size() << dendl;
dout(10) << __func__ << " state " << state << dendl;
if (should_trim()) {
trim();
}
if (is_active() && !proposals.empty()) {
propose_queued();
if (is_active() && pending_proposal) {
propose_pending();
}
}
@ -1212,7 +1200,7 @@ void Paxos::trim()
dout(10) << "trim to " << end << " (was " << first_committed << ")" << dendl;
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
MonitorDBStore::TransactionRef t = get_pending_transaction();
for (version_t v = first_committed; v < end; ++v) {
dout(10) << "trim " << v << dendl;
@ -1224,17 +1212,8 @@ void Paxos::trim()
t->compact_range(get_name(), stringify(first_committed - 1), stringify(end));
}
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
bufferlist bl;
t->encode(bl);
trimming = true;
queue_proposal(bl, new C_Trimmed(this));
queue_pending_finisher(new C_Trimmed(this));
}
/*
@ -1298,13 +1277,19 @@ void Paxos::cancel_events()
}
}
void Paxos::shutdown() {
void Paxos::shutdown()
{
dout(10) << __func__ << " cancel all contexts" << dendl;
// discard pending transaction
pending_proposal.reset();
finish_contexts(g_ceph_context, waiting_for_writeable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_commit, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_readable, -ECANCELED);
finish_contexts(g_ceph_context, waiting_for_active, -ECANCELED);
finish_contexts(g_ceph_context, proposals, -ECANCELED);
finish_contexts(g_ceph_context, pending_finishers, -ECANCELED);
finish_contexts(g_ceph_context, committing_finishers, -ECANCELED);
if (logger)
g_ceph_context->get_perfcounters_collection()->remove(logger);
delete logger;
@ -1315,7 +1300,11 @@ void Paxos::leader_init()
cancel_events();
new_value.clear();
finish_contexts(g_ceph_context, proposals, -EAGAIN);
// discard pending transaction
pending_proposal.reset();
finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
logger->inc(l_paxos_start_leader);
@ -1342,10 +1331,14 @@ void Paxos::peon_init()
// start a timer, in case the leader never manages to issue a lease
reset_lease_timeout();
// discard pending transaction
pending_proposal.reset();
// no chance to write now!
finish_contexts(g_ceph_context, waiting_for_writeable, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, proposals, -EAGAIN);
finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
logger->inc(l_paxos_start_peon);
}
@ -1365,7 +1358,11 @@ void Paxos::restart()
}
state = STATE_RECOVERING;
finish_contexts(g_ceph_context, proposals, -EAGAIN);
// discard pending transaction
pending_proposal.reset();
finish_contexts(g_ceph_context, committing_finishers, -EAGAIN);
finish_contexts(g_ceph_context, pending_finishers, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_commit, -EAGAIN);
finish_contexts(g_ceph_context, waiting_for_active, -EAGAIN);
@ -1484,60 +1481,57 @@ bool Paxos::is_writeable()
is_lease_valid();
}
void Paxos::list_proposals(ostream& out)
{
out << __func__ << " " << proposals.size() << " in queue:\n";
list<Context*>::iterator p_it = proposals.begin();
for (int i = 0; p_it != proposals.end(); ++p_it, ++i) {
C_Proposal *p = (C_Proposal*) *p_it;
out << "-- entry #" << i << "\n";
out << *p << "\n";
}
}
void Paxos::propose_queued()
void Paxos::propose_pending()
{
assert(is_active());
assert(!proposals.empty());
C_Proposal *proposal = static_cast<C_Proposal*>(proposals.front());
assert(!proposal->proposed);
assert(pending_proposal);
cancel_events();
dout(10) << __func__ << " " << (last_committed + 1)
<< " " << proposal->bl.length() << " bytes" << dendl;
proposal->proposed = true;
dout(30) << __func__ << " ";
list_proposals(*_dout);
bufferlist bl;
pending_proposal->encode(bl);
pending_proposal.reset();
dout(10) << __func__ << " " << (last_committed + 1)
<< " " << bl.length() << " bytes" << dendl;
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
pending_proposal->dump(&f);
f.flush(*_dout);
*_dout << dendl;
committing_finishers.swap(pending_finishers);
state = STATE_UPDATING;
begin(proposal->bl);
begin(bl);
}
void Paxos::queue_proposal(bufferlist& bl, Context *onfinished)
void Paxos::queue_pending_finisher(Context *onfinished)
{
dout(5) << __func__ << " bl " << bl.length() << " bytes;"
<< " ctx = " << onfinished << dendl;
proposals.push_back(new C_Proposal(onfinished, bl));
dout(5) << __func__ << " " << onfinished << dendl;
assert(onfinished);
pending_finishers.push_back(onfinished);
}
bool Paxos::propose_new_value(bufferlist& bl, Context *onfinished)
MonitorDBStore::TransactionRef Paxos::get_pending_transaction()
{
assert(mon->is_leader());
queue_proposal(bl, onfinished);
if (!is_active()) {
dout(5) << __func__ << " not active; proposal queued" << dendl;
return true;
if (!pending_proposal) {
pending_proposal.reset(new MonitorDBStore::Transaction);
assert(pending_finishers.empty());
}
return pending_proposal;
}
propose_queued();
return true;
bool Paxos::trigger_propose()
{
if (is_active()) {
dout(10) << __func__ << " active, proposing now" << dendl;
propose_pending();
return true;
} else {
dout(10) << __func__ << " not active, will propose later" << dendl;
return false;
}
}
bool Paxos::is_consistent()

View File

@ -575,13 +575,31 @@ private:
* fully committed.
*/
list<Context*> waiting_for_commit;
/**
* Pending proposal transaction
*
* This is the transaction that is under construction and pending
* proposal. We will add operations to it until we decide it is
* time to start a paxos round.
*/
list<Context*> proposals;
MonitorDBStore::TransactionRef pending_proposal;
/**
* @}
* Finishers for pending transaction
*
* These are waiting for updates in the pending proposal/transaction
* to be committed.
*/
list<Context*> pending_finishers;
/**
* Finishers for committing transaction
*
* When the pending_proposal is submitted, pending_finishers move to
* this list. When it commits, these finishers are notified.
*/
list<Context*> committing_finishers;
/**
* @defgroup Paxos_h_sync_warns Synchronization warnings
@ -1046,17 +1064,9 @@ private:
void warn_on_future_time(utime_t t, entity_name_t from);
/**
* Queue a new proposal by pushing it at the back of the queue; do not
* propose it.
*
* @param bl The bufferlist to be proposed
* @param onfinished The callback to be called once the proposal finishes
* Begin proposing the pending_proposal.
*/
void queue_proposal(bufferlist& bl, Context *onfinished);
/**
* Begin proposing the Proposal at the front of the proposals queue.
*/
void propose_queued();
void propose_pending();
/**
* refresh state from store
@ -1070,7 +1080,14 @@ private:
void commit_proposal();
void finish_round();
void queue_next();
/**
* propose pending, if any
*
* This is called at the end of the round to check if there is another
* pending proposal ready to go.
*/
void maybe_propose_pending();
public:
/**
@ -1335,22 +1352,30 @@ public:
}
/**
* List all queued proposals
* Get a transaction to submit operations to propose against
*
* @param out[out] Output Stream onto which we will output the list
* of queued proposals.
* Apply operations to this transaction. It will eventually be proposed
* to paxos.
*/
void list_proposals(ostream& out);
MonitorDBStore::TransactionRef get_pending_transaction();
/**
* Propose a new value to the Leader.
* Queue a completion for the pending proposal
*
* This function enables the submission of a new value to the Leader, which
* will trigger a new proposal.
*
* @param bl A bufferlist holding the value to be proposed
* @param onfinish A callback to be fired up once we finish the proposal
* This completion will get triggered when the pending proposal
* transaction commits.
*/
bool propose_new_value(bufferlist& bl, Context *onfinished=0);
void queue_pending_finisher(Context *onfinished);
/**
* (try to) trigger a proposal
*
* Tell paxos that it should submit the pending proposal. Note that if it
* is not active (e.g., because it is already in the midst of committing
* something) that will be deferred (e.g., until the current round finishes).
*/
bool trigger_propose();
/**
* Add oncommit to the back of the list of callbacks waiting for us to
* finish committing.

View File

@ -181,15 +181,15 @@ void PaxosService::propose_pending()
}
/**
* @note The value we propose is encoded in a bufferlist, passed to
* Paxos::propose_new_value and it is obtained by calling a
* function that must be implemented by the class implementing us.
* I.e., the function encode_pending will be the one responsible
* to encode whatever is pending on the implementation class into a
* bufferlist, so we can then propose that as a value through Paxos.
* @note What we contirbute to the pending Paxos transaction is
* obtained by calling a function that must be implemented by
* the class implementing us. I.e., the function
* encode_pending will be the one responsible to encode
* whatever is pending on the implementation class into a
* bufferlist, so we can then propose that as a value through
* Paxos.
*/
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
bufferlist bl;
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
if (should_stash_full())
encode_full(t);
@ -201,17 +201,10 @@ void PaxosService::propose_pending()
t->put(get_service_name(), "format_version", format_version);
}
dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t->dump(&f);
f.flush(*_dout);
*_dout << dendl;
t->encode(bl);
// apply to paxos
proposing = true;
paxos->propose_new_value(bl, new C_Committed(this));
paxos->queue_pending_finisher(new C_Committed(this));
paxos->trigger_propose();
}
bool PaxosService::should_stash_full()
@ -351,16 +344,14 @@ void PaxosService::maybe_trim()
}
dout(10) << __func__ << " trimming to " << trim_to << ", " << to_remove << " states" << dendl;
MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
trim(t, get_first_committed(), trim_to);
put_first_committed(t, trim_to);
// let the service add any extra stuff
encode_trim_extra(t, trim_to);
bufferlist bl;
t->encode(bl);
paxos->propose_new_value(bl, NULL);
paxos->trigger_propose();
}
void PaxosService::trim(MonitorDBStore::TransactionRef t,