osd: MASKTRUNC and SETTRUNC osd ops

This commit is contained in:
Sage Weil 2009-01-29 10:58:50 -08:00
parent 72fc7a26ce
commit 116e0fb31a
5 changed files with 155 additions and 38 deletions

View File

@ -1155,47 +1155,53 @@ struct ceph_mds_snap_realm {
#define CEPH_OSD_OP_TYPE_ATTR 0x0300
/*
 * OSD op codes.
 *
 * Each code is the bitwise OR of a CEPH_OSD_OP_MODE_* flag, a
 * CEPH_OSD_OP_TYPE_* flag (sub-ops carry none) and a small number that
 * distinguishes ops sharing the same mode and type.
 *
 * Every op is defined exactly once, grouped by type and then by mode;
 * defining the same enumerator twice is a redefinition error.
 */
enum {
	/** data **/
	/* read */
	CEPH_OSD_OP_READ           = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
	CEPH_OSD_OP_STAT           = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 2,

	/* fancy read */
	CEPH_OSD_OP_GREP           = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
	CEPH_OSD_OP_MASKTRUNC      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 4,

	/* write */
	CEPH_OSD_OP_WRITE          = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
	CEPH_OSD_OP_WRITEFULL      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 2,
	CEPH_OSD_OP_TRUNCATE       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 3,
	CEPH_OSD_OP_ZERO           = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 4,
	CEPH_OSD_OP_DELETE         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 5,

	/* fancy write */
	CEPH_OSD_OP_APPEND         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
	CEPH_OSD_OP_STARTSYNC      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 7,
	CEPH_OSD_OP_SETTRUNC       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 8,
	CEPH_OSD_OP_TRIMTRUNC      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 9,

	/** attrs **/
	/* read */
	CEPH_OSD_OP_GETXATTR       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 1,
	CEPH_OSD_OP_GETXATTRS      = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_ATTR | 2,

	/* write */
	CEPH_OSD_OP_SETXATTR       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 1,
	CEPH_OSD_OP_SETXATTRS      = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 2,
	CEPH_OSD_OP_RESETXATTRS    = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 3,
	CEPH_OSD_OP_RMXATTR        = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_ATTR | 4,

	/** subop **/
	CEPH_OSD_OP_PULL           = CEPH_OSD_OP_MODE_SUB | 1,
	CEPH_OSD_OP_PUSH           = CEPH_OSD_OP_MODE_SUB | 2,
	CEPH_OSD_OP_BALANCEREADS   = CEPH_OSD_OP_MODE_SUB | 3,
	CEPH_OSD_OP_UNBALANCEREADS = CEPH_OSD_OP_MODE_SUB | 4,
	CEPH_OSD_OP_SCRUB          = CEPH_OSD_OP_MODE_SUB | 5,

	/** lock **/
	/* NOTE(review): all lock ops, including RDLOCK/RDUNLOCK, use MODE_WR --
	 * presumably deliberate; confirm. */
	CEPH_OSD_OP_WRLOCK         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 1,
	CEPH_OSD_OP_WRUNLOCK       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 2,
	CEPH_OSD_OP_RDLOCK         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 3,
	CEPH_OSD_OP_RDUNLOCK       = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 4,
	CEPH_OSD_OP_UPLOCK         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 5,
	CEPH_OSD_OP_DNLOCK         = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
};
static inline int ceph_osd_op_type_lock(int op)
@ -1230,33 +1236,39 @@ static inline const char *ceph_osd_op_name(int op)
case CEPH_OSD_OP_READ: return "read";
case CEPH_OSD_OP_STAT: return "stat";
case CEPH_OSD_OP_GREP: return "grep";
case CEPH_OSD_OP_MASKTRUNC: return "masktrunc";
case CEPH_OSD_OP_WRITE: return "write";
case CEPH_OSD_OP_DELETE: return "delete";
case CEPH_OSD_OP_TRUNCATE: return "truncate";
case CEPH_OSD_OP_ZERO: return "zero";
case CEPH_OSD_OP_WRITEFULL: return "writefull";
case CEPH_OSD_OP_APPEND: return "append";
case CEPH_OSD_OP_STARTSYNC: return "startsync";
case CEPH_OSD_OP_SETTRUNC: return "settrunc";
case CEPH_OSD_OP_TRIMTRUNC: return "trimtrunc";
case CEPH_OSD_OP_GETXATTR: return "getxattr";
case CEPH_OSD_OP_GETXATTRS: return "getxattrs";
case CEPH_OSD_OP_SETXATTR: return "setxattr";
case CEPH_OSD_OP_SETXATTRS: return "setxattrs";
case CEPH_OSD_OP_RESETXATTRS: return "resetxattrs";
case CEPH_OSD_OP_RMXATTR: return "rmxattr";
case CEPH_OSD_OP_WRLOCK: return "wrlock";
case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
case CEPH_OSD_OP_RDLOCK: return "rdlock";
case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
case CEPH_OSD_OP_UPLOCK: return "uplock";
case CEPH_OSD_OP_DNLOCK: return "dnlock";
case CEPH_OSD_OP_PULL: return "pull";
case CEPH_OSD_OP_PUSH: return "push";
case CEPH_OSD_OP_BALANCEREADS: return "balance-reads";
case CEPH_OSD_OP_UNBALANCEREADS: return "unbalance-reads";
case CEPH_OSD_OP_SCRUB: return "scrub";
case CEPH_OSD_OP_GREP: return "grep";
case CEPH_OSD_OP_APPEND: return "append";
case CEPH_OSD_OP_STARTSYNC: return "startsync";
case CEPH_OSD_OP_WRLOCK: return "wrlock";
case CEPH_OSD_OP_WRUNLOCK: return "wrunlock";
case CEPH_OSD_OP_RDLOCK: return "rdlock";
case CEPH_OSD_OP_RDUNLOCK: return "rdunlock";
case CEPH_OSD_OP_UPLOCK: return "uplock";
case CEPH_OSD_OP_DNLOCK: return "dnlock";
default: return "???";
}
@ -1286,12 +1298,15 @@ struct ceph_osd_op {
union {
struct {
__le64 offset, length;
__le32 seq;
};
struct {
__le32 name_len;
__le32 value_len;
};
struct {
__le64 truncate_size;
__le32 truncate_seq;
};
};
} __attribute__ ((packed));

