mirror of
https://github.com/ceph/ceph
synced 2025-02-19 17:08:05 +00:00
osd/: introduce xattr caching for ec pools
Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
parent
155b4b069f
commit
ecae37c33b
@ -411,6 +411,10 @@
|
||||
const hobject_t &hoid,
|
||||
const string &attr,
|
||||
bufferlist *out) = 0;
|
||||
|
||||
virtual int objects_get_attrs(
|
||||
const hobject_t &hoid,
|
||||
map<string, bufferlist> *out) = 0;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
@ -314,6 +314,15 @@ int ReplicatedBackend::objects_get_attr(
|
||||
return r;
|
||||
}
|
||||
|
||||
int ReplicatedBackend::objects_get_attrs(
|
||||
const hobject_t &hoid,
|
||||
map<string, bufferlist> *out)
|
||||
{
|
||||
return osd->store->getattrs(
|
||||
coll,
|
||||
hoid,
|
||||
*out);
|
||||
}
|
||||
|
||||
class RPGTransaction : public PGBackend::PGTransaction {
|
||||
coll_t coll;
|
||||
|
@ -172,6 +172,10 @@ public:
|
||||
const string &attr,
|
||||
bufferlist *out);
|
||||
|
||||
int objects_get_attrs(
|
||||
const hobject_t &hoid,
|
||||
map<string, bufferlist> *out);
|
||||
|
||||
private:
|
||||
// push
|
||||
struct PushInfo {
|
||||
|
@ -197,6 +197,8 @@ void ReplicatedPG::on_local_recover(
|
||||
bufferlist bl;
|
||||
::encode(recovery_info.oi, bl);
|
||||
t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
|
||||
if (obc)
|
||||
obc->attr_cache[OI_ATTR] = bl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -2202,7 +2204,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
|
||||
coi.version = ctx->at_version;
|
||||
bl.clear();
|
||||
::encode(coi, bl);
|
||||
t->setattr(coid, OI_ATTR, bl);
|
||||
setattr_maybe_cache(ctx->obc, ctx, t, OI_ATTR, bl);
|
||||
|
||||
ctx->log.push_back(
|
||||
pg_log_entry_t(
|
||||
@ -2261,11 +2263,11 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
|
||||
|
||||
bl.clear();
|
||||
::encode(snapset, bl);
|
||||
t->setattr(snapoid, SS_ATTR, bl);
|
||||
setattr_maybe_cache(ctx->snapset_obc, ctx, t, SS_ATTR, bl);
|
||||
|
||||
bl.clear();
|
||||
::encode(ctx->snapset_obc->obs.oi, bl);
|
||||
t->setattr(snapoid, OI_ATTR, bl);
|
||||
setattr_maybe_cache(ctx->snapset_obc, ctx, t, OI_ATTR, bl);
|
||||
}
|
||||
|
||||
return repop;
|
||||
@ -3077,8 +3079,8 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
string aname;
|
||||
bp.copy(op.xattr.name_len, aname);
|
||||
string name = "_" + aname;
|
||||
int r = pgbackend->objects_get_attr(
|
||||
soid,
|
||||
int r = getattr_maybe_cache(
|
||||
ctx->obc,
|
||||
name,
|
||||
&(osd_op.outdata));
|
||||
if (r >= 0) {
|
||||
@ -3094,18 +3096,24 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
case CEPH_OSD_OP_GETXATTRS:
|
||||
++ctx->num_read;
|
||||
{
|
||||
map<string,bufferptr> attrset;
|
||||
result = osd->store->getattrs(coll, soid, attrset, true);
|
||||
map<string, bufferptr>::iterator iter;
|
||||
map<string, bufferlist> newattrs;
|
||||
for (iter = attrset.begin(); iter != attrset.end(); ++iter) {
|
||||
bufferlist bl;
|
||||
bl.append(iter->second);
|
||||
newattrs[iter->first] = bl;
|
||||
}
|
||||
map<string,bufferlist> attrset;
|
||||
result = getattrs_maybe_cache(
|
||||
ctx->obc,
|
||||
&attrset);
|
||||
map<string, bufferlist> out;
|
||||
for (map<string, bufferlist>::iterator i = attrset.begin();
|
||||
i != attrset.end();
|
||||
++i) {
|
||||
if (i->first[0] != '_')
|
||||
continue;
|
||||
if (i->first == "_")
|
||||
continue;
|
||||
out[i->first.substr(1, i->first.size())].claim(
|
||||
i->second);
|
||||
}
|
||||
|
||||
bufferlist bl;
|
||||
::encode(newattrs, bl);
|
||||
::encode(out, bl);
|
||||
ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(bl.length(), 10);
|
||||
ctx->delta_stats.num_rd++;
|
||||
osd_op.outdata.claim_append(bl);
|
||||
@ -3123,13 +3131,13 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
|
||||
bufferlist xattr;
|
||||
if (op.op == CEPH_OSD_OP_CMPXATTR)
|
||||
result = pgbackend->objects_get_attr(
|
||||
soid,
|
||||
result = getattr_maybe_cache(
|
||||
ctx->obc,
|
||||
name,
|
||||
&xattr);
|
||||
else
|
||||
result = pgbackend->objects_get_attr(
|
||||
src_obc->obs.oi.soid,
|
||||
result = getattr_maybe_cache(
|
||||
src_obc,
|
||||
name,
|
||||
&xattr);
|
||||
if (result < 0 && result != -EEXIST && result != -ENODATA)
|
||||
@ -3651,7 +3659,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
string name = "_" + aname;
|
||||
bufferlist bl;
|
||||
bp.copy(op.xattr.value_len, bl);
|
||||
t->setattr(soid, name, bl);
|
||||
setattr_maybe_cache(ctx->obc, ctx, t, name, bl);
|
||||
ctx->delta_stats.num_wr++;
|
||||
}
|
||||
break;
|
||||
@ -3662,7 +3670,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
|
||||
string aname;
|
||||
bp.copy(op.xattr.name_len, aname);
|
||||
string name = "_" + aname;
|
||||
t->rmattr(soid, name);
|
||||
rmattr_maybe_cache(ctx->obc, ctx, t, name);
|
||||
ctx->delta_stats.num_wr++;
|
||||
}
|
||||
break;
|
||||
@ -4257,16 +4265,19 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ReplicatedPG::_make_clone(PGBackend::PGTransaction* t,
|
||||
const hobject_t& head, const hobject_t& coid,
|
||||
object_info_t *poi)
|
||||
void ReplicatedPG::_make_clone(
|
||||
OpContext *ctx,
|
||||
PGBackend::PGTransaction* t,
|
||||
ObjectContextRef obc,
|
||||
const hobject_t& head, const hobject_t& coid,
|
||||
object_info_t *poi)
|
||||
{
|
||||
bufferlist bv;
|
||||
::encode(*poi, bv);
|
||||
|
||||
t->clone(head, coid);
|
||||
t->setattr(coid, OI_ATTR, bv);
|
||||
t->rmattr(coid, SS_ATTR);
|
||||
setattr_maybe_cache(obc, ctx, t, OI_ATTR, bv);
|
||||
rmattr_maybe_cache(obc, ctx, t, SS_ATTR);
|
||||
}
|
||||
|
||||
void ReplicatedPG::make_writeable(OpContext *ctx)
|
||||
@ -4328,6 +4339,8 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
|
||||
ctx->clone_obc->destructor_callback = new C_PG_ObjectContext(this, ctx->clone_obc.get());
|
||||
ctx->clone_obc->obs.oi = static_snap_oi;
|
||||
ctx->clone_obc->obs.exists = true;
|
||||
if (pool.info.ec_pool())
|
||||
ctx->clone_obc->attr_cache = ctx->obc->attr_cache;
|
||||
snap_oi = &ctx->clone_obc->obs.oi;
|
||||
} else {
|
||||
snap_oi = &static_snap_oi;
|
||||
@ -4338,7 +4351,7 @@ void ReplicatedPG::make_writeable(OpContext *ctx)
|
||||
snap_oi->snaps = snaps;
|
||||
if (was_dirty)
|
||||
snap_oi->set_flag(object_info_t::FLAG_DIRTY);
|
||||
_make_clone(t, soid, coid, snap_oi);
|
||||
_make_clone(ctx, t, ctx->clone_obc, soid, coid, snap_oi);
|
||||
|
||||
ctx->delta_stats.num_objects++;
|
||||
if (snap_oi->is_dirty())
|
||||
@ -4625,8 +4638,8 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
|
||||
bufferlist bv(sizeof(ctx->new_obs.oi));
|
||||
::encode(ctx->snapset_obc->obs.oi, bv);
|
||||
ctx->op_t->touch(snapoid);
|
||||
ctx->op_t->setattr(snapoid, OI_ATTR, bv);
|
||||
ctx->op_t->setattr(snapoid, SS_ATTR, bss);
|
||||
setattr_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t, OI_ATTR, bv);
|
||||
setattr_maybe_cache(ctx->snapset_obc, ctx, ctx->op_t, SS_ATTR, bss);
|
||||
ctx->at_version.version++;
|
||||
}
|
||||
}
|
||||
@ -4659,12 +4672,12 @@ void ReplicatedPG::finish_ctx(OpContext *ctx, int log_op_type)
|
||||
|
||||
bufferlist bv(sizeof(ctx->new_obs.oi));
|
||||
::encode(ctx->new_obs.oi, bv);
|
||||
ctx->op_t->setattr(soid, OI_ATTR, bv);
|
||||
setattr_maybe_cache(ctx->obc, ctx, ctx->op_t, OI_ATTR, bv);
|
||||
|
||||
if (soid.snap == CEPH_NOSNAP) {
|
||||
dout(10) << " final snapset " << ctx->new_snapset
|
||||
<< " in " << soid << dendl;
|
||||
ctx->op_t->setattr(soid, SS_ATTR, bss);
|
||||
setattr_maybe_cache(ctx->obc, ctx, ctx->op_t, SS_ATTR, bss);
|
||||
} else {
|
||||
dout(10) << " no snapset (this is a clone)" << dendl;
|
||||
}
|
||||
@ -5830,6 +5843,8 @@ void ReplicatedPG::issue_repop(RepGather *repop, utime_t now)
|
||||
unlock_snapset_obc = true;
|
||||
}
|
||||
|
||||
repop->ctx->apply_pending_attrs();
|
||||
|
||||
Context *on_all_commit = new C_OSD_RepopCommit(this, repop);
|
||||
Context *on_all_applied = new C_OSD_RepopApplied(this, repop);
|
||||
Context *onapplied_sync = new C_OSD_OndiskWriteUnlock(
|
||||
@ -6121,7 +6136,7 @@ void ReplicatedPG::handle_watch_timeout(WatchRef watch)
|
||||
obc->obs.oi.version = ctx->at_version;
|
||||
bufferlist bl;
|
||||
::encode(obc->obs.oi, bl);
|
||||
t->setattr(obc->obs.oi.soid, OI_ATTR, bl);
|
||||
setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
|
||||
|
||||
// obc ref swallowed by repop!
|
||||
issue_repop(repop, repop->ctx->mtime);
|
||||
@ -6196,6 +6211,24 @@ ObjectContextRef ReplicatedPG::get_object_context(const hobject_t& soid,
|
||||
register_snapset_context(obc->ssc);
|
||||
|
||||
populate_obc_watchers(obc);
|
||||
|
||||
if (pool.info.ec_pool()) {
|
||||
if (attrs) {
|
||||
for (map<string, bufferptr>::iterator i = attrs->begin();
|
||||
i != attrs->end();
|
||||
++i) {
|
||||
bufferlist bl;
|
||||
bl.append(i->second);
|
||||
obc->attr_cache.insert(make_pair(i->first, bl));
|
||||
}
|
||||
} else {
|
||||
int r = pgbackend->objects_get_attrs(
|
||||
soid,
|
||||
&obc->attr_cache);
|
||||
assert(r == 0);
|
||||
}
|
||||
}
|
||||
|
||||
dout(10) << "get_object_context " << obc << " " << soid
|
||||
<< " " << obc->rwstate
|
||||
<< " 0 -> 1 read " << obc->obs.oi << dendl;
|
||||
@ -10087,6 +10120,61 @@ boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&
|
||||
return transit< NotTrimming >();
|
||||
}
|
||||
|
||||
void ReplicatedPG::setattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
OpContext *op,
|
||||
PGBackend::PGTransaction *t,
|
||||
const string &key,
|
||||
bufferlist &val)
|
||||
{
|
||||
if (pool.info.ec_pool()) {
|
||||
op->pending_attrs[obc][key] = val;
|
||||
}
|
||||
t->setattr(obc->obs.oi.soid, key, val);
|
||||
}
|
||||
|
||||
void ReplicatedPG::rmattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
OpContext *op,
|
||||
PGBackend::PGTransaction *t,
|
||||
const string &key)
|
||||
{
|
||||
if (pool.info.ec_pool()) {
|
||||
op->pending_attrs[obc][key] = boost::optional<bufferlist>();
|
||||
}
|
||||
t->rmattr(obc->obs.oi.soid, key);
|
||||
}
|
||||
|
||||
int ReplicatedPG::getattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
const string &key,
|
||||
bufferlist *val)
|
||||
{
|
||||
if (pool.info.ec_pool()) {
|
||||
map<string, bufferlist>::iterator i = obc->attr_cache.find(key);
|
||||
if (i != obc->attr_cache.end()) {
|
||||
if (val)
|
||||
*val = i->second;
|
||||
return 0;
|
||||
} else {
|
||||
return -ENODATA;
|
||||
}
|
||||
}
|
||||
return pgbackend->objects_get_attr(obc->obs.oi.soid, key, val);
|
||||
}
|
||||
|
||||
int ReplicatedPG::getattrs_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
map<string, bufferlist> *out)
|
||||
{
|
||||
if (pool.info.ec_pool()) {
|
||||
if (out)
|
||||
*out = obc->attr_cache;
|
||||
return 0;
|
||||
}
|
||||
return pgbackend->objects_get_attrs(obc->obs.oi.soid, out);
|
||||
}
|
||||
|
||||
void intrusive_ptr_add_ref(ReplicatedPG *pg) { pg->get("intptr"); }
|
||||
void intrusive_ptr_release(ReplicatedPG *pg) { pg->put("intptr"); }
|
||||
|
||||
|
@ -427,6 +427,28 @@ public:
|
||||
|
||||
hobject_t new_temp_oid, discard_temp_oid; ///< temp objects we should start/stop tracking
|
||||
|
||||
// pending xattr updates
|
||||
map<ObjectContextRef,
|
||||
map<string, boost::optional<bufferlist> > > pending_attrs;
|
||||
void apply_pending_attrs() {
|
||||
for (map<ObjectContextRef,
|
||||
map<string, boost::optional<bufferlist> > >::iterator i =
|
||||
pending_attrs.begin();
|
||||
i != pending_attrs.end();
|
||||
++i) {
|
||||
for (map<string, boost::optional<bufferlist> >::iterator j =
|
||||
i->second.begin();
|
||||
j != i->second.end();
|
||||
++j) {
|
||||
if (j->second)
|
||||
i->first->attr_cache[j->first] = j->second.get();
|
||||
else
|
||||
i->first->attr_cache.erase(j->first);
|
||||
}
|
||||
}
|
||||
pending_attrs.clear();
|
||||
}
|
||||
|
||||
enum { W_LOCK, R_LOCK, NONE } lock_to_release;
|
||||
|
||||
OpContext(const OpContext& other);
|
||||
@ -804,9 +826,12 @@ protected:
|
||||
|
||||
// low level ops
|
||||
|
||||
void _make_clone(PGBackend::PGTransaction* t,
|
||||
const hobject_t& head, const hobject_t& coid,
|
||||
object_info_t *poi);
|
||||
void _make_clone(
|
||||
OpContext *ctx,
|
||||
PGBackend::PGTransaction* t,
|
||||
ObjectContextRef obc,
|
||||
const hobject_t& head, const hobject_t& coid,
|
||||
object_info_t *poi);
|
||||
void execute_ctx(OpContext *ctx);
|
||||
void finish_ctx(OpContext *ctx, int log_op_type);
|
||||
void reply_ctx(OpContext *ctx, int err);
|
||||
@ -1147,6 +1172,26 @@ public:
|
||||
void on_flushed();
|
||||
void on_removal(ObjectStore::Transaction *t);
|
||||
void on_shutdown();
|
||||
|
||||
// attr cache handling
|
||||
void setattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
OpContext *op,
|
||||
PGBackend::PGTransaction *t,
|
||||
const string &key,
|
||||
bufferlist &val);
|
||||
void rmattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
OpContext *op,
|
||||
PGBackend::PGTransaction *t,
|
||||
const string &key);
|
||||
int getattr_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
const string &key,
|
||||
bufferlist *val);
|
||||
int getattrs_maybe_cache(
|
||||
ObjectContextRef obc,
|
||||
map<string, bufferlist> *out);
|
||||
};
|
||||
|
||||
inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
|
||||
|
@ -2625,6 +2625,9 @@ public:
|
||||
cond.Signal();
|
||||
lock.Unlock();
|
||||
}
|
||||
|
||||
// attr cache
|
||||
map<string, bufferlist> attr_cache;
|
||||
};
|
||||
|
||||
inline ostream& operator<<(ostream& out, const ObjectState& obs)
|
||||
|
Loading…
Reference in New Issue
Block a user