osd: implement basic caching policies in ReplicatedPG

Right now these are very basic and aren't as sophisticated as we
want them to end up, but we have a skeleton for where to put the
decision-making logic.

Signed-off-by: Greg Farnum <greg@inktank.com>
Greg Farnum 2013-09-09 11:19:37 -07:00
parent fbbe3ad122
commit 76c343ba17
2 changed files with 72 additions and 17 deletions
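
For orientation before the diff: the decision logic this commit adds lives in ReplicatedPG::maybe_handle_cache(), which chooses between handling an op on this pool and redirecting it to the base pool according to the pool's cache_mode. Below is a standalone sketch of that policy; CacheMode, have_obc and r are illustrative stand-ins for pg_pool_t::cache_mode, the ObjectContextRef lookup result, and find_object_context()'s return code, and the real code redirects via do_cache_redirect() instead of returning a bool.

// Simplified, standalone model of the policy in maybe_handle_cache().
// Names here are stand-ins, not the actual Ceph types.
#include <cassert>

enum class CacheMode { NONE, WRITEBACK, INVALIDATE_FORWARD, READONLY };

// Returns true if the op should be bounced to the base pool rather
// than executed on this (cache) pool.
bool should_redirect(CacheMode mode, bool have_obc, int r)
{
  switch (mode) {
  case CacheMode::NONE:
    return false;                   // no caching behavior: always handle here
  case CacheMode::WRITEBACK:
    return !have_obc;               // redirect only ops we have no local context for
  case CacheMode::INVALIDATE_FORWARD:
    return true;                    // always forward to the base pool
  case CacheMode::READONLY:
    return !(have_obc && r == 0);   // serve only reads we can satisfy locally
  }
  assert(0);                        // unknown mode, as in the real switch's default
  return false;
}

In short: NONE never redirects, INVALIDATE_FORWARD always does, WRITEBACK redirects only when the cache pool has no object context, and READONLY redirects unless the context lookup succeeded cleanly.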


@@ -711,23 +711,28 @@ void ReplicatedPG::do_op(OpRequestRef op)
 	      m->get_object_locator().get_pool(),
 	      m->get_object_locator().nspace),
     &obc, can_create, &snapid);
-  if (r) {
-    if (r == -EAGAIN) {
-      // If we're not the primary of this OSD, and we have
-      // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
-      // we have to wait for the object.
-      if (is_primary() ||
-          (!(m->get_flags() & CEPH_OSD_FLAG_BALANCE_READS) &&
-           !(m->get_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
-        // missing the specific snap we need; requeue and wait.
-        assert(!can_create); // only happens on a read
-        hobject_t soid(m->get_oid(), m->get_object_locator().key,
-                       snapid, m->get_pg().ps(),
-                       info.pgid.pool(), m->get_object_locator().nspace);
-        wait_for_missing_object(soid, op);
-        return;
-      }
-    }
+  if (r == -EAGAIN) {
+    // If we're not the primary of this OSD, and we have
+    // CEPH_OSD_FLAG_LOCALIZE_READS set, we just return -EAGAIN. Otherwise,
+    // we have to wait for the object.
+    if (is_primary() ||
+        (!(m->get_flags() & CEPH_OSD_FLAG_BALANCE_READS) &&
+         !(m->get_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
+      // missing the specific snap we need; requeue and wait.
+      assert(!can_create); // only happens on a read
+      hobject_t soid(m->get_oid(), m->get_object_locator().key,
+                     snapid, m->get_pg().ps(),
+                     info.pgid.pool(), m->get_object_locator().nspace);
+      wait_for_missing_object(soid, op);
+      return;
+    }
+  }
+
+  if (maybe_handle_cache(op, obc, r))
+    return;
+
+  if (r) {
     osd->reply_op_error(op, r);
     return;
   }
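
Note the ordering in the new code: maybe_handle_cache() is consulted after the -EAGAIN requeue path but before the generic error reply. On a cache pool, a miss typically surfaces here as r == -ENOENT with a null obc, and the cache hook gets the chance to turn that into a redirect rather than an error returned to the client. Annotated, the tail of the hunk reads roughly:

  // Cache policy gets first refusal: if it redirects the op to the
  // base tier, this PG is done with it.
  if (maybe_handle_cache(op, obc, r))
    return;

  // Only if the cache layer declined do we report a failed object
  // lookup (e.g. -ENOENT) back to the client as an error.
  if (r) {
    osd->reply_op_error(op, r);
    return;
  }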
@@ -886,6 +891,53 @@ void ReplicatedPG::do_op(OpRequestRef op)
   execute_ctx(ctx);
 }
 
+bool ReplicatedPG::maybe_handle_cache(OpRequestRef op, ObjectContextRef obc,
+                                      int r)
+{
+  switch(pool.info.cache_mode) {
+  case pg_pool_t::CACHEMODE_NONE:
+    return false;
+    break;
+  case pg_pool_t::CACHEMODE_WRITEBACK:
+    if (obc.get()) {
+      return false;
+    } else {
+      do_cache_redirect(op, obc);
+      return true;
+    }
+    break;
+  case pg_pool_t::CACHEMODE_INVALIDATE_FORWARD:
+    do_cache_redirect(op, obc);
+    return true;
+    break;
+  case pg_pool_t::CACHEMODE_READONLY:
+    if (obc.get() && !r) {
+      return false;
+    } else {
+      do_cache_redirect(op, obc);
+      return true;
+    }
+    break;
+  default:
+    assert(0);
+  }
+  return false;
+}
+
+void ReplicatedPG::do_cache_redirect(OpRequestRef op, ObjectContextRef obc)
+{
+  MOSDOp *m = static_cast<MOSDOp*>(op->request);
+  int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
+  MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
+                                       get_osdmap()->get_epoch(), flags);
+  request_redirect_t redir(m->get_object_locator(), pool.info.tier_of);
+  reply->set_redirect(redir);
+  dout(10) << "sending redirect to pool " << pool.info.tier_of << " for op "
+           << op << dendl;
+  m->get_connection()->get_messenger()->send_message(reply, m->get_connection());
+  return;
+}
+
 void ReplicatedPG::execute_ctx(OpContext *ctx)
 {
   dout(10) << __func__ << " " << ctx << dendl;
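
do_cache_redirect() does not proxy the op to the base pool; it answers the client directly with -ENOENT plus a request_redirect_t naming pool.info.tier_of (the base pool this cache pool is layered over), preserving only the ACK/ONDISK flags from the request. The client is then expected to resend the op to that pool. A hypothetical client-side reading of such a reply, with illustrative names (the real handling lives in the client-side Objecter, not in this commit):

#include <cstdint>

// Illustrative stand-in for the parts of the reply a client cares about.
struct RedirectedReply {
  int result;             // -ENOENT accompanies the redirect
  bool has_redirect;      // true when the reply carries a request_redirect_t
  int64_t redirect_pool;  // pool.info.tier_of: the base pool to resend to
};

// Decide where to (re)send the op after a reply from the cache pool.
int64_t next_target_pool(int64_t original_pool, const RedirectedReply& reply)
{
  if (reply.has_redirect)
    return reply.redirect_pool;   // resend to the base tier, op unchanged
  return original_pool;           // no redirect: the reply is the final answer
}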


@@ -619,7 +619,10 @@ protected:
   void write_update_size_and_usage(object_stat_sum_t& stats, object_info_t& oi,
                                    SnapSet& ss, interval_set<uint64_t>& modified,
                                    uint64_t offset, uint64_t length, bool count_bytes);
   void add_interval_usage(interval_set<uint64_t>& s, object_stat_sum_t& st);
+  inline bool maybe_handle_cache(OpRequestRef op, ObjectContextRef obc, int r);
+  void do_cache_redirect(OpRequestRef op, ObjectContextRef obc);
+
   int prepare_transaction(OpContext *ctx);