View File

@ -380,9 +380,14 @@ inline ostream& operator<<(ostream& out, const ceph_fsid_t& f) {
// Print an OSD op as its name followed by the arguments that apply to it:
//   SETTRUNC / MASKTRUNC / TRIMTRUNC : " <truncate_seq>@<truncate_size>"
//   any other data op                : " <offset>~<length>"
//   attr op                          : " <name_len>+<value_len>"
// Ops of any other type print the name only.
inline ostream& operator<<(ostream& out, const ceph_osd_op& op) {
  out << ceph_osd_op_name(op.op);
  if (ceph_osd_op_type_data(op.op)) {
    // The truncate ops are defined with CEPH_OSD_OP_TYPE_DATA but carry
    // truncate_seq/truncate_size rather than offset/length, so they must be
    // tested for before falling back to the generic extent output.
    if (op.op == CEPH_OSD_OP_SETTRUNC ||
        op.op == CEPH_OSD_OP_MASKTRUNC ||
        op.op == CEPH_OSD_OP_TRIMTRUNC)
      out << " " << op.truncate_seq << "@" << op.truncate_size;
    else
      out << " " << op.offset << "~" << op.length;
  } else if (ceph_osd_op_type_attr(op.op))
    out << " " << op.name_len << "+" << op.value_len;
  return out;
}

View File

@ -755,6 +755,69 @@ void ReplicatedPG::op_read(MOSDOp *op)
}
break;
case CEPH_OSD_OP_MASKTRUNC:
  // Mask the result of the op that immediately precedes this one against
  // the truncate_size carried by this op: bytes at or beyond truncate_size
  // are dropped from 'data', except for extents that the object's stored
  // truncate_info records under the same truncate_seq.  Gaps between such
  // extents are zero-filled.  A MASKTRUNC that is the first op does nothing.
  if (p != op->ops.begin()) {
    ceph_osd_op& rd = *(p - 1);
    ceph_osd_op& m = *p;

    // are we beyond truncate_size?
    if (rd.offset + rd.length > m.truncate_size) {
      // Stored truncate state: a seq plus the extents recorded under it.
      // An object with no truncate_info yet is treated as seq 0 with no
      // extents (same convention as the SETTRUNC handler) instead of
      // decoding from an empty buffer.
      __u32 seq = 0;
      interval_set<__u64> tm;
      if (oi.truncate_info.length()) {
        // distinct name: 'p' is the op iterator of the enclosing loop
        bufferlist::iterator ti = oi.truncate_info.begin();
        ::decode(seq, ti);
        ::decode(tm, ti);
      }

      // truncated portion of the read; offsets stay 64-bit
      __u64 from = MAX(rd.offset, m.truncate_size);  // also end of data
      __u64 to = rd.offset + rd.length;
      unsigned trim = to - from;
      rd.length = rd.length - trim;
      dout(10) << " masktrunc " << m << ": overlap " << from << "~" << trim << dendl;

      bufferlist keep;
      keep.substr_of(data, 0, data.length() - trim);
      bufferlist truncated;  // everything after 'from'
      truncated.substr_of(data, data.length() - trim, trim);
      keep.swap(data);

      // Compare against the seq carried by the MASKTRUNC op itself.  The
      // preceding op uses the offset/length members of the union, where
      // truncate_seq aliases part of 'length'.
      // NOTE(review): confirm m.truncate_seq (rather than rd.seq) is the
      // intended source of the client's seq.
      if (seq == m.truncate_seq) {
        // keep any valid extents beyond 'from'
        __u64 data_end = from;  // object offset of the current end of 'data'
        for (map<__u64,__u64>::iterator q = tm.m.begin();
             q != tm.m.end();
             ++q) {
          __u64 s = MAX(q->first, from);
          __u64 e = MIN(q->first + q->second, to);
          if (e > s) {
            unsigned l = e - s;
            dout(10) << "  " << q->first << "~" << q->second << " overlap " << s << "~" << l << dendl;

            // add in zeros?  pad only the gap between the current end of
            // data and this extent, not the whole distance from 'from'.
            if (s > data_end) {
              unsigned pad = s - data_end;
              bufferptr bp(pad);
              bp.zero();
              data.push_back(bp);
              dout(20) << "  adding " << bp.length() << " zeros" << dendl;
              rd.length = rd.length + bp.length();
              data_end += bp.length();
            }

            // 'truncated' starts at object offset 'from'
            bufferlist b;
            b.substr_of(truncated, s - from, l);
            dout(20) << "  adding " << b.length() << " bytes from " << s << "~" << l << dendl;
            data.claim_append(b);
            rd.length = rd.length + l;
            data_end += l;
          }
        }  // for
      }  // seq == m.truncate_seq
    }
  }
  break;
default:
dout(1) << "unrecognized osd op " << p->op
<< " " << ceph_osd_op_name(p->op)
@ -891,9 +954,10 @@ void ReplicatedPG::add_interval_usage(interval_set<__u64>& s, pg_stat_t& stats)
// low level object operations
int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
ceph_osd_op& op, bufferlist::iterator& bp,
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp,
SnapContext& snapc)
{
ceph_osd_op& op = ops[opn];
int eop = op.op;
// munge ZERO -> DELETE or TRUNCATE?
@ -1094,11 +1158,12 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
{
// just do it inline; this works because we are happy to execute
// fancy op on replicas as well.
ceph_osd_op newop;
vector<ceph_osd_op> nops(1);
ceph_osd_op& newop = nops[0];
newop.op = CEPH_OSD_OP_WRITE;
newop.offset = old_size;
newop.length = op.length;
prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, newop, bp, snapc);
prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, nops, 0, bp, snapc);
}
break;
@ -1106,6 +1171,34 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
t.start_sync();
break;
case CEPH_OSD_OP_SETTRUNC:
  // Only acts when the op directly before this one is a WRITE: that write's
  // extent is recorded in the object's truncate_info together with a
  // truncate seq.  In any other position the op is a no-op.
  if (opn > 0 && ops[opn-1].op == CEPH_OSD_OP_WRITE) {
    // record the truncate seq over the preceding write's range
    ceph_osd_op& prev_write = ops[opn-1];
    const bool have_info = oi.truncate_info.length() != 0;

    // currently recorded seq; stays 0 when nothing has been recorded yet
    __u32 cur_seq = 0;
    bufferlist::iterator it;
    if (have_info) {
      it = oi.truncate_info.begin();
      ::decode(cur_seq, it);
    }

    interval_set<__u64> extents;
    if (cur_seq < op.truncate_seq) {
      // the op carries a newer seq: previously recorded extents are
      // discarded and only this write is recorded under the new seq
      cur_seq = op.truncate_seq;
      extents.insert(prev_write.offset, prev_write.length);
    } else {
      // recorded seq is not older than the op's: merge this write into the
      // extents already recorded (if any)
      if (have_info)
        ::decode(extents, it);
      interval_set<__u64> added;
      added.insert(prev_write.offset, prev_write.length);
      extents.union_of(added);
    }

    // rewrite truncate_info as (seq, extents)
    oi.truncate_info.clear();
    ::encode(cur_seq, oi.truncate_info);
    ::encode(extents, oi.truncate_info);
  }
  break;
default:
return -EINVAL;
}
@ -1143,7 +1236,7 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
did_snap = true;
}
prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
ops[i], bp, snapc);
ops, i, bp, snapc);
}
// finish.

View File

@ -185,7 +185,7 @@ protected:
void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);
int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
ceph_osd_op& op, bufferlist::iterator& bp, SnapContext& snapc);
vector<ceph_osd_op>& ops, int opn, bufferlist::iterator& bp, SnapContext& snapc);
void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
pobject_t poid,
vector<ceph_osd_op>& ops, bufferlist& bl,

View File

@ -728,6 +728,8 @@ struct object_info_t {
SnapSet snapset; // [head]
vector<snapid_t> snaps; // [clone]
bufferlist truncate_info; // bah.. messy layering.
void encode(bufferlist& bl) const {
::encode(poid, bl);
::encode(version, bl);
@ -739,6 +741,7 @@ struct object_info_t {
::encode(wrlock_by, bl);
} else
::encode(snaps, bl);
::encode(truncate_info, bl);
}
void decode(bufferlist::iterator& bl) {
::decode(poid, bl);
@ -751,6 +754,7 @@ struct object_info_t {
::decode(wrlock_by, bl);
} else
::decode(snaps, bl);
::decode(truncate_info, bl);
}
void decode(bufferlist& bl) {
bufferlist::iterator p = bl.begin();